Skip to content

Double Ratchet

Double Ratchet

  • Uses Three Ratchets
    • Root Ratchet: Ratchet happens when a user delivers a message with a new unknown DH key. This updates the keys for the Sending and Receiving Ratchet
      • This provides Post-compromise security when a message is sent from the other user
    • Sending Ratchet: Ratchets when need to encrypt a message
      • This provides Perfect Forward Secrecy
    • Receiving Ratchet: Ratchets when needs to decrypt a message
      • This provides Perfect Forward Secrecy
  • Biased on Key Derivation Function Chains
    • Generates new key each message for forward secrecy
    • Also Generates a new key as input key for the next function
  • Has Forward Security
    • Need both the previous Key and the new input to generate the new key

Source

Example:

from cryptography.hazmat.primitives.asymmetric.x448 import X448PrivateKey, X448PublicKey
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, Ed448PublicKey

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF

from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305

import json, os, base64



class Signal_Server():
	def __init__(self):
		### x3DH Varables
		self.idenity_database = {}
		self.idenity_database_sign = {}

		self.preKey_database = {}
		self.onetime_database = {}
		self.info = b"Signal_Server"

		### Double Ratchet Variables
		self.dh_ratchet_keys = {}
	
	def registerUser(self, username, idenity_key, idenity_key_sign):
		self.idenity_database[username] = idenity_key
		self.idenity_database_sign[username] = idenity_key_sign

	def getUsersDoubleRatchetKey(self, username):
		if username in self.dh_ratchet_keys:
				return self.dh_ratchet_keys[username]
		return None

	def getUsersIdenityKey(self, username):
		return self.idenity_database[username]

	def getUsersIdenitySigningKey(self, username):
		return self.idenity_database_sign[username]

	def getUsersPreKey(self, username):
		return self.preKey_database[username]

	def getUserOneTimeKey(self, username):
		#Return then remove key
		return self.onetime_database[username].pop(0)

	def updateUsersDoubleRatchetKey(self, src_username, double_ratchet_key):
		if src_username not in self.dh_ratchet_keys:
			self.dh_ratchet_keys[src_username] = {}
		self.dh_ratchet_keys[src_username] = double_ratchet_key

	def updateUserPreKey(self, username, preKey_bytes, signature):
		idenity_key = Ed448PublicKey.from_public_bytes(self.idenity_database_sign[username])

		if idenity_key.verify(signature, preKey_bytes):
			print("[-] Signature doesn't match the Prekey. Either Bad Signature, Wrong Key, wrong username")
			return False
		else:
			#print("[+] Signature Accepted")
			self.preKey_database[username] = {"key": preKey_bytes, "signature": signature}
			return True

	def addOneTimePreKey(self, username, keys_bytes):
		#Check from the correct User???
		if username not in self.onetime_database:
			self.onetime_database[username] = []
		self.onetime_database[username].append(keys_bytes)
			

