Small cleanups to mobile_app encryption (#102883)

This commit is contained in:
J. Nick Koston 2023-10-28 09:18:09 -05:00 committed by GitHub
parent 7d598801fe
commit 18fa5b8532
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -36,45 +36,49 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
def setup_decrypt(key_encoder) -> tuple[int, Callable]:
def setup_decrypt(
key_encoder: type[RawEncoder] | type[HexEncoder],
) -> Callable[[bytes, bytes], bytes]:
"""Return decryption function and length of key.
Async friendly.
"""
def decrypt(ciphertext, key):
def decrypt(ciphertext: bytes, key: bytes) -> bytes:
"""Decrypt ciphertext using key."""
return SecretBox(key, encoder=key_encoder).decrypt(
ciphertext, encoder=Base64Encoder
)
return (SecretBox.KEY_SIZE, decrypt)
return decrypt
def setup_encrypt(key_encoder) -> tuple[int, Callable]:
def setup_encrypt(
key_encoder: type[RawEncoder] | type[HexEncoder],
) -> Callable[[bytes, bytes], bytes]:
"""Return encryption function and length of key.
Async friendly.
"""
def encrypt(ciphertext, key):
def encrypt(ciphertext: bytes, key: bytes) -> bytes:
"""Encrypt ciphertext using key."""
return SecretBox(key, encoder=key_encoder).encrypt(
ciphertext, encoder=Base64Encoder
)
return (SecretBox.KEY_SIZE, encrypt)
return encrypt
def _decrypt_payload_helper(
key: str | None,
ciphertext: str,
get_key_bytes: Callable[[str, int], str | bytes],
key_encoder,
key: str | bytes,
ciphertext: bytes,
key_bytes: bytes,
key_encoder: type[RawEncoder] | type[HexEncoder],
) -> JsonValueType | None:
"""Decrypt encrypted payload."""
try:
keylen, decrypt = setup_decrypt(key_encoder)
decrypt = setup_decrypt(key_encoder)
except OSError:
_LOGGER.warning("Ignoring encrypted payload because libsodium not installed")
return None
@ -83,33 +87,31 @@ def _decrypt_payload_helper(
_LOGGER.warning("Ignoring encrypted payload because no decryption key known")
return None
key_bytes = get_key_bytes(key, keylen)
msg_bytes = decrypt(ciphertext, key_bytes)
message = json_loads(msg_bytes)
_LOGGER.debug("Successfully decrypted mobile_app payload")
return message
def decrypt_payload(key: str | None, ciphertext: str) -> JsonValueType | None:
def decrypt_payload(key: str, ciphertext: bytes) -> JsonValueType | None:
"""Decrypt encrypted payload."""
def get_key_bytes(key: str, keylen: int) -> str:
return key
return _decrypt_payload_helper(key, ciphertext, get_key_bytes, HexEncoder)
return _decrypt_payload_helper(key, ciphertext, key.encode("utf-8"), HexEncoder)
def decrypt_payload_legacy(key: str | None, ciphertext: str) -> JsonValueType | None:
def _convert_legacy_encryption_key(key: str) -> bytes:
"""Convert legacy encryption key."""
keylen = SecretBox.KEY_SIZE
key_bytes = key.encode("utf-8")
key_bytes = key_bytes[:keylen]
key_bytes = key_bytes.ljust(keylen, b"\0")
return key_bytes
def decrypt_payload_legacy(key: str, ciphertext: bytes) -> JsonValueType | None:
"""Decrypt encrypted payload."""
def get_key_bytes(key: str, keylen: int) -> bytes:
key_bytes = key.encode("utf-8")
key_bytes = key_bytes[:keylen]
key_bytes = key_bytes.ljust(keylen, b"\0")
return key_bytes
return _decrypt_payload_helper(key, ciphertext, get_key_bytes, RawEncoder)
return _decrypt_payload_helper(
key, ciphertext, _convert_legacy_encryption_key(key), RawEncoder
)
def registration_context(registration: Mapping[str, Any]) -> Context:
@ -184,16 +186,14 @@ def webhook_response(
json_data = json_bytes(data)
if registration[ATTR_SUPPORTS_ENCRYPTION]:
keylen, encrypt = setup_encrypt(
encrypt = setup_encrypt(
HexEncoder if ATTR_NO_LEGACY_ENCRYPTION in registration else RawEncoder
)
if ATTR_NO_LEGACY_ENCRYPTION in registration:
key: bytes = registration[CONF_SECRET]
else:
key = registration[CONF_SECRET].encode("utf-8")
key = key[:keylen]
key = key.ljust(keylen, b"\0")
key = _convert_legacy_encryption_key(registration[CONF_SECRET])
enc_data = encrypt(json_data, key).decode("utf-8")
json_data = json_bytes({"encrypted": True, "encrypted_data": enc_data})