DSMR: TCP, reconnecting and V4 CRC support (#5164)
* Add support for TCP connection. * Implement reconnect logic. * Actually register connect loop task and fix error handling. * Fix lint, configure upstream requirement. * Revert debug logging. * Explicitly catch connection errors. * Test reconnect on setup and reconnect after disconnect. * Style.
This commit is contained in:
parent
40ba4fd872
commit
915a91dc1b
4 changed files with 177 additions and 25 deletions
|
@ -1,5 +1,4 @@
|
|||
"""
|
||||
Support for Dutch Smart Meter Requirements.
|
||||
"""Support for Dutch Smart Meter Requirements.
|
||||
|
||||
Also known as: Smartmeter or P1 port.
|
||||
|
||||
|
@ -24,23 +23,27 @@ DSMR version the Entities for this component are create during bootstrap.
|
|||
Another loop (DSMR class) is setup which reads the telegram queue,
|
||||
stores/caches the latest telegram and notifies the Entities that the telegram
|
||||
has been updated.
|
||||
|
||||
"""
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
from functools import partial
|
||||
import logging
|
||||
|
||||
from homeassistant.components.sensor import PLATFORM_SCHEMA
|
||||
from homeassistant.const import (
|
||||
CONF_PORT, EVENT_HOMEASSISTANT_STOP, STATE_UNKNOWN)
|
||||
CONF_HOST, CONF_PORT, EVENT_HOMEASSISTANT_STOP, STATE_UNKNOWN)
|
||||
from homeassistant.core import CoreState
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.entity import Entity
|
||||
import voluptuous as vol
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
REQUIREMENTS = ['dsmr_parser==0.4']
|
||||
REQUIREMENTS = ['dsmr_parser==0.6']
|
||||
|
||||
CONF_DSMR_VERSION = 'dsmr_version'
|
||||
CONF_RECONNECT_INTERVAL = 'reconnect_interval'
|
||||
|
||||
DEFAULT_DSMR_VERSION = '2.2'
|
||||
DEFAULT_PORT = '/dev/ttyUSB0'
|
||||
|
@ -51,11 +54,14 @@ ICON_POWER = 'mdi:flash'
|
|||
|
||||
# Smart meter sends telegram every 10 seconds
|
||||
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=10)
|
||||
RECONNECT_INTERVAL = 5
|
||||
|
||||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
|
||||
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.string,
|
||||
vol.Optional(CONF_HOST, default=None): cv.string,
|
||||
vol.Optional(CONF_DSMR_VERSION, default=DEFAULT_DSMR_VERSION): vol.All(
|
||||
cv.string, vol.In(['4', '2.2'])),
|
||||
vol.Optional(CONF_RECONNECT_INTERVAL, default=30): int,
|
||||
})
|
||||
|
||||
|
||||
|
@ -66,7 +72,8 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
|
|||
logging.getLogger('dsmr_parser').setLevel(logging.ERROR)
|
||||
|
||||
from dsmr_parser import obis_references as obis_ref
|
||||
from dsmr_parser.protocol import create_dsmr_reader
|
||||
from dsmr_parser.protocol import create_dsmr_reader, create_tcp_dsmr_reader
|
||||
import serial
|
||||
|
||||
dsmr_version = config[CONF_DSMR_VERSION]
|
||||
|
||||
|
@ -105,15 +112,55 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
|
|||
device.telegram = telegram
|
||||
hass.async_add_job(device.async_update_ha_state)
|
||||
|
||||
# Creates a asyncio.Protocol for reading DSMR telegrams from serial
|
||||
# Creates a asyncio.Protocol factory for reading DSMR telegrams from serial
|
||||
# and calls update_entities_telegram to update entities on arrival
|
||||
dsmr = create_dsmr_reader(config[CONF_PORT], config[CONF_DSMR_VERSION],
|
||||
update_entities_telegram, loop=hass.loop)
|
||||
if config[CONF_HOST]:
|
||||
reader_factory = partial(create_tcp_dsmr_reader,
|
||||
config[CONF_HOST],
|
||||
config[CONF_PORT],
|
||||
config[CONF_DSMR_VERSION],
|
||||
update_entities_telegram,
|
||||
loop=hass.loop)
|
||||
else:
|
||||
reader_factory = partial(create_dsmr_reader,
|
||||
config[CONF_PORT],
|
||||
config[CONF_DSMR_VERSION],
|
||||
update_entities_telegram,
|
||||
loop=hass.loop)
|
||||
|
||||
# Start DSMR asycnio.Protocol reader
|
||||
transport, _ = yield from hass.loop.create_task(dsmr)
|
||||
@asyncio.coroutine
|
||||
def connect_and_reconnect():
|
||||
"""Connect to DSMR and keep reconnecting until HA stops."""
|
||||
while hass.state != CoreState.stopping:
|
||||
# Start DSMR asycnio.Protocol reader
|
||||
try:
|
||||
transport, protocol = yield from hass.loop.create_task(
|
||||
reader_factory())
|
||||
except (serial.serialutil.SerialException, ConnectionRefusedError,
|
||||
TimeoutError):
|
||||
# log any error while establishing connection and drop to retry
|
||||
# connection wait
|
||||
_LOGGER.exception('error connecting to DSMR')
|
||||
transport = None
|
||||
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, transport.close)
|
||||
if transport:
|
||||
# register listener to close transport on HA shutdown
|
||||
stop_listerer = hass.bus.async_listen_once(
|
||||
EVENT_HOMEASSISTANT_STOP, transport.close)
|
||||
|
||||
# wait for reader to close
|
||||
yield from protocol.wait_closed()
|
||||
|
||||
if hass.state != CoreState.stopping:
|
||||
if transport:
|
||||
# remove listerer
|
||||
stop_listerer()
|
||||
|
||||
# throttle reconnect attempts
|
||||
yield from asyncio.sleep(config[CONF_RECONNECT_INTERVAL],
|
||||
loop=hass.loop)
|
||||
|
||||
hass.loop.create_task(connect_and_reconnect())
|
||||
|
||||
|
||||
class DSMREntity(Entity):
|
||||
|
@ -187,6 +234,7 @@ class DerivativeDSMREntity(DSMREntity):
|
|||
Gas readings are only reported per hour and don't offer a rate only
|
||||
the current meter reading. This entity converts subsequents readings
|
||||
into a hourly rate.
|
||||
|
||||
"""
|
||||
|
||||
_previous_reading = None
|
||||
|
@ -202,10 +250,11 @@ class DerivativeDSMREntity(DSMREntity):
|
|||
def async_update(self):
|
||||
"""Recalculate hourly rate if timestamp has changed.
|
||||
|
||||
DSMR updates gas meter reading every hour. Along with the
|
||||
new value a timestamp is provided for the reading. Test
|
||||
if the last known timestamp differs from the current one
|
||||
then calculate a new rate for the previous hour.
|
||||
DSMR updates gas meter reading every hour. Along with the new
|
||||
value a timestamp is provided for the reading. Test if the last
|
||||
known timestamp differs from the current one then calculate a
|
||||
new rate for the previous hour.
|
||||
|
||||
"""
|
||||
# check if the timestamp for the object differs from the previous one
|
||||
timestamp = self.get_dsmr_object_attr('datetime')
|
||||
|
|
|
@ -103,7 +103,7 @@ dnspython3==1.15.0
|
|||
dovado==0.1.15
|
||||
|
||||
# homeassistant.components.sensor.dsmr
|
||||
dsmr_parser==0.4
|
||||
dsmr_parser==0.6
|
||||
|
||||
# homeassistant.components.dweet
|
||||
# homeassistant.components.sensor.dweet
|
||||
|
|
|
@ -16,3 +16,4 @@ pytest-sugar>=0.7.1
|
|||
requests_mock>=1.0
|
||||
mock-open>=1.3.1
|
||||
flake8-docstrings==1.0.2
|
||||
asynctest>=0.8.0
|
||||
|
|
|
@ -1,22 +1,51 @@
|
|||
"""Test for DSMR components.
|
||||
|
||||
Tests setup of the DSMR component and ensure incoming telegrams cause Entity
|
||||
to be updated with new values.
|
||||
Tests setup of the DSMR component and ensure incoming telegrams cause
|
||||
Entity to be updated with new values.
|
||||
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from decimal import Decimal
|
||||
from unittest.mock import Mock
|
||||
|
||||
import asynctest
|
||||
from homeassistant.bootstrap import async_setup_component
|
||||
from homeassistant.components.sensor.dsmr import DerivativeDSMREntity
|
||||
from homeassistant.const import STATE_UNKNOWN
|
||||
from tests.common import assert_setup_component, mock_coro
|
||||
import pytest
|
||||
from tests.common import assert_setup_component
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_connection_factory(monkeypatch):
|
||||
"""Mock the create functions for serial and TCP Asyncio connections."""
|
||||
from dsmr_parser.protocol import DSMRProtocol
|
||||
transport = asynctest.Mock(spec=asyncio.Transport)
|
||||
protocol = asynctest.Mock(spec=DSMRProtocol)
|
||||
|
||||
@asyncio.coroutine
|
||||
def connection_factory(*args, **kwargs):
|
||||
"""Return mocked out Asyncio classes."""
|
||||
return (transport, protocol)
|
||||
connection_factory = Mock(wraps=connection_factory)
|
||||
|
||||
# apply the mock to both connection factories
|
||||
monkeypatch.setattr(
|
||||
'dsmr_parser.protocol.create_dsmr_reader',
|
||||
connection_factory)
|
||||
monkeypatch.setattr(
|
||||
'dsmr_parser.protocol.create_tcp_dsmr_reader',
|
||||
connection_factory)
|
||||
|
||||
return connection_factory, transport, protocol
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_default_setup(hass, monkeypatch):
|
||||
def test_default_setup(hass, mock_connection_factory):
|
||||
"""Test the default setup."""
|
||||
(connection_factory, transport, protocol) = mock_connection_factory
|
||||
|
||||
from dsmr_parser.obis_references import (
|
||||
CURRENT_ELECTRICITY_USAGE,
|
||||
ELECTRICITY_ACTIVE_TARIFF,
|
||||
|
@ -34,15 +63,11 @@ def test_default_setup(hass, monkeypatch):
|
|||
]),
|
||||
}
|
||||
|
||||
# mock for injecting DSMR telegram
|
||||
dsmr = Mock(return_value=mock_coro([Mock(), None]))
|
||||
monkeypatch.setattr('dsmr_parser.protocol.create_dsmr_reader', dsmr)
|
||||
|
||||
with assert_setup_component(1):
|
||||
yield from async_setup_component(hass, 'sensor',
|
||||
{'sensor': config})
|
||||
|
||||
telegram_callback = dsmr.call_args_list[0][0][2]
|
||||
telegram_callback = connection_factory.call_args_list[0][0][2]
|
||||
|
||||
# make sure entities have been created and return 'unknown' state
|
||||
power_consumption = hass.states.get('sensor.power_consumption')
|
||||
|
@ -99,3 +124,80 @@ def test_derivative():
|
|||
'state should be difference between first and second update'
|
||||
|
||||
assert entity.unit_of_measurement == 'm3/h'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_tcp(hass, mock_connection_factory):
|
||||
"""If proper config provided TCP connection should be made."""
|
||||
(connection_factory, transport, protocol) = mock_connection_factory
|
||||
|
||||
config = {
|
||||
'platform': 'dsmr',
|
||||
'host': 'localhost',
|
||||
'port': 1234,
|
||||
}
|
||||
|
||||
with assert_setup_component(1):
|
||||
yield from async_setup_component(hass, 'sensor',
|
||||
{'sensor': config})
|
||||
|
||||
assert connection_factory.call_args_list[0][0][0] == 'localhost'
|
||||
assert connection_factory.call_args_list[0][0][1] == '1234'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_connection_errors_retry(hass, monkeypatch, mock_connection_factory):
|
||||
"""Connection should be retried on error during setup."""
|
||||
(connection_factory, transport, protocol) = mock_connection_factory
|
||||
|
||||
config = {
|
||||
'platform': 'dsmr',
|
||||
'reconnect_interval': 0,
|
||||
}
|
||||
|
||||
# override the mock to have it fail the first time
|
||||
first_fail_connection_factory = Mock(
|
||||
wraps=connection_factory, side_effect=[
|
||||
TimeoutError])
|
||||
|
||||
monkeypatch.setattr(
|
||||
'dsmr_parser.protocol.create_dsmr_reader',
|
||||
first_fail_connection_factory)
|
||||
yield from async_setup_component(hass, 'sensor', {'sensor': config})
|
||||
|
||||
# wait for sleep to resolve
|
||||
yield from hass.async_block_till_done()
|
||||
assert first_fail_connection_factory.call_count == 2, \
|
||||
'connecting not retried'
|
||||
|
||||
|
||||
@asyncio.coroutine
|
||||
def test_reconnect(hass, monkeypatch, mock_connection_factory):
|
||||
"""If transport disconnects, the connection should be retried."""
|
||||
(connection_factory, transport, protocol) = mock_connection_factory
|
||||
config = {
|
||||
'platform': 'dsmr',
|
||||
'reconnect_interval': 0,
|
||||
}
|
||||
|
||||
# mock waiting coroutine while connection lasts
|
||||
closed = asyncio.Event(loop=hass.loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def wait_closed():
|
||||
yield from closed.wait()
|
||||
protocol.wait_closed = wait_closed
|
||||
|
||||
yield from async_setup_component(hass, 'sensor', {'sensor': config})
|
||||
|
||||
assert connection_factory.call_count == 1
|
||||
|
||||
# indicate disconnect, release wait lock and allow reconnect to happen
|
||||
closed.set()
|
||||
# wait for lock set to resolve
|
||||
yield from hass.async_block_till_done()
|
||||
# wait for sleep to resolve
|
||||
yield from hass.async_block_till_done()
|
||||
|
||||
assert connection_factory.call_count == 2, \
|
||||
'connecting not retried'
|
||||
|
|
Loading…
Add table
Reference in a new issue