class UserKDFs():
	def __init__(self, hash_obj=hashes.SHA512()):
		self.secret_length = 32
		self.hash_obj = hash_obj
		# Needs to be the same for each user
		# This is the shared secret for the x3DH
		self.kdfchain_root_keys = {}
		self.kdfchain_sending_keys = {}
		self.kdfchain_recieving_keys = {}

		self.kdfchain_x448_key = X448PrivateKey.generate()

		#Setup KDF Chains
		#DH shared key secrets are used for input to the root KDF Chain
		self.root_kdf_chains = {}
		#The root KDF Chain outputs are used as KDF keys for the sending and receiving chains
		#Each incoming/outgoing message advances the chain. The outputs are used for the Symmetric Encryption/Decryption for each message
		self.sending_kdf_chains = {}
		self.receiving_kdf_chains = {}


		#Initialize the Root Chain
	def ratchetChain(self, chain, username, dh_shared_secret=b""):
		if chain != "root" and dh_shared_secret != b"":
			raise ValueError(f"DH_secret only accepted on the root chain")
			return

		elif chain == "root":
			output = HKDF(algorithm=self.hash_obj, length=self.secret_length*2+12, salt=b"", info=b"").derive(self.kdfchain_root_keys[username] + dh_shared_secret)
			#Update RootKDF and return the rest of the output
			self.kdfchain_root_keys[username], ret = output[:self.secret_length], output[self.secret_length:]

		elif chain == "sending":
			output = HKDF(algorithm=self.hash_obj, length=self.secret_length*2+12, salt=b"", info=b"").derive(self.kdfchain_sending_keys[username])
			#Update SendingKDF and return the rest of the output
			self.kdfchain_sending_keys[username], ret = output[:self.secret_length], output[self.secret_length:]

		elif chain == "recieving":
			output = HKDF(algorithm=self.hash_obj, length=self.secret_length*2+12, salt=b"", info=b"").derive(self.kdfchain_recieving_keys[username])
			#Update RecievingKDF and return the rest of the output
			self.kdfchain_recieving_keys[username], ret = output[:self.secret_length], output[self.secret_length:]

		else:
			raise ValueError(f"{chain} is not a valid Chain")
			return

		return ret


	def initUserKDFs(self, username, sending=True, inital_shared_secret=None, public_key=None):
		#If inital_shared_secret is set then initialize the Chains as Symmetric Ratchet
		if inital_shared_secret != None and public_key != None:
			#Use the x3DH shared secret from the username to generate the initial RootKDF
			self.kdfchain_root_keys[username] = inital_shared_secret

			#Ratchet the Root KDF to initialize the Receive and Send Chains
			key1 = self.ratchetChain("root", username)[:self.secret_length]
			key2 = self.ratchetChain("root", username)[:self.secret_length]

			#If sending then update the sending Keys first
			if sending:
				self.kdfchain_sending_keys[username], self.kdfchain_recieving_keys[username] = key1, key2
				#print(f"Sending::initUserKDFs::SEM sending_keys[{username}]: {self.kdfchain_sending_keys[username].hex()}")
				#print(f"Sending::initUserKDFs::SEM recieving_keys[{username}]: {self.kdfchain_recieving_keys[username].hex()}")

				# Should only Trigger on an initialization sending
				dh_shared_secret = self.kdfchain_x448_key.exchange(public_key)
				#print(f"Sending::initUserKDFs::DH initial dh_shared_secret[{username}]: {dh_shared_secret.hex()}")

				#Update Root Ratchet with dh_shared_secret and update the receiving chain
				self.kdfchain_sending_keys[username] = self.ratchetChain("root", username, dh_shared_secret)[:self.secret_length]
				#print(f"Sending::initUserKDFs::DH sending_keys[{username}]: {self.kdfchain_sending_keys[username].hex()}")

			else:
				self.kdfchain_recieving_keys[username], self.kdfchain_sending_keys[username] = key1, key2
				#print(f"Receiving::initUserKDFs::SEM sending_keys[{username}]: {self.kdfchain_sending_keys[username].hex()}")
				#print(f"Receiving::initUserKDFs::SEM recieving_keys[{username}]: {self.kdfchain_recieving_keys[username].hex()}")
				


		#If the public key is also available do DH Ratchet 
		if not sending:
			dh_shared_secret = self.kdfchain_x448_key.exchange(public_key)

			#Update Root Ratchet with dh_shared_secret and update the sending chain
			self.kdfchain_recieving_keys[username] = self.ratchetChain("root", username, dh_shared_secret)[:self.secret_length]
			#print(f"Receiving::initUserKDFs::DH recieving_keys[{username}]: {self.kdfchain_recieving_keys[username].hex()}")


			#Update the Sending Ratchet
			#Create and send keys to the server
			self.kdfchain_x448_key = X448PrivateKey.generate()

			#Get the dh_shared_secret for the sending 
			dh_shared_secret = self.kdfchain_x448_key.exchange(public_key)
			#print(f"Receiving::initUserKDFs::DH new dh_shared_secret[{username}]: {dh_shared_secret.hex()}")

			#Update Root Ratchet with dh_shared_secret
			#Set the sending_key for user
			self.kdfchain_sending_keys[username] = self.ratchetChain("root", username, dh_shared_secret)[:self.secret_length]
			#print(f"Receiving::initUserKDFs::DH sending_keys[{username}]: {self.kdfchain_sending_keys[username].hex()}")




		
