From 18fa5b853274afc2d3607732902145415d4e1ace Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 28 Oct 2023 09:18:09 -0500 Subject: [PATCH] Small cleanups to mobile_app encryption (#102883) --- .../components/mobile_app/helpers.py | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/homeassistant/components/mobile_app/helpers.py b/homeassistant/components/mobile_app/helpers.py index e9bb3af51f2..9265b72d0d0 100644 --- a/homeassistant/components/mobile_app/helpers.py +++ b/homeassistant/components/mobile_app/helpers.py @@ -36,45 +36,49 @@ from .const import ( _LOGGER = logging.getLogger(__name__) -def setup_decrypt(key_encoder) -> tuple[int, Callable]: +def setup_decrypt( + key_encoder: type[RawEncoder] | type[HexEncoder], +) -> Callable[[bytes, bytes], bytes]: """Return decryption function and length of key. Async friendly. """ - def decrypt(ciphertext, key): + def decrypt(ciphertext: bytes, key: bytes) -> bytes: """Decrypt ciphertext using key.""" return SecretBox(key, encoder=key_encoder).decrypt( ciphertext, encoder=Base64Encoder ) - return (SecretBox.KEY_SIZE, decrypt) + return decrypt -def setup_encrypt(key_encoder) -> tuple[int, Callable]: +def setup_encrypt( + key_encoder: type[RawEncoder] | type[HexEncoder], +) -> Callable[[bytes, bytes], bytes]: """Return encryption function and length of key. Async friendly. """ - def encrypt(ciphertext, key): + def encrypt(ciphertext: bytes, key: bytes) -> bytes: """Encrypt ciphertext using key.""" return SecretBox(key, encoder=key_encoder).encrypt( ciphertext, encoder=Base64Encoder ) - return (SecretBox.KEY_SIZE, encrypt) + return encrypt def _decrypt_payload_helper( - key: str | None, - ciphertext: str, - get_key_bytes: Callable[[str, int], str | bytes], - key_encoder, + key: str | bytes, + ciphertext: bytes, + key_bytes: bytes, + key_encoder: type[RawEncoder] | type[HexEncoder], ) -> JsonValueType | None: """Decrypt encrypted payload.""" try: - keylen, decrypt = setup_decrypt(key_encoder) + decrypt = setup_decrypt(key_encoder) except OSError: _LOGGER.warning("Ignoring encrypted payload because libsodium not installed") return None @@ -83,33 +87,31 @@ def _decrypt_payload_helper( _LOGGER.warning("Ignoring encrypted payload because no decryption key known") return None - key_bytes = get_key_bytes(key, keylen) - msg_bytes = decrypt(ciphertext, key_bytes) message = json_loads(msg_bytes) _LOGGER.debug("Successfully decrypted mobile_app payload") return message -def decrypt_payload(key: str | None, ciphertext: str) -> JsonValueType | None: +def decrypt_payload(key: str, ciphertext: bytes) -> JsonValueType | None: """Decrypt encrypted payload.""" - - def get_key_bytes(key: str, keylen: int) -> str: - return key - - return _decrypt_payload_helper(key, ciphertext, get_key_bytes, HexEncoder) + return _decrypt_payload_helper(key, ciphertext, key.encode("utf-8"), HexEncoder) -def decrypt_payload_legacy(key: str | None, ciphertext: str) -> JsonValueType | None: +def _convert_legacy_encryption_key(key: str) -> bytes: + """Convert legacy encryption key.""" + keylen = SecretBox.KEY_SIZE + key_bytes = key.encode("utf-8") + key_bytes = key_bytes[:keylen] + key_bytes = key_bytes.ljust(keylen, b"\0") + return key_bytes + + +def decrypt_payload_legacy(key: str, ciphertext: bytes) -> JsonValueType | None: """Decrypt encrypted payload.""" - - def get_key_bytes(key: str, keylen: int) -> bytes: - key_bytes = key.encode("utf-8") - key_bytes = key_bytes[:keylen] - key_bytes = key_bytes.ljust(keylen, b"\0") - return key_bytes - - return _decrypt_payload_helper(key, ciphertext, get_key_bytes, RawEncoder) + return _decrypt_payload_helper( + key, ciphertext, _convert_legacy_encryption_key(key), RawEncoder + ) def registration_context(registration: Mapping[str, Any]) -> Context: @@ -184,16 +186,14 @@ def webhook_response( json_data = json_bytes(data) if registration[ATTR_SUPPORTS_ENCRYPTION]: - keylen, encrypt = setup_encrypt( + encrypt = setup_encrypt( HexEncoder if ATTR_NO_LEGACY_ENCRYPTION in registration else RawEncoder ) if ATTR_NO_LEGACY_ENCRYPTION in registration: key: bytes = registration[CONF_SECRET] else: - key = registration[CONF_SECRET].encode("utf-8") - key = key[:keylen] - key = key.ljust(keylen, b"\0") + key = _convert_legacy_encryption_key(registration[CONF_SECRET]) enc_data = encrypt(json_data, key).decode("utf-8") json_data = json_bytes({"encrypted": True, "encrypted_data": enc_data})