Second version of the quantum key distribution
This commit is contained in:
parent
2beb548cbe
commit
1030de6fca
12
00_qkd.py
12
00_qkd.py
@ -69,7 +69,7 @@ class Bob:
|
|||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Mallory:
|
class Eve:
|
||||||
_bases: list = field(default_factory=list)
|
_bases: list = field(default_factory=list)
|
||||||
_values: list = field(default_factory=list)
|
_values: list = field(default_factory=list)
|
||||||
|
|
||||||
@ -83,13 +83,14 @@ class Mallory:
|
|||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
alice, bob, mallory = Alice(), Bob(), Mallory()
|
alice, bob, eve = Alice(), Bob(), Eve()
|
||||||
|
|
||||||
# Step 1. Generate a stream of random photons polarizations in a random basis
|
# Step 1. Generate a stream of random photons polarizations in a random
|
||||||
|
# basis
|
||||||
stream = alice.generate_stream(STREAM_LENGTH)
|
stream = alice.generate_stream(STREAM_LENGTH)
|
||||||
|
|
||||||
# Step 1.5 MALLORY ALSO INGESTS AND COPIES THE PHOTONS
|
# Step 1.5 EVE ALSO INGESTS AND COPIES THE PHOTONS
|
||||||
stream = mallory.ingest_stream(stream)
|
# stream = eve.ingest_stream(stream)
|
||||||
|
|
||||||
# Step 2. Bob ingests the stream of photons
|
# Step 2. Bob ingests the stream of photons
|
||||||
bob.ingest_stream(stream)
|
bob.ingest_stream(stream)
|
||||||
@ -99,6 +100,7 @@ def main():
|
|||||||
|
|
||||||
assert alice._otp == bob._otp
|
assert alice._otp == bob._otp
|
||||||
print("OTP Length: {}".format(alice.get_otp_length()))
|
print("OTP Length: {}".format(alice.get_otp_length()))
|
||||||
|
print("OTP Length: {}".format(alice._otp))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
340
04_qkd_2.py
Normal file
340
04_qkd_2.py
Normal file
@ -0,0 +1,340 @@
|
|||||||
|
import random
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
# should print some debugging statements?
|
||||||
|
DEBUG = True
|
||||||
|
|
||||||
|
# How long should be the stream - in general about half of the stream
|
||||||
|
# will be used by the One Time pad (i.e. if stream is 100, expect on average
|
||||||
|
# about 50 bits to be used as an OTP
|
||||||
|
STREAM_LENGTH = 20
|
||||||
|
|
||||||
|
# The message that Alice wants to send to Bob
|
||||||
|
_ALICE_MESSAGE = 'd'
|
||||||
|
|
||||||
|
# |0> and |1>
|
||||||
|
_0 = np.array([[1],
|
||||||
|
[0]])
|
||||||
|
_1 = np.array([[0],
|
||||||
|
[1]])
|
||||||
|
|
||||||
|
# |+> and |->
|
||||||
|
_p = np.array([[1 / np.sqrt(2)],
|
||||||
|
[1 / np.sqrt(2)]])
|
||||||
|
_m = np.array([[1 / np.sqrt(2)],
|
||||||
|
[-1 / np.sqrt(2)]])
|
||||||
|
|
||||||
|
# Gates - Identity, Pauli X and Hadamard
|
||||||
|
I = np.array([[1, 0],
|
||||||
|
[0, 1]])
|
||||||
|
X = np.array([[0, 1],
|
||||||
|
[1, 0]])
|
||||||
|
H = np.array([[1 / np.sqrt(2), 1 / np.sqrt(2)],
|
||||||
|
[1 / np.sqrt(2), -1 / np.sqrt(2)]])
|
||||||
|
|
||||||
|
|
||||||
|
def measure_probability(qbit):
|
||||||
|
"""
|
||||||
|
In a qbit [a, b] normalized: |a|^2 + |b|^2 = 1
|
||||||
|
Probability of 0 is |a|^2 and 1 with prob |b|^2
|
||||||
|
|
||||||
|
:returns: tuple of probabilities to measure 0 or 1"""
|
||||||
|
return np.abs(qbit[0][0]) ** 2, np.abs(qbit[1][0]) ** 2
|
||||||
|
|
||||||
|
|
||||||
|
def measure(qbit):
|
||||||
|
"""
|
||||||
|
This gets a random choice of either 0 and 1 with weights
|
||||||
|
based on the probabilities of the qbit
|
||||||
|
|
||||||
|
:returns: classical bit based on qbit probabilities"""
|
||||||
|
return random.choices([0, 1], measure_probability(qbit))[0]
|
||||||
|
|
||||||
|
|
||||||
|
def bases_to_classical(qbits):
|
||||||
|
"""
|
||||||
|
Converts a list of qbits to classical bits.
|
||||||
|
0 -> if sigma_x (Identity)
|
||||||
|
1 -> if sigma_z (Hadamard)
|
||||||
|
:param qbits: list of bits
|
||||||
|
:return: classical 0/1
|
||||||
|
"""
|
||||||
|
return ['1' if np.array_equal(b, H) else '0' for b in qbits]
|
||||||
|
|
||||||
|
|
||||||
|
def unicode_message_to_binary(m):
|
||||||
|
"""Converts a Unicode message to list of bits """
|
||||||
|
return [int(a) for a in list(''.join('{:08b}'.format(b)
|
||||||
|
for b in m.encode('utf8')))]
|
||||||
|
|
||||||
|
|
||||||
|
def binary_to_unicode_message(list_b):
|
||||||
|
"""Converts a list of bits to Unicode message"""
|
||||||
|
s = ''.join([str(a) for a in list_b])
|
||||||
|
return "".join([chr(int(x, 2)) for x in [s[i:i + 8]
|
||||||
|
for i in range(0, len(s), 8)
|
||||||
|
]
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def encrypt_message(message, otp):
|
||||||
|
"""
|
||||||
|
Encrypt a message using a One Time Pad algorithm.
|
||||||
|
This basically XORs each bit of the message with each bit of the OTP
|
||||||
|
:param message: message as a utf-8 encoded string, e.g. "hello world"
|
||||||
|
:param otp: list of One Time Pad bits
|
||||||
|
:return: List of encrypted bits
|
||||||
|
"""
|
||||||
|
binary_message = unicode_message_to_binary(message)
|
||||||
|
|
||||||
|
# Verify we have enough bits to xor
|
||||||
|
if len(_alice_otp) < len(binary_message):
|
||||||
|
print("ERROR: OTP is not long enough ({} bits) "
|
||||||
|
"to encode message '{}' ({} bits)".format(
|
||||||
|
len(otp), message, len(binary_message)))
|
||||||
|
return
|
||||||
|
|
||||||
|
# XOR the message to be sent
|
||||||
|
return [m_b ^ otp_b for m_b, otp_b in zip(binary_message, otp)]
|
||||||
|
|
||||||
|
|
||||||
|
def decrypt_message(encrypted_bits, otp):
|
||||||
|
"""
|
||||||
|
Decrypt a list of encrypted bits using a One Time Pad algorithm.
|
||||||
|
This basically XORs each bit of the list with each bit of the OTP
|
||||||
|
:param encrypted_bits: list of encrypted bits
|
||||||
|
:param otp: list of One Time Pad bits
|
||||||
|
:return: Decrypted message as a utf-8 encoded string, e.g. hello world
|
||||||
|
"""
|
||||||
|
_decrypted_bits = [m_b ^ otp_b
|
||||||
|
for m_b, otp_b in zip(encrypted_bits, otp)]
|
||||||
|
|
||||||
|
return binary_to_unicode_message(_decrypted_bits)
|
||||||
|
|
||||||
|
|
||||||
|
def run_qbit_tests():
|
||||||
|
# asserts are sets of tests to check if mathz workz
|
||||||
|
|
||||||
|
# Identity: verify that I|0> == |0> and I|1> == |0>
|
||||||
|
assert np.array_equal(I.dot(_0), _0)
|
||||||
|
assert np.array_equal(I.dot(_1), _1)
|
||||||
|
|
||||||
|
# Pauli X: verify that X|0> == |1> and X|1> == |0>
|
||||||
|
assert np.array_equal(X.dot(_0), _1)
|
||||||
|
assert np.array_equal(X.dot(_1), _0)
|
||||||
|
|
||||||
|
# measure probabilities in sigma_x of |0> and |1>
|
||||||
|
# using allclose since dealing with floats
|
||||||
|
assert np.allclose(measure_probability(_0), (1.0, 0.0))
|
||||||
|
assert np.allclose(measure_probability(_1), (0.0, 1.0))
|
||||||
|
|
||||||
|
# applying Hadamard puts the qbit in orthogonal +/- basis
|
||||||
|
assert np.array_equal(H.dot(_0), _p)
|
||||||
|
assert np.array_equal(H.dot(_1), _m)
|
||||||
|
|
||||||
|
# measure probabilities in sigma_x of |+> and |->
|
||||||
|
# using allclose since dealing with floats
|
||||||
|
assert np.allclose(measure_probability(_p), (0.5, 0.5))
|
||||||
|
assert np.allclose(measure_probability(_m), (0.5, 0.5))
|
||||||
|
|
||||||
|
|
||||||
|
def qkd():
|
||||||
|
"""
|
||||||
|
# Quantum Key Distribution Algorithm (QKD)
|
||||||
|
|
||||||
|
# We have Alice and Bob who want to share a common key which can be used
|
||||||
|
# to then encrypt data (for example by xor-ing the common key with a
|
||||||
|
# message)
|
||||||
|
#
|
||||||
|
# This common key is called OTP (One Time Pad) and represents a string of
|
||||||
|
# classical 0 and 1 bits.
|
||||||
|
#
|
||||||
|
# Private data is prefixed by convention with _ and the comments will
|
||||||
|
# say whether that data is private to Alice or to Bob.
|
||||||
|
# Public data is communicated over an insecure channel and is potentially
|
||||||
|
# accessible to adversaries like Eve
|
||||||
|
#
|
||||||
|
# All qubits are initialized as |0>
|
||||||
|
:returns: Alice's and Bob's OTP
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Step 1:
|
||||||
|
# PRIVATE DATA (to Alice)
|
||||||
|
# Alice chooses randomly whether to send |0> or |1> - which is either
|
||||||
|
# apply Identity gate or Pauli X (flip) gate.
|
||||||
|
_alice_values = [random.choice([I, X]) for _ in range(STREAM_LENGTH)]
|
||||||
|
|
||||||
|
# However, Alice will also write these choices classically which can be
|
||||||
|
# used later to combine with Bob's information
|
||||||
|
# Alice uses the values - if chosen Identity, write 0
|
||||||
|
# if chosen Pauli X, write 1
|
||||||
|
_alice_value_choices = ['1' if np.array_equal(v, X) else '0' for v in
|
||||||
|
_alice_values]
|
||||||
|
|
||||||
|
# Step 2:
|
||||||
|
# PUBLIC DATA (Classical channel): Alice will share her choice of bases
|
||||||
|
#
|
||||||
|
# Alice chooses her base - either sigma_x or sigma_z - which is either
|
||||||
|
# apply Identity gate or Hadamard gate
|
||||||
|
alice_bases = [random.choice([I, H]) for _ in range(STREAM_LENGTH)]
|
||||||
|
|
||||||
|
# Just to make it easier to pass on Classical channel
|
||||||
|
alice_base_choices = bases_to_classical(alice_bases)
|
||||||
|
|
||||||
|
# Step 3:
|
||||||
|
# PUBLIC DATA (Quantum channel): Alice will share her qubits with Bob
|
||||||
|
#
|
||||||
|
# Alice prepares her qubits by chaining the initialized |0> to the
|
||||||
|
# values and then the bases.
|
||||||
|
#
|
||||||
|
# (These 3 operations so far are equivalent to choosing a qbit from these
|
||||||
|
# 4 posibilities (but we are making it more verbose):)
|
||||||
|
# alice_qbits = [random.choice([_0, _1, _p, _m])
|
||||||
|
# for _ in range(STREAM_LENGTH)]
|
||||||
|
alice_qbits = [b.dot(v.dot(_0)) for v, b in zip(_alice_values, alice_bases)]
|
||||||
|
|
||||||
|
# Step 4:
|
||||||
|
# PRIVATE DATA (to Bob)
|
||||||
|
# Bob chooses in which basis to measure - either the
|
||||||
|
# sigma_x or the sigma_z which is equivalent to either
|
||||||
|
# apply the Identity gate or the Hadamard gate
|
||||||
|
_bob_bases = [random.choice([I, H]) for _ in range(STREAM_LENGTH)]
|
||||||
|
_bob_base_choices = bases_to_classical(_bob_bases)
|
||||||
|
|
||||||
|
# Step 5:
|
||||||
|
# PUBLIC DATA (Classical channel):Bob will share the common bases with Alice
|
||||||
|
#
|
||||||
|
# Bob receives Alice's bases from Step 2 and uses his chosen bases from
|
||||||
|
# Step 4 to calculate which bases are the same.
|
||||||
|
# This will produce a classical 0/1 stream which can be shared with
|
||||||
|
# Communicate when the choice of bases were the same
|
||||||
|
common_bases = [a == b for a, b in
|
||||||
|
zip(alice_base_choices, _bob_base_choices)]
|
||||||
|
|
||||||
|
# Step 6:
|
||||||
|
# PRIVATE DATA (to Bob)
|
||||||
|
#
|
||||||
|
# Bob uses the qubits shared by Alice in Step 3 and the calculated common
|
||||||
|
# base in Step 5.
|
||||||
|
# Apply the chosen bases to the qubits.
|
||||||
|
# If Alice and Bob chose I -> This will result in either |0> or |1>
|
||||||
|
# Applying the Identity doesn't change the state
|
||||||
|
# which has 100% probability to collapse
|
||||||
|
# to either 0 or 1 classically
|
||||||
|
# If Alice and Bob chose H -> This will result in either |+> or |->
|
||||||
|
# After applying the H on either it results
|
||||||
|
# in either |0> or |1>
|
||||||
|
# which again has 100% probability to collapse
|
||||||
|
# to either 0 or 1 classically
|
||||||
|
_bob_apply_bases = [b.dot(a) for a, b in zip(alice_qbits, _bob_bases)]
|
||||||
|
|
||||||
|
# If the common base matches, measure the qubit - and since from previous
|
||||||
|
# argument this is either |0> or |1> it will collapse to classical 0 or 1
|
||||||
|
# Otherwise - don't measure (put 'N')
|
||||||
|
# This is Bob's side of the OTP
|
||||||
|
_bob_measured = [str(measure(q)) if c else 'N' for q, c in
|
||||||
|
zip(_bob_apply_bases, common_bases)]
|
||||||
|
|
||||||
|
# Step 7:
|
||||||
|
# PRIVATE DATA (to Alice)
|
||||||
|
#
|
||||||
|
# Alice doesn't have to measure qubits - she just needs to use the common
|
||||||
|
# bases from Step 5 to decide which values can be used from Step 1
|
||||||
|
# Otherwise - don't use the value (prints 'N')
|
||||||
|
_alice_chosen_value = [v if c else 'N' for v, c in
|
||||||
|
zip(_alice_value_choices, common_bases)]
|
||||||
|
|
||||||
|
# do the OTP calculation from either Bob's or Alice's - should be the same
|
||||||
|
_alice_otp = [int(v) for v in _alice_chosen_value if v != 'N']
|
||||||
|
_bob_otp = [int(v) for v in _bob_measured if v != 'N']
|
||||||
|
|
||||||
|
# Verify that algo run correctly:
|
||||||
|
assert _alice_otp == _bob_otp
|
||||||
|
|
||||||
|
# Now OTP is agreed upon
|
||||||
|
# Both Alice's chosen values and Bob's measured qbits should be the same
|
||||||
|
# Print some debugging statements
|
||||||
|
if DEBUG:
|
||||||
|
print("A vals (_pA): {}".format(
|
||||||
|
['I' if np.array_equal(v, I) else 'X' for v in _alice_values]))
|
||||||
|
print("A bases (pub): {}".format(
|
||||||
|
['I' if np.array_equal(b, I) else 'H' for b in alice_bases]))
|
||||||
|
print("B bases (_pB): {}".format(
|
||||||
|
['I' if np.array_equal(b, I) else 'H' for b in _bob_bases]))
|
||||||
|
print("C bases (pub): {}".format(
|
||||||
|
['1' if c else '0' for c in common_bases]))
|
||||||
|
print("A chosn (_pA): {}".format(_alice_chosen_value))
|
||||||
|
print("B mesrd (_pB): {}".format(_bob_measured))
|
||||||
|
print("OTP: {}".format(_alice_otp))
|
||||||
|
print("OTP length: {}".format(len(_alice_otp)))
|
||||||
|
|
||||||
|
return _alice_otp, _bob_otp
|
||||||
|
|
||||||
|
|
||||||
|
def message_exchange_otp(_ALICE_MESSAGE, _alice_otp, _bob_otp):
|
||||||
|
"""
|
||||||
|
# OTP (One Time Pad) Algorithm
|
||||||
|
# -----------------------------
|
||||||
|
# One can encrypt a message using a stream of random bits (OTP) using XOR.
|
||||||
|
#
|
||||||
|
# XOR truth table (classical):
|
||||||
|
#
|
||||||
|
# A B R |
|
||||||
|
# ----- |
|
||||||
|
# 0 0 0 |
|
||||||
|
# 0 1 1 |
|
||||||
|
# 1 0 1 |
|
||||||
|
# 1 1 0 |
|
||||||
|
#
|
||||||
|
# XORing bits is a reversable process, e.g. if we have the letter 'd'
|
||||||
|
# 'a' in ascii is 97 [ord('a')]
|
||||||
|
# 97 in binary is 0b1100001 [bin(ord('a'))]
|
||||||
|
#
|
||||||
|
# But bytes are always 8 bit long, so prepend with 0s
|
||||||
|
# string 'a' = 0 1 1 0 0 0 0 1
|
||||||
|
# otp = 1 0 0 1 1 0 1 1
|
||||||
|
# 'a' XOR otp = 1 1 1 1 1 0 1 0 = enc
|
||||||
|
#
|
||||||
|
# This is now the encrypted message (enc = 11111010). To decrypt apply the
|
||||||
|
# same OTP to the encrypted message to reverse the process,
|
||||||
|
# i.e. 'a' = enc XOR otp
|
||||||
|
#
|
||||||
|
# encrypted = 1 1 1 1 1 0 1 0
|
||||||
|
# otp = 1 0 0 1 1 0 1 1
|
||||||
|
# enc XOR otp = 0 1 1 0 0 0 0 1 = dec
|
||||||
|
#
|
||||||
|
# dec is not the same stream of bits originally encrypted, i.e. string 'a'
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Step 8:
|
||||||
|
# PUBLIC DATA (Classical channel): Alice will share the encrypted
|
||||||
|
# message to Bob
|
||||||
|
#
|
||||||
|
# Finally, it we have enough secret shared bits (length of OTP) Alice
|
||||||
|
# can encrypt the message by XOR-ing each bit with the MESSAGE and put
|
||||||
|
# it on the classical channel.
|
||||||
|
|
||||||
|
# Convert the message to a list of bits
|
||||||
|
encrypted_bits = encrypt_message(_ALICE_MESSAGE, _alice_otp)
|
||||||
|
if not encrypted_bits:
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
# Step 9:
|
||||||
|
# PRIVATE DATA (to Bob): Bob can now decrypt the message using the OTP
|
||||||
|
_bob_message = decrypt_message(encrypted_bits, _bob_otp)
|
||||||
|
|
||||||
|
# Finally, verify that the sent message is the same as received message
|
||||||
|
assert _ALICE_MESSAGE == _bob_message
|
||||||
|
print("Alice successfully sent a message to Bob using QKD!")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Run tests to make sure we didn't mess up some quantum mathz
|
||||||
|
run_qbit_tests()
|
||||||
|
|
||||||
|
# First - run the Quantum Key Distribution (QKD) algorithm
|
||||||
|
_alice_otp, _bob_otp = qkd()
|
||||||
|
|
||||||
|
# Second - using the shared One Time Pads (OTP), share a message
|
||||||
|
message_exchange_otp(_ALICE_MESSAGE, _alice_otp, _bob_otp)
|
Loading…
Reference in New Issue
Block a user