class Signal_User(object):
	def __init__(self, username, signal_server):
		self.username = username
		#Add the Signal Server to the user so You don't have to pass it through to all of the functions
		self.signal_server = signal_server


		### 3DH Specific Variables
		self.idenity_private_key_sign = Ed448PrivateKey.generate()
		#The Ed448 Private Key is 57 bits long while the x448 is 56 bits long
		#https://crypto.stackexchange.com/questions/99974/curve448-can-ed448-key-material-be-reused-for-x448
		# So we generate the x448 from the Ed448 key
		digest = hashes.Hash(hashes.SHAKE256(56))
		digest.update(self.idenity_private_key_sign.private_bytes_raw())
		x488_key = digest.finalize()
		self.idenity_private_key = X448PrivateKey.from_private_bytes(x488_key)

		self.private_pre_key = X448PrivateKey.generate()
		self.one_time_private_keys = []
		#Lets Generate 5 Prekeys
		for x in range(5):
			self.one_time_private_keys.append(X448PrivateKey.generate())


		### Double Ratchet Specific Variables
		self.users_ratchet = UserKDFs()
		self.dh_ratchet_cache = {}

	def send_RatchetKeyToSignalServer(self):
		self.signal_server.updateUsersDoubleRatchetKey(self.username, self.users_ratchet.kdfchain_x448_key.public_key().public_bytes_raw())

	def send_registerUser(self):
		self.signal_server.registerUser(self.username, self.idenity_private_key.public_key().public_bytes_raw(), self.idenity_private_key_sign.public_key().public_bytes_raw())

	def send_PreKeyToSignalServer(self):
		#Get Public Key
		public_Pre_Key = self.private_pre_key.public_key() 

		signature = self.idenity_private_key_sign.sign(public_Pre_Key.public_bytes_raw())

		#Send to Server
		self.signal_server.updateUserPreKey(self.username, public_Pre_Key.public_bytes_raw(), signature)


	def send_OneTimeKeysToSignalServer(self):
		for key in self.one_time_private_keys:
			key_bytes = key.public_key().public_bytes_raw()
			self.signal_server.addOneTimePreKey(self.username, key_bytes)

	def _generate3DHSecretKey_sender(self, username):
		#Lets get the required Data from the servers and check signatures
		dst_idenity_key = X448PublicKey.from_public_bytes(self.signal_server.getUsersIdenityKey(username))

		dst_idenity_sign_key = Ed448PublicKey.from_public_bytes(self.signal_server.getUsersIdenitySigningKey(username))
		dst_prekey_and_signature = self.signal_server.getUsersPreKey(username)
		dst_prekey = X448PublicKey.from_public_bytes(dst_prekey_and_signature["key"])
		dst_signature = dst_prekey_and_signature["signature"]

		#Generate Ephemeral Key for the rest of the Key generation
		self.ephemeral_3dh_key = X448PrivateKey.generate()

		dst_one_time_key = X448PublicKey.from_public_bytes(self.signal_server.getUserOneTimeKey(username))

		#Check that the signature is correct for that key
		if dst_idenity_sign_key.verify(dst_signature, dst_prekey_and_signature["key"]):
			print("[-] Signature doesn't match the Prekey. Either Bad Signature, Wrong Key, wrong username")
			return False

		#Generate the 4 DH keys and generate secret
		# DH1 = Alice Identity Key, Bob's Signed Pre_key
		#Now that we have checked that the server did not give us the wrong user key Lets continue
		dh1 = self.idenity_private_key.exchange(dst_prekey)


		#DH2 = Alice's Ephemeral Key, Bob's Identity Key
		#Alice Generates a new Key for this exchange on the fly
		dh2 = self.ephemeral_3dh_key.exchange(dst_idenity_key)


		#DH3 = Alice's Ephemeral Key, Bob's Signed Pre_key
		dh3 = self.ephemeral_3dh_key.exchange(dst_prekey)


		#DH4 = Alice's Ephemeral Key, Bob's One Time Key
		dh4 = self.ephemeral_3dh_key.exchange(dst_one_time_key)

		return self._generate3DHSecretKey(dh1, dh2, dh3, dh4)


	def _generate3DHSecretKey(self, dh1, dh2, dh3, dh4):
		if self.idenity_private_key is X448PrivateKey:
			pre_hash_bytes = 57*b"\xFF"
			hash_obj = HKDF(algorithm=hashes.SHA512(), length=64, salt=(64*b"\x00"), info=b"")
		else:
			pre_hash_bytes = 32*b"\xFF"
			hash_obj = HKDF(algorithm=hashes.SHA256(), length=32, salt=(32*b"\x00"), info=b"")

		secret_key = hash_obj.derive(pre_hash_bytes + dh1 + dh2 + dh3 + dh4)
		return secret_key

	def send_message(self, message, username):
		#Check if Signal Server has a Ratchet key for this User
		dst_ratchet_DH = X448PublicKey.from_public_bytes(self.signal_server.getUsersDoubleRatchetKey(username))
		#print(f"{self.username}::send_message: Get Ratchet Key From Server: {self.signal_server.getUsersDoubleRatchetKey(username).hex()}")
		#for x in self.dh_ratchet_cache:
		#	print(f"    {x}: {self.dh_ratchet_cache[x].public_bytes_raw().hex()}")

		#Only add the x3DH information if needed
		#Make sure to save the old dh key to add to the aad
		aad = {"src_idenity_key":self.idenity_private_key.public_key().public_bytes_raw().hex(),
		"src_ratchet_key": self.users_ratchet.kdfchain_x448_key.public_key().public_bytes_raw().hex(), "src_username": self.username,
		"dst_username": username, "server_info": self.signal_server.info.decode('ascii')}

		#First time seeing this user do x3DH
		if username not in self.dh_ratchet_cache:
			#Check if already done x3dh. If not do it
			#Initialize Ratchets knowing x3dh is done
			dh_secret_key = self._generate3DHSecretKey_sender(username)
			#print(f"{self.username}::send_message: finish 3xDH dh_secret_key: {dh_secret_key.hex()}")

			#Add x3DH to aad
			aad["src_ephemeral_key"] = self.ephemeral_3dh_key.public_key().public_bytes_raw().hex()

			#Init Ratchets
			self.users_ratchet.initUserKDFs(username, sending=True, inital_shared_secret=dh_secret_key, public_key=dst_ratchet_DH)
			print("Sending After Ratchet DH")
			#Update DH Ratchet Cache
			self.dh_ratchet_cache[username] = dst_ratchet_DH
			#Only update DH key when sending a message to another user
			self.send_RatchetKeyToSignalServer()
			#print(f"{self.username}::send_message: send to server Ratchet Key: {self.users_ratchet.kdfchain_x448_key.public_key().public_bytes_raw().hex()}")


		elif dst_ratchet_DH != self.dh_ratchet_cache[username]:
			#Destiation's Ratchet DH key has been updated Update Ratchet
			self.users_ratchet.initUserKDFs(username, sending=True, public_key=dst_ratchet_DH)
			print("Sending After Ratchet DH")
			#Update DH Ratchet Cache
			self.dh_ratchet_cache[username] = dst_ratchet_DH
			#Only update DH key when sending a message to another user
			self.send_RatchetKeyToSignalServer()
			#print(f"{self.username}::send_message: send to server Ratchet Key: {self.users_ratchet.kdfchain_x448_key.public_key().public_bytes_raw().hex()}")

		print(f"{self.username}::send_message: sending_key: {self.users_ratchet.kdfchain_sending_keys[username].hex()}")
		print(f"{self.username}::send_message: recieving_key: {self.users_ratchet.kdfchain_recieving_keys[username].hex()}")


		#Get message_sending Key from the ratchet
		output = self.users_ratchet.ratchetChain("sending", username)
		sending_key, nonce = output[:self.users_ratchet.secret_length], output[self.users_ratchet.secret_length:]
		#print(f"{self.username}::send_message: Encrypt with sending_key: {sending_key.hex()}, nonce: {nonce.hex()}")

		#Encrypt and return all information
		return self._AADAndEncryptMessage(sending_key, nonce, message, username, json.dumps(aad).encode('ascii'))



	def _AADAndEncryptMessage(self, secret_key, nonce, message, username, aad_bytes):
		chacha = ChaCha20Poly1305(secret_key)
		cyphertext = chacha.encrypt(nonce, message, aad_bytes)

		return {"aad":aad_bytes, "cyphertext": cyphertext}

	def recieveMessage(self, message):
		#Extract Information from message
		aad, cyphertext = [json.loads(message["aad"]), message["cyphertext"]]

		src_username, dst_username, server_info = [aad["src_username"], aad["dst_username"], aad["server_info"]]
		src_ratchet_key_bytes, src_idenity_key_bytes, = [bytes.fromhex(aad["src_ratchet_key"]), bytes.fromhex(aad["src_idenity_key"])]


		src_idenity_key = X448PublicKey.from_public_bytes(src_idenity_key_bytes)
		src_ratchet_key = X448PublicKey.from_public_bytes(src_ratchet_key_bytes)

		#Init Ratchets
		#First time seeing this user do x3DH
		if src_username not in self.dh_ratchet_cache:
			### Start x3DH for the Recipient
			src_ephemeral_key = X448PublicKey.from_public_bytes(bytes.fromhex(aad["src_ephemeral_key"]))

			#Generate DH1
			# DH1 = Src Idenity Key, Dst Signed Pre_key
			dh1 = self.private_pre_key.exchange(src_idenity_key)

			#Generate DH2
			#DH2 = Src Ephemeral Key, Dst Identity Key
			dh2 = self.idenity_private_key.exchange(src_ephemeral_key)

			#Generate DH3
			#DH3 = Src Ephemeral Key, Dst Signed Pre_key
			dh3 = self.private_pre_key.exchange(src_ephemeral_key)

			#Generate DH4
			#DH4 = Src Ephemeral Key, Dst One Time Key
			message_one_time_priv_key = self.one_time_private_keys.pop(0)
			dh4 = message_one_time_priv_key.exchange(src_ephemeral_key)

			#Generate Secret Key
			dh_shared_secret = self._generate3DHSecretKey(dh1, dh2, dh3, dh4)
			print(f"{self.username}::recieveMessage: dh_shared_secret: {dh_shared_secret.hex()}")

			#Init Ratchets
			self.users_ratchet.initUserKDFs(src_username, sending=False, inital_shared_secret=dh_shared_secret, public_key=src_ratchet_key)
			print("Receiving After Ratchet DH")
			#Update DH Ratchet Cache
			self.dh_ratchet_cache[src_username] = src_ratchet_key
			#Dont update signal server when receiving a message. Only when sending a message
			#self.send_RatchetKeyToSignalServer()

		elif src_ratchet_key != self.dh_ratchet_cache[src_username]:
			#Destination's Ratchet DH key has been updated Update Ratchet
			self.users_ratchet.initUserKDFs(src_username, sending=False, public_key=src_ratchet_key)
			print("Receiving After Ratchet DH")
			#Update DH Ratchet Cache
			self.dh_ratchet_cache[src_username] = src_ratchet_key
			#Dont update signal server when receiving a message. Only when sending a message
			#self.send_RatchetKeyToSignalServer()

		print(f"{self.username}::recieveMessage: sending_key: {self.users_ratchet.kdfchain_sending_keys[src_username].hex()}")
		print(f"{self.username}::recieveMessage: recieving_key: {self.users_ratchet.kdfchain_recieving_keys[src_username].hex()}")

		#Get message_sending Key from the ratchet
		output = self.users_ratchet.ratchetChain("receiving", src_username)
		recieving_key, nonce = output[:self.users_ratchet.secret_length], output[self.users_ratchet.secret_length:]



		#Decrypt and Verify Cypher text
		chacha = ChaCha20Poly1305(recieving_key)
		return chacha.decrypt(nonce, cyphertext, message["aad"])


