hass-core/tests/components/dsmr/test_diagnostics.py
Joakim Plate 42610f4e09
Add diagnostic information to DSMR (#122041)
* Add diagnostic information to DSMR

Switches to runtime_data to get access
to the last telegram received.

* Correct import of domain

* Apply suggestions from code review

Co-authored-by: G Johansson <goran.johansson@shiftit.se>

---------

Co-authored-by: G Johansson <goran.johansson@shiftit.se>
2024-07-18 11:57:11 +02:00

82 lines
2.3 KiB
Python

"""Test DSMR diagnostics."""
import datetime
from decimal import Decimal
from unittest.mock import MagicMock
from dsmr_parser.obis_references import (
CURRENT_ELECTRICITY_USAGE,
ELECTRICITY_ACTIVE_TARIFF,
GAS_METER_READING,
)
from dsmr_parser.objects import CosemObject, MBusObject, Telegram
from syrupy.assertion import SnapshotAssertion
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
from tests.components.diagnostics import get_diagnostics_for_config_entry
from tests.typing import ClientSessionGenerator
async def test_diagnostics(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
dsmr_connection_fixture: tuple[MagicMock, MagicMock, MagicMock],
snapshot: SnapshotAssertion,
) -> None:
"""Test diagnostics."""
(connection_factory, transport, protocol) = dsmr_connection_fixture
entry_data = {
"port": "/dev/ttyUSB0",
"dsmr_version": "2.2",
"serial_id": "1234",
"serial_id_gas": "5678",
}
entry_options = {
"time_between_update": 0,
}
telegram = Telegram()
telegram.add(
CURRENT_ELECTRICITY_USAGE,
CosemObject(
(0, 0),
[{"value": Decimal("0.0"), "unit": "W"}],
),
"CURRENT_ELECTRICITY_USAGE",
)
telegram.add(
ELECTRICITY_ACTIVE_TARIFF,
CosemObject((0, 0), [{"value": "0001", "unit": ""}]),
"ELECTRICITY_ACTIVE_TARIFF",
)
telegram.add(
GAS_METER_READING,
MBusObject(
(0, 0),
[
{"value": datetime.datetime.fromtimestamp(1551642213)},
{"value": Decimal(745.695), "unit": ""},
],
),
"GAS_METER_READING",
)
mock_entry = MockConfigEntry(
domain="dsmr", unique_id="/dev/ttyUSB0", data=entry_data, options=entry_options
)
mock_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_entry.entry_id)
await hass.async_block_till_done()
telegram_callback = connection_factory.call_args_list[0][0][2]
# simulate a telegram pushed from the smartmeter and parsed by dsmr_parser
telegram_callback(telegram)
result = await get_diagnostics_for_config_entry(hass, hass_client, mock_entry)
assert result == snapshot