Strictly type modbus.py. (#56375)

This commit is contained in:
jan iversen 2021-09-20 18:47:05 +02:00 committed by GitHub
parent 4c4bd740f3
commit f3ad4ca0cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,12 +4,19 @@ from __future__ import annotations
import asyncio
from collections import namedtuple
import logging
from typing import Any, Callable
from pymodbus.client.sync import ModbusSerialClient, ModbusTcpClient, ModbusUdpClient
from pymodbus.client.sync import (
BaseModbusClient,
ModbusSerialClient,
ModbusTcpClient,
ModbusUdpClient,
)
from pymodbus.constants import Defaults
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ModbusResponse
from pymodbus.transaction import ModbusRtuFramer
import voluptuous as vol
from homeassistant.const import (
CONF_DELAY,
@ -21,10 +28,11 @@ from homeassistant.const import (
CONF_TYPE,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import callback
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.event import Event, async_call_later
from homeassistant.helpers.typing import ConfigType
from .const import (
ATTR_ADDRESS,
@ -113,12 +121,12 @@ PYMODBUS_CALL = [
async def async_modbus_setup(
hass,
config,
service_write_register_schema,
service_write_coil_schema,
service_stop_start_schema,
):
hass: HomeAssistant,
config: ConfigType,
service_write_register_schema: vol.Schema,
service_write_coil_schema: vol.Schema,
service_stop_start_schema: vol.Schema,
) -> bool:
"""Set up Modbus component."""
hass.data[DOMAIN] = hub_collect = {}
@ -138,7 +146,7 @@ async def async_modbus_setup(
async_load_platform(hass, component, DOMAIN, conf_hub, config)
)
async def async_stop_modbus(event):
async def async_stop_modbus(event: Event) -> None:
"""Stop Modbus service."""
async_dispatcher_send(hass, SIGNAL_STOP_ENTITY)
@ -147,7 +155,7 @@ async def async_modbus_setup(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_stop_modbus)
async def async_write_register(service):
async def async_write_register(service: ServiceCall) -> None:
"""Write Modbus registers."""
unit = int(float(service.data[ATTR_UNIT]))
address = int(float(service.data[ATTR_ADDRESS]))
@ -171,7 +179,7 @@ async def async_modbus_setup(
schema=service_write_register_schema,
)
async def async_write_coil(service):
async def async_write_coil(service: ServiceCall) -> None:
"""Write Modbus coil."""
unit = service.data[ATTR_UNIT]
address = service.data[ATTR_ADDRESS]
@ -188,7 +196,7 @@ async def async_modbus_setup(
DOMAIN, SERVICE_WRITE_COIL, async_write_coil, schema=service_write_coil_schema
)
async def async_stop_hub(service):
async def async_stop_hub(service: ServiceCall) -> None:
"""Stop Modbus hub."""
async_dispatcher_send(hass, SIGNAL_STOP_ENTITY)
hub = hub_collect[service.data[ATTR_HUB]]
@ -198,7 +206,7 @@ async def async_modbus_setup(
DOMAIN, SERVICE_STOP, async_stop_hub, schema=service_stop_start_schema
)
async def async_restart_hub(service):
async def async_restart_hub(service: ServiceCall) -> None:
"""Restart Modbus hub."""
async_dispatcher_send(hass, SIGNAL_START_ENTITY)
hub = hub_collect[service.data[ATTR_HUB]]
@ -213,19 +221,19 @@ async def async_modbus_setup(
class ModbusHub:
"""Thread safe wrapper class for pymodbus."""
def __init__(self, hass, client_config):
def __init__(self, hass: HomeAssistant, client_config: dict[str, Any]) -> None:
"""Initialize the Modbus hub."""
# generic configuration
self._client = None
self._async_cancel_listener = None
self._client: BaseModbusClient | None = None
self._async_cancel_listener: Callable[[], None] | None = None
self._in_error = False
self._lock = asyncio.Lock()
self.hass = hass
self.name = client_config[CONF_NAME]
self._config_type = client_config[CONF_TYPE]
self._config_delay = client_config[CONF_DELAY]
self._pb_call = {}
self._pb_call: dict[str, RunEntry] = {}
self._pb_class = {
SERIAL: ModbusSerialClient,
TCP: ModbusTcpClient,
@ -264,7 +272,7 @@ class ModbusHub:
else:
self._msg_wait = 0
def _log_error(self, text: str, error_state=True):
def _log_error(self, text: str, error_state: bool = True) -> None:
log_text = f"Pymodbus: {self.name}: {text}"
if self._in_error:
_LOGGER.debug(log_text)
@ -272,7 +280,7 @@ class ModbusHub:
_LOGGER.error(log_text)
self._in_error = error_state
async def async_setup(self):
async def async_setup(self) -> bool:
"""Set up pymodbus client."""
try:
self._client = self._pb_class[self._config_type](**self._pb_params)
@ -287,7 +295,7 @@ class ModbusHub:
await self.async_connect_task()
return True
async def async_connect_task(self):
async def async_connect_task(self) -> None:
"""Try to connect, and retry if needed."""
async with self._lock:
if not await self.hass.async_add_executor_job(self._pymodbus_connect):
@ -302,19 +310,19 @@ class ModbusHub:
)
@callback
def async_end_delay(self, args):
def async_end_delay(self, args: Any) -> None:
"""End startup delay."""
self._async_cancel_listener = None
self._config_delay = 0
async def async_restart(self):
async def async_restart(self) -> None:
"""Reconnect client."""
if self._client:
await self.async_close()
await self.async_setup()
async def async_close(self):
async def async_close(self) -> None:
"""Disconnect client."""
if self._async_cancel_listener:
self._async_cancel_listener()
@ -330,8 +338,10 @@ class ModbusHub:
message = f"modbus {self.name} communication closed"
_LOGGER.warning(message)
def _pymodbus_connect(self):
def _pymodbus_connect(self) -> bool:
"""Connect client."""
if not self._client:
return False
try:
self._client.connect()
except ModbusException as exception_error:
@ -342,7 +352,9 @@ class ModbusHub:
_LOGGER.warning(message)
return True
def _pymodbus_call(self, unit, address, value, use_call):
def _pymodbus_call(
self, unit: int, address: int, value: int | list[int], use_call: str
) -> ModbusResponse:
"""Call sync. pymodbus."""
kwargs = {"unit": unit} if unit else {}
entry = self._pb_call[use_call]
@ -359,10 +371,10 @@ class ModbusHub:
async def async_pymodbus_call(
self,
unit: str | int | None,
unit: int | None,
address: int,
value: str | int,
use_call: str | None,
value: int | list[int],
use_call: str,
) -> ModbusResponse | None:
"""Convert async to sync pymodbus call."""
if self._config_delay: