X3DH
Extended Triple Diffie-Hellman¶
- provides cryptographic deniability
- Uses X25519 or X448 and 256 or 512-bit hash function
- The main use of this is generate a secure random between you and another person with out them being available to exchange a DH key.
https://signal.org/docs/specifications/x3dh/
Why so many DH¶
The First Diffie-Helman authenticates Alice to Bob
The Second Diffie-Helman authenticates Bob to Alice
The Third Diffie-Helman makes a shared connection secret with forward secrecy
The Fourth Diffie-Helman prevents replay attacks as long as the key is sent to one person and deleted from the server.
x448 Implementation¶
from cryptography.hazmat.primitives.asymmetric.x448 import X448PrivateKey, X448PublicKey
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, Ed448PublicKey
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import json, os, base64
class Signal_Server(object):
"""docstring for Signal_Server"""
def __init__(self):
self.idenity_database = {}
self.idenity_database_sign = {}
self.preKey_database = {}
self.onetime_database = {}
self.info = b"Signal_Server"
def registerUser(self, username, idenity_key, idenity_key_sign):
self.idenity_database[username] = X448PublicKey.from_public_bytes(idenity_key)
self.idenity_database_sign[username] = Ed448PublicKey.from_public_bytes(idenity_key_sign)
def getUsersIdenityKey(self, username):
return self.idenity_database[username].public_bytes_raw()
def getUsersIdenitySigningKey(self, username):
return self.idenity_database_sign[username].public_bytes_raw()
def getUsersPreKey(self, username):
return self.preKey_database[username]
def getUserOneTimeKey(self, username):
#Return then remove key
return self.onetime_database[username].pop(0)
def updateUserPreKey(self, username, preKey_bytes, signature):
idenity_key = self.idenity_database_sign[username]
if idenity_key.verify(signature, preKey_bytes):
print("[-] Signature does't match the Prekey. Either Bad Signature, Wrong Key, wrong username")
return False
else:
print("[+] Signature Accepted")
self.preKey_database[username] = {"key": preKey_bytes, "signature": signature}
return True
def addOneTimePreKey(self, username, keys_bytes):
#Check from the correct User???
if username not in self.onetime_database:
self.onetime_database[username] = []
self.onetime_database[username].append(keys_bytes)
class Signal_User(object):
"""docstring for Signal_User"""
def __init__(self, username):
self.username = username
self.idenity_private_key = None
self.idenity_private_key_sign = None
self.private_pre_key = None
self.one_time_private_keys = []
self._generateKeys()
def _generateKeys(self):
self.idenity_private_key_sign = Ed448PrivateKey.generate()
#The Ed448 Private Key is 57 bits long while the x448 is 56 bits long
#https://crypto.stackexchange.com/questions/99974/curve448-can-ed448-key-material-be-reused-for-x448
# So we generate the x448 from the Ed448 key
digest = hashes.Hash(hashes.SHAKE256(57))
digest.update(self.idenity_private_key_sign.private_bytes_raw())
x488_key = digest.finalize()
#print(x488_key)
self.idenity_private_key = Ed448PrivateKey.from_private_bytes(x488_key)
self.idenity_private_key = X448PrivateKey.generate()
self.private_pre_key = X448PrivateKey.generate()
#Lets Generate 5 Prekeys
for x in range(5):
self.one_time_private_keys.append(X448PrivateKey.generate())
def registerUser(self, signal_server):
signal_server.registerUser(self.username, self.idenity_private_key.public_key().public_bytes_raw(), self.idenity_private_key_sign.public_key().public_bytes_raw())
def generateEphemeralKey(self):
self.ephemeral_key = X448PrivateKey.generate()
def send_PreKeyToSignalServer(self, signal_server):
#Get Public Key
public_Pre_Key = self.private_pre_key.public_key()
signature = self.idenity_private_key_sign.sign(public_Pre_Key.public_bytes_raw())
#Send to Server
signal_server.updateUserPreKey(self.username, public_Pre_Key.public_bytes_raw(), signature)
def generateDH1_sender(self, signal_server, username):
#Get Destination User's Signed Public Key
dst_idenity_sign_key = Ed448PublicKey.from_public_bytes(signal_server.getUsersIdenitySigningKey(username))
dst_prekey_and_signature = signal_server.getUsersPreKey(username)
#Check that the signature is correct for that key
if dst_idenity_sign_key.verify(dst_prekey_and_signature["signature"], dst_prekey_and_signature["key"]):
print("[-] Signature does't match the Prekey. Either Bad Signature, Wrong Key, wrong username")
return False
#Now that we have checked that the server did not give us the wrong user key Lets continue
dst_prekey = X448PublicKey.from_public_bytes(dst_prekey_and_signature["key"])
return self.idenity_private_key.exchange(dst_prekey)
def generateDH2_sender(self, signal_server, username):
#Get Destination User's Signed Public Key
dst_idenity_key = X448PublicKey.from_public_bytes(signal_server.getUsersIdenityKey(username))
#Generate Ephemeral Key for the rest of the Key generation
self.generateEphemeralKey()
return self.ephemeral_key.exchange(dst_idenity_key)
def generateDH3_sender(self, signal_server, username):
#Get Destination User's Signed Public Key
dst_idenity_sign_key = Ed448PublicKey.from_public_bytes(signal_server.getUsersIdenitySigningKey(username))
dst_prekey_and_signature = signal_server.getUsersPreKey(username)
#Check that the signature is correct for that key
if dst_idenity_sign_key.verify(dst_prekey_and_signature["signature"], dst_prekey_and_signature["key"]):
print("[-] Signature does't match the Prekey. Either Bad Signature, Wrong Key, wrong username")
return False
#Now that we have checked that the server did not give us the wrong user key Lets continue
dst_prekey = X448PublicKey.from_public_bytes(dst_prekey_and_signature["key"])
return self.ephemeral_key.exchange(dst_prekey)
def generateDH4_sender(self, signal_server, username):
#Get Destination User's Signed Public Key
dst_idenity_key = X448PublicKey.from_public_bytes(signal_server.getUsersIdenityKey(username))
dst_one_time_key = X448PublicKey.from_public_bytes(signal_server.getUserOneTimeKey(username))
return self.ephemeral_key.exchange(dst_one_time_key)
def send_OneTimeKeysToSignalServer(self, signal_server):
for key in self.one_time_private_keys:
key_bytes = key.public_key().public_bytes_raw()
signal_server.addOneTimePreKey(self.username, key_bytes)
def generateSecretKey(self, dh1, dh2, dh3, dh4):
#I Dont know where this is used???
if self.idenity_private_key is X448PrivateKey:
pre_hash_bytes = 57*b"\xFF"
hash_obj = PBKDF2HMAC(algorithm=hashes.SHA512(), length=64, salt=(64*b"\x00"), iterations=480000)
else:
pre_hash_bytes = 32*b"\xFF"
hash_obj = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=(32*b"\x00"), iterations=480000)
secret_key = hash_obj.derive(pre_hash_bytes + dh1 + dh2 + dh3 + dh4)
return secret_key
def AADAndEncryptMessage(self, secret_key, message, signal_server, username):
dst_idenity_key = signal_server.getUsersIdenityKey(username)
# Dest Ident Key, Source Ident Key, dest username, source username, server info
aad = {"src_idenity_key": base64.b64encode(self.idenity_private_key.public_key().public_bytes_raw()).decode('ascii') ,
"src_username": self.username, "src_ephemeral_key": base64.b64encode(self.ephemeral_key.public_key().public_bytes_raw()).decode('ascii'),
"dst_idenity_key": base64.b64encode(dst_idenity_key).decode('ascii'), "dst_username": username, "server_info": signal_server.info.decode('ascii')}
print(aad)
aad_bytes = json.dumps(aad).encode('ascii')
aesgcm = AESGCM(secret_key)
nonce = os.urandom(12)
cyphertext = aesgcm.encrypt(nonce, message, aad_bytes)
return {"aad":aad_bytes, "nonce":nonce, "cyphertext": cyphertext}
def recieveMessage(self, message):
aad, nonce, cyphertext = [json.loads(message["aad"]), message["nonce"], message["cyphertext"]]
src_username, src_idenity_key_bytes, src_ephemeral_key_bytes = [aad["src_username"], base64.b64decode(aad["src_idenity_key"]), base64.b64decode(aad["src_ephemeral_key"])]
dst_idenity_key_bytes, dst_username, server_info = [base64.b64decode(aad["dst_idenity_key"]), aad["dst_username"], aad["server_info"]]
src_idenity_key = X448PublicKey.from_public_bytes(src_idenity_key_bytes)
src_ephemeral_key = X448PublicKey.from_public_bytes(src_ephemeral_key_bytes)
#Generate DH1
# DH1 = Src Idenity Key, Dst Signed Pre_key
dh1 = self.private_pre_key.exchange(src_idenity_key)
#Generate DH2
#DH2 = Src Ephemeral Key, Dst Idenity Key
dh2 = self.idenity_private_key.exchange(src_ephemeral_key)
#Generate DH3
#DH3 = Src Ephemeral Key, Dst Signed Pre_key
dh3 = self.private_pre_key.exchange(src_ephemeral_key)
#Generate DH4
#DH4 = Src Ephemeral Key, Dst One Time Key
message_one_time_priv_key = self.one_time_private_keys.pop(0)
dh4 = message_one_time_priv_key.exchange(src_ephemeral_key)
#Generate Secret Key
secret_key = self.generateSecretKey(dh1, dh2, dh3, dh4)
#Decrypt and Verify Cypher text
aesgcm = AESGCM(secret_key)
return aesgcm.decrypt(nonce, cyphertext, message["aad"])
if __name__ == '__main__':
#Generate Server
signal_server = Signal_Server()
#Generate Users
alice = Signal_User("alice")
bob = Signal_User("bob")
#User Setups
alice.registerUser(signal_server)
alice.send_PreKeyToSignalServer(signal_server)
alice.send_OneTimeKeysToSignalServer(signal_server)
bob.registerUser(signal_server)
bob.send_PreKeyToSignalServer(signal_server)
bob.send_OneTimeKeysToSignalServer(signal_server)
#Setup Done lets send a message
#Assume that Bob is offline Alice can only exchange messages with the server
# DH1 = Alice Idenity Key, Bob's Signed Pre_key
#Alice Gets the
dh1_a = alice.generateDH1_sender(signal_server, "bob")
#DH2 = Alice's Ephemeral Key, Bob's Idenity Key
#Alice Generates a new Key for this exchange on the fly
dh2_a = alice.generateDH2_sender(signal_server, "bob")
#DH3 = Alice's Ephemeral Key, Bob's Signed Pre_key
dh3_a = alice.generateDH3_sender(signal_server, "bob")
#DH4 = Alice's Ephemeral Key, Bob's One Time Key
dh4_a = alice.generateDH4_sender(signal_server, "bob")
#Concat Secrets and feed through a KDF
# Salt is either 57*b"\xFF" for X448 or 32*b"\xFF" for X25519
secret_key = alice.generateSecretKey(dh1_a, dh2_a, dh3_a, dh4_a)
#Alice Deletes her Ephemeral Private Key and DH ouputs
encrypted_message = alice.AADAndEncryptMessage(secret_key, b"Test Message 12345", signal_server, "bob")
####### Time for Bob to get message
print(f"Message Recived: {bob.recieveMessage(encrypted_message)}")
Security¶
Using a One time Prekey replay attacks are not possoble