From 1030de6fcaea606ad9cfd287e9723cbfde3969b5 Mon Sep 17 00:00:00 2001 From: Daniel Tsvetkov Date: Mon, 2 Dec 2019 17:35:22 +0100 Subject: [PATCH] Second version of the quantum key distribution --- 00_qkd.py | 12 +- 04_qkd_2.py | 340 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 347 insertions(+), 5 deletions(-) create mode 100644 04_qkd_2.py diff --git a/00_qkd.py b/00_qkd.py index 0d326c4..6eb27c2 100644 --- a/00_qkd.py +++ b/00_qkd.py @@ -69,7 +69,7 @@ class Bob: @dataclass -class Mallory: +class Eve: _bases: list = field(default_factory=list) _values: list = field(default_factory=list) @@ -83,13 +83,14 @@ class Mallory: def main(): - alice, bob, mallory = Alice(), Bob(), Mallory() + alice, bob, eve = Alice(), Bob(), Eve() - # Step 1. Generate a stream of random photons polarizations in a random basis + # Step 1. Generate a stream of random photons polarizations in a random + # basis stream = alice.generate_stream(STREAM_LENGTH) - # Step 1.5 MALLORY ALSO INGESTS AND COPIES THE PHOTONS - stream = mallory.ingest_stream(stream) + # Step 1.5 EVE ALSO INGESTS AND COPIES THE PHOTONS + # stream = eve.ingest_stream(stream) # Step 2. Bob ingests the stream of photons bob.ingest_stream(stream) @@ -99,6 +100,7 @@ def main(): assert alice._otp == bob._otp print("OTP Length: {}".format(alice.get_otp_length())) + print("OTP Length: {}".format(alice._otp)) if __name__ == "__main__": diff --git a/04_qkd_2.py b/04_qkd_2.py new file mode 100644 index 0000000..79fd990 --- /dev/null +++ b/04_qkd_2.py @@ -0,0 +1,340 @@ +import random + +import numpy as np + +# should print some debugging statements? +DEBUG = True + +# How long should be the stream - in general about half of the stream +# will be used by the One Time pad (i.e. if stream is 100, expect on average +# about 50 bits to be used as an OTP +STREAM_LENGTH = 20 + +# The message that Alice wants to send to Bob +_ALICE_MESSAGE = 'd' + +# |0> and |1> +_0 = np.array([[1], + [0]]) +_1 = np.array([[0], + [1]]) + +# |+> and |-> +_p = np.array([[1 / np.sqrt(2)], + [1 / np.sqrt(2)]]) +_m = np.array([[1 / np.sqrt(2)], + [-1 / np.sqrt(2)]]) + +# Gates - Identity, Pauli X and Hadamard +I = np.array([[1, 0], + [0, 1]]) +X = np.array([[0, 1], + [1, 0]]) +H = np.array([[1 / np.sqrt(2), 1 / np.sqrt(2)], + [1 / np.sqrt(2), -1 / np.sqrt(2)]]) + + +def measure_probability(qbit): + """ + In a qbit [a, b] normalized: |a|^2 + |b|^2 = 1 + Probability of 0 is |a|^2 and 1 with prob |b|^2 + + :returns: tuple of probabilities to measure 0 or 1""" + return np.abs(qbit[0][0]) ** 2, np.abs(qbit[1][0]) ** 2 + + +def measure(qbit): + """ + This gets a random choice of either 0 and 1 with weights + based on the probabilities of the qbit + + :returns: classical bit based on qbit probabilities""" + return random.choices([0, 1], measure_probability(qbit))[0] + + +def bases_to_classical(qbits): + """ + Converts a list of qbits to classical bits. + 0 -> if sigma_x (Identity) + 1 -> if sigma_z (Hadamard) + :param qbits: list of bits + :return: classical 0/1 + """ + return ['1' if np.array_equal(b, H) else '0' for b in qbits] + + +def unicode_message_to_binary(m): + """Converts a Unicode message to list of bits """ + return [int(a) for a in list(''.join('{:08b}'.format(b) + for b in m.encode('utf8')))] + + +def binary_to_unicode_message(list_b): + """Converts a list of bits to Unicode message""" + s = ''.join([str(a) for a in list_b]) + return "".join([chr(int(x, 2)) for x in [s[i:i + 8] + for i in range(0, len(s), 8) + ] + ]) + + +def encrypt_message(message, otp): + """ + Encrypt a message using a One Time Pad algorithm. + This basically XORs each bit of the message with each bit of the OTP + :param message: message as a utf-8 encoded string, e.g. "hello world" + :param otp: list of One Time Pad bits + :return: List of encrypted bits + """ + binary_message = unicode_message_to_binary(message) + + # Verify we have enough bits to xor + if len(_alice_otp) < len(binary_message): + print("ERROR: OTP is not long enough ({} bits) " + "to encode message '{}' ({} bits)".format( + len(otp), message, len(binary_message))) + return + + # XOR the message to be sent + return [m_b ^ otp_b for m_b, otp_b in zip(binary_message, otp)] + + +def decrypt_message(encrypted_bits, otp): + """ + Decrypt a list of encrypted bits using a One Time Pad algorithm. + This basically XORs each bit of the list with each bit of the OTP + :param encrypted_bits: list of encrypted bits + :param otp: list of One Time Pad bits + :return: Decrypted message as a utf-8 encoded string, e.g. hello world + """ + _decrypted_bits = [m_b ^ otp_b + for m_b, otp_b in zip(encrypted_bits, otp)] + + return binary_to_unicode_message(_decrypted_bits) + + +def run_qbit_tests(): + # asserts are sets of tests to check if mathz workz + + # Identity: verify that I|0> == |0> and I|1> == |0> + assert np.array_equal(I.dot(_0), _0) + assert np.array_equal(I.dot(_1), _1) + + # Pauli X: verify that X|0> == |1> and X|1> == |0> + assert np.array_equal(X.dot(_0), _1) + assert np.array_equal(X.dot(_1), _0) + + # measure probabilities in sigma_x of |0> and |1> + # using allclose since dealing with floats + assert np.allclose(measure_probability(_0), (1.0, 0.0)) + assert np.allclose(measure_probability(_1), (0.0, 1.0)) + + # applying Hadamard puts the qbit in orthogonal +/- basis + assert np.array_equal(H.dot(_0), _p) + assert np.array_equal(H.dot(_1), _m) + + # measure probabilities in sigma_x of |+> and |-> + # using allclose since dealing with floats + assert np.allclose(measure_probability(_p), (0.5, 0.5)) + assert np.allclose(measure_probability(_m), (0.5, 0.5)) + + +def qkd(): + """ + # Quantum Key Distribution Algorithm (QKD) + + # We have Alice and Bob who want to share a common key which can be used + # to then encrypt data (for example by xor-ing the common key with a + # message) + # + # This common key is called OTP (One Time Pad) and represents a string of + # classical 0 and 1 bits. + # + # Private data is prefixed by convention with _ and the comments will + # say whether that data is private to Alice or to Bob. + # Public data is communicated over an insecure channel and is potentially + # accessible to adversaries like Eve + # + # All qubits are initialized as |0> + :returns: Alice's and Bob's OTP + """ + + # Step 1: + # PRIVATE DATA (to Alice) + # Alice chooses randomly whether to send |0> or |1> - which is either + # apply Identity gate or Pauli X (flip) gate. + _alice_values = [random.choice([I, X]) for _ in range(STREAM_LENGTH)] + + # However, Alice will also write these choices classically which can be + # used later to combine with Bob's information + # Alice uses the values - if chosen Identity, write 0 + # if chosen Pauli X, write 1 + _alice_value_choices = ['1' if np.array_equal(v, X) else '0' for v in + _alice_values] + + # Step 2: + # PUBLIC DATA (Classical channel): Alice will share her choice of bases + # + # Alice chooses her base - either sigma_x or sigma_z - which is either + # apply Identity gate or Hadamard gate + alice_bases = [random.choice([I, H]) for _ in range(STREAM_LENGTH)] + + # Just to make it easier to pass on Classical channel + alice_base_choices = bases_to_classical(alice_bases) + + # Step 3: + # PUBLIC DATA (Quantum channel): Alice will share her qubits with Bob + # + # Alice prepares her qubits by chaining the initialized |0> to the + # values and then the bases. + # + # (These 3 operations so far are equivalent to choosing a qbit from these + # 4 posibilities (but we are making it more verbose):) + # alice_qbits = [random.choice([_0, _1, _p, _m]) + # for _ in range(STREAM_LENGTH)] + alice_qbits = [b.dot(v.dot(_0)) for v, b in zip(_alice_values, alice_bases)] + + # Step 4: + # PRIVATE DATA (to Bob) + # Bob chooses in which basis to measure - either the + # sigma_x or the sigma_z which is equivalent to either + # apply the Identity gate or the Hadamard gate + _bob_bases = [random.choice([I, H]) for _ in range(STREAM_LENGTH)] + _bob_base_choices = bases_to_classical(_bob_bases) + + # Step 5: + # PUBLIC DATA (Classical channel):Bob will share the common bases with Alice + # + # Bob receives Alice's bases from Step 2 and uses his chosen bases from + # Step 4 to calculate which bases are the same. + # This will produce a classical 0/1 stream which can be shared with + # Communicate when the choice of bases were the same + common_bases = [a == b for a, b in + zip(alice_base_choices, _bob_base_choices)] + + # Step 6: + # PRIVATE DATA (to Bob) + # + # Bob uses the qubits shared by Alice in Step 3 and the calculated common + # base in Step 5. + # Apply the chosen bases to the qubits. + # If Alice and Bob chose I -> This will result in either |0> or |1> + # Applying the Identity doesn't change the state + # which has 100% probability to collapse + # to either 0 or 1 classically + # If Alice and Bob chose H -> This will result in either |+> or |-> + # After applying the H on either it results + # in either |0> or |1> + # which again has 100% probability to collapse + # to either 0 or 1 classically + _bob_apply_bases = [b.dot(a) for a, b in zip(alice_qbits, _bob_bases)] + + # If the common base matches, measure the qubit - and since from previous + # argument this is either |0> or |1> it will collapse to classical 0 or 1 + # Otherwise - don't measure (put 'N') + # This is Bob's side of the OTP + _bob_measured = [str(measure(q)) if c else 'N' for q, c in + zip(_bob_apply_bases, common_bases)] + + # Step 7: + # PRIVATE DATA (to Alice) + # + # Alice doesn't have to measure qubits - she just needs to use the common + # bases from Step 5 to decide which values can be used from Step 1 + # Otherwise - don't use the value (prints 'N') + _alice_chosen_value = [v if c else 'N' for v, c in + zip(_alice_value_choices, common_bases)] + + # do the OTP calculation from either Bob's or Alice's - should be the same + _alice_otp = [int(v) for v in _alice_chosen_value if v != 'N'] + _bob_otp = [int(v) for v in _bob_measured if v != 'N'] + + # Verify that algo run correctly: + assert _alice_otp == _bob_otp + + # Now OTP is agreed upon + # Both Alice's chosen values and Bob's measured qbits should be the same + # Print some debugging statements + if DEBUG: + print("A vals (_pA): {}".format( + ['I' if np.array_equal(v, I) else 'X' for v in _alice_values])) + print("A bases (pub): {}".format( + ['I' if np.array_equal(b, I) else 'H' for b in alice_bases])) + print("B bases (_pB): {}".format( + ['I' if np.array_equal(b, I) else 'H' for b in _bob_bases])) + print("C bases (pub): {}".format( + ['1' if c else '0' for c in common_bases])) + print("A chosn (_pA): {}".format(_alice_chosen_value)) + print("B mesrd (_pB): {}".format(_bob_measured)) + print("OTP: {}".format(_alice_otp)) + print("OTP length: {}".format(len(_alice_otp))) + + return _alice_otp, _bob_otp + + +def message_exchange_otp(_ALICE_MESSAGE, _alice_otp, _bob_otp): + """ + # OTP (One Time Pad) Algorithm + # ----------------------------- + # One can encrypt a message using a stream of random bits (OTP) using XOR. + # + # XOR truth table (classical): + # + # A B R | + # ----- | + # 0 0 0 | + # 0 1 1 | + # 1 0 1 | + # 1 1 0 | + # + # XORing bits is a reversable process, e.g. if we have the letter 'd' + # 'a' in ascii is 97 [ord('a')] + # 97 in binary is 0b1100001 [bin(ord('a'))] + # + # But bytes are always 8 bit long, so prepend with 0s + # string 'a' = 0 1 1 0 0 0 0 1 + # otp = 1 0 0 1 1 0 1 1 + # 'a' XOR otp = 1 1 1 1 1 0 1 0 = enc + # + # This is now the encrypted message (enc = 11111010). To decrypt apply the + # same OTP to the encrypted message to reverse the process, + # i.e. 'a' = enc XOR otp + # + # encrypted = 1 1 1 1 1 0 1 0 + # otp = 1 0 0 1 1 0 1 1 + # enc XOR otp = 0 1 1 0 0 0 0 1 = dec + # + # dec is not the same stream of bits originally encrypted, i.e. string 'a' + """ + + # Step 8: + # PUBLIC DATA (Classical channel): Alice will share the encrypted + # message to Bob + # + # Finally, it we have enough secret shared bits (length of OTP) Alice + # can encrypt the message by XOR-ing each bit with the MESSAGE and put + # it on the classical channel. + + # Convert the message to a list of bits + encrypted_bits = encrypt_message(_ALICE_MESSAGE, _alice_otp) + if not encrypted_bits: + exit(1) + + # Step 9: + # PRIVATE DATA (to Bob): Bob can now decrypt the message using the OTP + _bob_message = decrypt_message(encrypted_bits, _bob_otp) + + # Finally, verify that the sent message is the same as received message + assert _ALICE_MESSAGE == _bob_message + print("Alice successfully sent a message to Bob using QKD!") + + +if __name__ == "__main__": + # Run tests to make sure we didn't mess up some quantum mathz + run_qbit_tests() + + # First - run the Quantum Key Distribution (QKD) algorithm + _alice_otp, _bob_otp = qkd() + + # Second - using the shared One Time Pads (OTP), share a message + message_exchange_otp(_ALICE_MESSAGE, _alice_otp, _bob_otp)