Restore KNX telegram history (#95800)

* Restore KNX telegram history

* increase default log size

* test removal of telegram history
This commit is contained in:
Matthias Alphart 2023-07-09 21:15:55 +02:00 committed by GitHub
parent 8bbb395bec
commit 89259865fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 163 additions and 17 deletions

View file

@ -74,7 +74,7 @@ from .const import (
)
from .device import KNXInterfaceDevice
from .expose import KNXExposeSensor, KNXExposeTime, create_knx_exposure
from .project import KNXProject
from .project import STORAGE_KEY as PROJECT_STORAGE_KEY, KNXProject
from .schema import (
BinarySensorSchema,
ButtonSchema,
@ -96,7 +96,7 @@ from .schema import (
ga_validator,
sensor_type_validator,
)
from .telegrams import Telegrams
from .telegrams import STORAGE_KEY as TELEGRAMS_STORAGE_KEY, Telegrams
from .websocket import register_panel
_LOGGER = logging.getLogger(__name__)
@ -360,16 +360,21 @@ async def async_update_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
async def async_remove_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Remove a config entry."""
def remove_keyring_files(file_path: Path) -> None:
"""Remove keyring files."""
def remove_files(storage_dir: Path, knxkeys_filename: str | None) -> None:
"""Remove KNX files."""
if knxkeys_filename is not None:
with contextlib.suppress(FileNotFoundError):
(storage_dir / knxkeys_filename).unlink()
with contextlib.suppress(FileNotFoundError):
file_path.unlink()
(storage_dir / PROJECT_STORAGE_KEY).unlink()
with contextlib.suppress(FileNotFoundError):
(storage_dir / TELEGRAMS_STORAGE_KEY).unlink()
with contextlib.suppress(FileNotFoundError, OSError):
file_path.parent.rmdir()
(storage_dir / DOMAIN).rmdir()
if (_knxkeys_file := entry.data.get(CONF_KNX_KNXKEY_FILENAME)) is not None:
file_path = Path(hass.config.path(STORAGE_DIR)) / _knxkeys_file
await hass.async_add_executor_job(remove_keyring_files, file_path)
storage_dir = Path(hass.config.path(STORAGE_DIR))
knxkeys_filename = entry.data.get(CONF_KNX_KNXKEY_FILENAME)
await hass.async_add_executor_job(remove_files, storage_dir, knxkeys_filename)
class KNXModule:
@ -420,11 +425,13 @@ class KNXModule:
async def start(self) -> None:
"""Start XKNX object. Connect to tunneling or Routing device."""
await self.project.load_project()
await self.telegrams.load_history()
await self.xknx.start()
async def stop(self, event: Event | None = None) -> None:
"""Stop XKNX object. Disconnect from tunneling or Routing device."""
await self.xknx.stop()
await self.telegrams.save_history()
def connection_config(self) -> ConnectionConfig:
"""Return the connection_config."""

View file

@ -53,7 +53,7 @@ CONF_KNX_DEFAULT_RATE_LIMIT: Final = 0
DEFAULT_ROUTING_IA: Final = "0.0.240"
CONF_KNX_TELEGRAM_LOG_SIZE: Final = "telegram_log_size"
TELEGRAM_LOG_DEFAULT: Final = 50
TELEGRAM_LOG_DEFAULT: Final = 200
TELEGRAM_LOG_MAX: Final = 5000 # ~2 MB or ~5 hours of reasonable bus load
##

View file

@ -3,8 +3,7 @@ from __future__ import annotations
from collections import deque
from collections.abc import Callable
import datetime as dt
from typing import TypedDict
from typing import Final, TypedDict
from xknx import XKNX
from xknx.exceptions import XKNXException
@ -12,10 +11,15 @@ from xknx.telegram import Telegram
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from homeassistant.helpers.storage import Store
import homeassistant.util.dt as dt_util
from .const import DOMAIN
from .project import KNXProject
STORAGE_VERSION: Final = 1
STORAGE_KEY: Final = f"{DOMAIN}/telegrams_history.json"
class TelegramDict(TypedDict):
"""Represent a Telegram as a dict."""
@ -31,7 +35,7 @@ class TelegramDict(TypedDict):
source: str
source_name: str
telegramtype: str
timestamp: dt.datetime
timestamp: str # ISO format
unit: str | None
value: str | int | float | bool | None
@ -49,6 +53,9 @@ class Telegrams:
"""Initialize Telegrams class."""
self.hass = hass
self.project = project
self._history_store = Store[list[TelegramDict]](
hass, STORAGE_VERSION, STORAGE_KEY
)
self._jobs: list[HassJob[[TelegramDict], None]] = []
self._xknx_telegram_cb_handle = (
xknx.telegram_queue.register_telegram_received_cb(
@ -58,6 +65,24 @@ class Telegrams:
)
self.recent_telegrams: deque[TelegramDict] = deque(maxlen=log_size)
async def load_history(self) -> None:
"""Load history from store."""
if (telegrams := await self._history_store.async_load()) is None:
return
if self.recent_telegrams.maxlen == 0:
await self._history_store.async_remove()
return
for telegram in telegrams:
# tuples are stored as lists in JSON
if isinstance(telegram["payload"], list):
telegram["payload"] = tuple(telegram["payload"]) # type: ignore[unreachable]
self.recent_telegrams.extend(telegrams)
async def save_history(self) -> None:
"""Save history to store."""
if self.recent_telegrams:
await self._history_store.async_save(list(self.recent_telegrams))
async def _xknx_telegram_cb(self, telegram: Telegram) -> None:
"""Handle incoming and outgoing telegrams from xknx."""
telegram_dict = self.telegram_to_dict(telegram)
@ -129,7 +154,7 @@ class Telegrams:
source=f"{telegram.source_address}",
source_name=src_name,
telegramtype=telegram.payload.__class__.__name__,
timestamp=dt_util.as_local(dt_util.utcnow()),
timestamp=dt_util.now().isoformat(),
unit=unit,
value=value,
)

View file

@ -910,7 +910,7 @@ async def test_form_with_automatic_connection_handling(
CONF_KNX_ROUTE_BACK: False,
CONF_KNX_TUNNEL_ENDPOINT_IA: None,
CONF_KNX_STATE_UPDATER: True,
CONF_KNX_TELEGRAM_LOG_SIZE: 50,
CONF_KNX_TELEGRAM_LOG_SIZE: 200,
}
knx_setup.assert_called_once()
@ -1210,7 +1210,7 @@ async def test_options_flow_connection_type(
CONF_KNX_SECURE_DEVICE_AUTHENTICATION: None,
CONF_KNX_SECURE_USER_ID: None,
CONF_KNX_SECURE_USER_PASSWORD: None,
CONF_KNX_TELEGRAM_LOG_SIZE: 50,
CONF_KNX_TELEGRAM_LOG_SIZE: 200,
}

View file

@ -280,7 +280,7 @@ async def test_async_remove_entry(
"pathlib.Path.rmdir"
) as rmdir_mock:
assert await hass.config_entries.async_remove(config_entry.entry_id)
unlink_mock.assert_called_once()
assert unlink_mock.call_count == 3
rmdir_mock.assert_called_once()
await hass.async_block_till_done()

View file

@ -0,0 +1,114 @@
"""KNX Telegrams Tests."""
from copy import copy
from datetime import datetime
from typing import Any
import pytest
from homeassistant.components.knx import DOMAIN
from homeassistant.components.knx.const import CONF_KNX_TELEGRAM_LOG_SIZE
from homeassistant.components.knx.telegrams import TelegramDict
from homeassistant.core import HomeAssistant
from .conftest import KNXTestKit
MOCK_TIMESTAMP = "2023-07-02T14:51:24.045162-07:00"
MOCK_TELEGRAMS = [
{
"destination": "1/3/4",
"destination_name": "",
"direction": "Incoming",
"dpt_main": None,
"dpt_sub": None,
"dpt_name": None,
"payload": True,
"source": "1.2.3",
"source_name": "",
"telegramtype": "GroupValueWrite",
"timestamp": MOCK_TIMESTAMP,
"unit": None,
"value": None,
},
{
"destination": "2/2/2",
"destination_name": "",
"direction": "Outgoing",
"dpt_main": None,
"dpt_sub": None,
"dpt_name": None,
"payload": [1, 2, 3, 4],
"source": "0.0.0",
"source_name": "",
"telegramtype": "GroupValueWrite",
"timestamp": MOCK_TIMESTAMP,
"unit": None,
"value": None,
},
]
def assert_telegram_history(telegrams: list[TelegramDict]) -> bool:
"""Assert that the mock telegrams are equal to the given telegrams. Omitting timestamp."""
assert len(telegrams) == len(MOCK_TELEGRAMS)
for index in range(len(telegrams)):
test_telegram = copy(telegrams[index]) # don't modify the original
comp_telegram = MOCK_TELEGRAMS[index]
assert datetime.fromisoformat(test_telegram["timestamp"])
if isinstance(test_telegram["payload"], tuple):
# JSON encodes tuples to lists
test_telegram["payload"] = list(test_telegram["payload"])
assert test_telegram | {"timestamp": MOCK_TIMESTAMP} == comp_telegram
return True
async def test_store_telegam_history(
hass: HomeAssistant,
knx: KNXTestKit,
hass_storage: dict[str, Any],
):
"""Test storing telegram history."""
await knx.setup_integration({})
await knx.receive_write("1/3/4", True)
await hass.services.async_call(
"knx", "send", {"address": "2/2/2", "payload": [1, 2, 3, 4]}, blocking=True
)
await knx.assert_write("2/2/2", (1, 2, 3, 4))
assert len(hass.data[DOMAIN].telegrams.recent_telegrams) == 2
with pytest.raises(KeyError):
hass_storage["knx/telegrams_history.json"]
await hass.config_entries.async_unload(knx.mock_config_entry.entry_id)
saved_telegrams = hass_storage["knx/telegrams_history.json"]["data"]
assert assert_telegram_history(saved_telegrams)
async def test_load_telegam_history(
hass: HomeAssistant,
knx: KNXTestKit,
hass_storage: dict[str, Any],
):
"""Test telegram history restoration."""
hass_storage["knx/telegrams_history.json"] = {"version": 1, "data": MOCK_TELEGRAMS}
await knx.setup_integration({})
loaded_telegrams = hass.data[DOMAIN].telegrams.recent_telegrams
assert assert_telegram_history(loaded_telegrams)
# TelegramDict "payload" is a tuple, this shall be restored when loading from JSON
assert isinstance(loaded_telegrams[1]["payload"], tuple)
async def test_remove_telegam_history(
hass: HomeAssistant,
knx: KNXTestKit,
hass_storage: dict[str, Any],
):
"""Test telegram history removal when configured to size 0."""
hass_storage["knx/telegrams_history.json"] = {"version": 1, "data": MOCK_TELEGRAMS}
knx.mock_config_entry.data = knx.mock_config_entry.data | {
CONF_KNX_TELEGRAM_LOG_SIZE: 0
}
await knx.setup_integration({})
# Store.async_remove() is mocked by hass_storage - check that data was removed.
assert "knx/telegrams_history.json" not in hass_storage
assert not hass.data[DOMAIN].telegrams.recent_telegrams