if __name__ == '__main__':
	#Generate Server
	signal_server = Signal_Server()

	#Generate Users
	alice = Signal_User("alice", signal_server)
	bob = Signal_User("bob", signal_server)

	#Setup Users with the server
	alice.send_registerUser()
	#x3DH Initialization
	alice.send_PreKeyToSignalServer()
	alice.send_OneTimeKeysToSignalServer()
	#DH Ratchet Key Initialization
	alice.send_RatchetKeyToSignalServer()

	bob.send_registerUser()
	#x3DH Initialization
	bob.send_PreKeyToSignalServer()
	bob.send_OneTimeKeysToSignalServer()
	#DH Ratchet Key Initialization
	bob.send_RatchetKeyToSignalServer()


	#Test 5 Messages from a single user
	for x in range(5):
		encrypted_message = alice.send_message(b"Test Message 12345", "bob")
		print()
		print(f"Message Received: {bob.recieveMessage(encrypted_message)}")
		print()

	print()

	# Test 5 Ping Pong
	for x in range(5):
		encrypted_message = bob.send_message(b"Test Message 12345", "alice")
		print()
		print(f"Message Received: {alice.recieveMessage(encrypted_message)}")
		print()

		encrypted_message = alice.send_message(b"Test Message 12345", "bob")
		print()
		print(f"Message Received: {bob.recieveMessage(encrypted_message)}")
		print()

	#Test 5 Messages from a other user
	for x in range(5):
		encrypted_message = bob.send_message(b"Test Message 12345", "alice")
		print()
		print(f"Message Received: {alice.recieveMessage(encrypted_message)}")
		print()

	print()