Stop processing when we hit bad encryption in mobile app (#88150)
* Stop processing when we hit bad encryption * Accept webhook payload that is a list * Rename functions because we import them * Revert a debug thing --------- Co-authored-by: epenet <6771947+epenet@users.noreply.github.com>
This commit is contained in:
parent
57738fbb8c
commit
bc2b35765e
3 changed files with 37 additions and 38 deletions
|
@ -92,7 +92,7 @@ def _decrypt_payload_helper(
|
|||
return message
|
||||
|
||||
|
||||
def _decrypt_payload(key: str | None, ciphertext: str) -> JsonValueType | None:
|
||||
def decrypt_payload(key: str | None, ciphertext: str) -> JsonValueType | None:
|
||||
"""Decrypt encrypted payload."""
|
||||
|
||||
def get_key_bytes(key: str, keylen: int) -> str:
|
||||
|
@ -101,7 +101,7 @@ def _decrypt_payload(key: str | None, ciphertext: str) -> JsonValueType | None:
|
|||
return _decrypt_payload_helper(key, ciphertext, get_key_bytes, HexEncoder)
|
||||
|
||||
|
||||
def _decrypt_payload_legacy(key: str | None, ciphertext: str) -> JsonValueType | None:
|
||||
def decrypt_payload_legacy(key: str | None, ciphertext: str) -> JsonValueType | None:
|
||||
"""Decrypt encrypted payload."""
|
||||
|
||||
def get_key_bytes(key: str, keylen: int) -> bytes:
|
||||
|
|
|
@ -108,8 +108,8 @@ from .const import (
|
|||
SIGNAL_SENSOR_UPDATE,
|
||||
)
|
||||
from .helpers import (
|
||||
_decrypt_payload,
|
||||
_decrypt_payload_legacy,
|
||||
decrypt_payload,
|
||||
decrypt_payload_legacy,
|
||||
empty_okay_response,
|
||||
error_response,
|
||||
registration_context,
|
||||
|
@ -128,13 +128,20 @@ WEBHOOK_COMMANDS: Registry[
|
|||
|
||||
SENSOR_TYPES = (ATTR_SENSOR_TYPE_BINARY_SENSOR, ATTR_SENSOR_TYPE_SENSOR)
|
||||
|
||||
WEBHOOK_PAYLOAD_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Required(ATTR_WEBHOOK_TYPE): cv.string,
|
||||
vol.Required(ATTR_WEBHOOK_DATA, default={}): vol.Any(dict, list),
|
||||
vol.Optional(ATTR_WEBHOOK_ENCRYPTED, default=False): cv.boolean,
|
||||
vol.Optional(ATTR_WEBHOOK_ENCRYPTED_DATA): cv.string,
|
||||
}
|
||||
WEBHOOK_PAYLOAD_SCHEMA = vol.Any(
|
||||
vol.Schema(
|
||||
{
|
||||
vol.Required(ATTR_WEBHOOK_TYPE): cv.string,
|
||||
vol.Optional(ATTR_WEBHOOK_DATA): vol.Any(dict, list),
|
||||
}
|
||||
),
|
||||
vol.Schema(
|
||||
{
|
||||
vol.Required(ATTR_WEBHOOK_TYPE): cv.string,
|
||||
vol.Required(ATTR_WEBHOOK_ENCRYPTED): True,
|
||||
vol.Optional(ATTR_WEBHOOK_ENCRYPTED_DATA): cv.string,
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
@ -201,19 +208,19 @@ async def handle_webhook(
|
|||
|
||||
webhook_type = req_data[ATTR_WEBHOOK_TYPE]
|
||||
|
||||
webhook_payload = req_data.get(ATTR_WEBHOOK_DATA, {})
|
||||
webhook_payload = None
|
||||
|
||||
if req_data[ATTR_WEBHOOK_ENCRYPTED]:
|
||||
if ATTR_WEBHOOK_ENCRYPTED in req_data:
|
||||
enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA]
|
||||
try:
|
||||
webhook_payload = _decrypt_payload(config_entry.data[CONF_SECRET], enc_data)
|
||||
webhook_payload = decrypt_payload(config_entry.data[CONF_SECRET], enc_data)
|
||||
if ATTR_NO_LEGACY_ENCRYPTION not in config_entry.data:
|
||||
data = {**config_entry.data, ATTR_NO_LEGACY_ENCRYPTION: True}
|
||||
hass.config_entries.async_update_entry(config_entry, data=data)
|
||||
except CryptoError:
|
||||
if ATTR_NO_LEGACY_ENCRYPTION not in config_entry.data:
|
||||
try:
|
||||
webhook_payload = _decrypt_payload_legacy(
|
||||
webhook_payload = decrypt_payload_legacy(
|
||||
config_entry.data[CONF_SECRET], enc_data
|
||||
)
|
||||
except CryptoError:
|
||||
|
@ -221,11 +228,16 @@ async def handle_webhook(
|
|||
"Ignoring encrypted payload because unable to decrypt"
|
||||
)
|
||||
except ValueError:
|
||||
_LOGGER.warning("Ignoring invalid encrypted payload")
|
||||
_LOGGER.warning("Ignoring invalid JSON in encrypted payload")
|
||||
else:
|
||||
_LOGGER.warning("Ignoring encrypted payload because unable to decrypt")
|
||||
except ValueError:
|
||||
_LOGGER.warning("Ignoring invalid encrypted payload")
|
||||
except ValueError as err:
|
||||
_LOGGER.warning("Ignoring invalid JSON in encrypted payload: %s", err)
|
||||
else:
|
||||
webhook_payload = req_data.get(ATTR_WEBHOOK_DATA, {})
|
||||
|
||||
if webhook_payload is None:
|
||||
return empty_okay_response()
|
||||
|
||||
if webhook_type not in WEBHOOK_COMMANDS:
|
||||
_LOGGER.error(
|
||||
|
|
|
@ -421,9 +421,8 @@ async def test_webhook_handle_decryption_fail(
|
|||
)
|
||||
|
||||
assert resp.status == HTTPStatus.OK
|
||||
webhook_json = await resp.json()
|
||||
assert decrypt_payload(key, webhook_json["encrypted_data"]) == {}
|
||||
assert "Ignoring invalid encrypted payload" in caplog.text
|
||||
assert await resp.json() == {}
|
||||
assert "Ignoring invalid JSON in encrypted payload" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
# Break the key, and send JSON data
|
||||
|
@ -434,8 +433,7 @@ async def test_webhook_handle_decryption_fail(
|
|||
)
|
||||
|
||||
assert resp.status == HTTPStatus.OK
|
||||
webhook_json = await resp.json()
|
||||
assert decrypt_payload(key, webhook_json["encrypted_data"]) == {}
|
||||
assert await resp.json() == {}
|
||||
assert "Ignoring encrypted payload because unable to decrypt" in caplog.text
|
||||
|
||||
|
||||
|
@ -466,9 +464,8 @@ async def test_webhook_handle_decryption_legacy_fail(
|
|||
)
|
||||
|
||||
assert resp.status == HTTPStatus.OK
|
||||
webhook_json = await resp.json()
|
||||
assert decrypt_payload_legacy(key, webhook_json["encrypted_data"]) == {}
|
||||
assert "Ignoring invalid encrypted payload" in caplog.text
|
||||
assert await resp.json() == {}
|
||||
assert "Ignoring invalid JSON in encrypted payload" in caplog.text
|
||||
caplog.clear()
|
||||
|
||||
# Break the key, and send JSON data
|
||||
|
@ -479,8 +476,7 @@ async def test_webhook_handle_decryption_legacy_fail(
|
|||
)
|
||||
|
||||
assert resp.status == HTTPStatus.OK
|
||||
webhook_json = await resp.json()
|
||||
assert decrypt_payload_legacy(key, webhook_json["encrypted_data"]) == {}
|
||||
assert await resp.json() == {}
|
||||
assert "Ignoring encrypted payload because unable to decrypt" in caplog.text
|
||||
|
||||
|
||||
|
@ -536,16 +532,7 @@ async def test_webhook_handle_decryption_legacy_upgrade(
|
|||
)
|
||||
|
||||
assert resp.status == HTTPStatus.OK
|
||||
|
||||
webhook_json = await resp.json()
|
||||
assert "encrypted_data" in webhook_json
|
||||
|
||||
# The response should be empty, encrypted with the new method
|
||||
with pytest.raises(Exception):
|
||||
decrypt_payload_legacy(key, webhook_json["encrypted_data"])
|
||||
decrypted_data = decrypt_payload(key, webhook_json["encrypted_data"])
|
||||
|
||||
assert decrypted_data == {}
|
||||
assert await resp.json() == {}
|
||||
|
||||
|
||||
async def test_webhook_requires_encryption(
|
||||
|
|
Loading…
Add table
Reference in a new issue