diff --git a/homeassistant/components/rfxtrx/__init__.py b/homeassistant/components/rfxtrx/__init__.py index 7fc743d021f..e380e1734e6 100644 --- a/homeassistant/components/rfxtrx/__init__.py +++ b/homeassistant/components/rfxtrx/__init__.py @@ -1,9 +1,12 @@ """Support for RFXtrx devices.""" +from __future__ import annotations + import asyncio import binascii import copy import functools import logging +from typing import NamedTuple import RFXtrx as rfxtrxmod import async_timeout @@ -49,6 +52,14 @@ SIGNAL_EVENT = f"{DOMAIN}_event" _LOGGER = logging.getLogger(__name__) +class DeviceTuple(NamedTuple): + """Representation of a device in rfxtrx.""" + + packettype: str + subtype: str + id_string: str + + def _bytearray_string(data): val = cv.string(data) try: @@ -225,7 +236,7 @@ async def async_setup_internal(hass, entry: config_entries.ConfigEntry): hass.services.async_register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA) -def get_rfx_object(packetid): +def get_rfx_object(packetid: str) -> rfxtrxmod.RFXtrxEvent | None: """Return the RFXObject with the packetid.""" try: binarypacket = bytearray.fromhex(packetid) @@ -246,10 +257,10 @@ def get_rfx_object(packetid): return obj -def get_pt2262_deviceid(device_id, nb_data_bits): +def get_pt2262_deviceid(device_id: str, nb_data_bits: int | None) -> bytes | None: """Extract and return the address bits from a Lighting4/PT2262 packet.""" if nb_data_bits is None: - return + return None try: data = bytearray.fromhex(device_id) @@ -262,7 +273,7 @@ def get_pt2262_deviceid(device_id, nb_data_bits): return binascii.hexlify(data) -def get_pt2262_cmd(device_id, data_bits): +def get_pt2262_cmd(device_id: str, data_bits: int) -> str | None: """Extract and return the data bits from a Lighting4/PT2262 packet.""" try: data = bytearray.fromhex(device_id) @@ -274,7 +285,9 @@ def get_pt2262_cmd(device_id, data_bits): return hex(data[-1] & mask) -def get_device_data_bits(device, devices): +def get_device_data_bits( + device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict] +) -> int | None: """Deduce data bits for device based on a cache of device bits.""" data_bits = None if device.packettype == DEVICE_PACKET_TYPE_LIGHTING4: @@ -286,7 +299,7 @@ def get_device_data_bits(device, devices): return data_bits -def find_possible_pt2262_device(device_ids, device_id): +def find_possible_pt2262_device(device_ids: list[str], device_id: str) -> str | None: """Look for the device which id matches the given device_id parameter.""" for dev_id in device_ids: if len(dev_id) == len(device_id): @@ -313,9 +326,11 @@ def find_possible_pt2262_device(device_ids, device_id): return None -def get_device_id(device, data_bits=None): +def get_device_id( + device: rfxtrxmod.RFXtrxDevice, data_bits: int | None = None +) -> DeviceTuple: """Calculate a device id for device.""" - id_string = device.id_string + id_string: str = device.id_string if ( data_bits and device.packettype == DEVICE_PACKET_TYPE_LIGHTING4 @@ -323,7 +338,7 @@ def get_device_id(device, data_bits=None): ): id_string = masked_id.decode("ASCII") - return (f"{device.packettype:x}", f"{device.subtype:x}", id_string) + return DeviceTuple(f"{device.packettype:x}", f"{device.subtype:x}", id_string) def connect_auto_add(hass, entry_data, callback_fun): @@ -340,7 +355,15 @@ class RfxtrxEntity(RestoreEntity): Contains the common logic for Rfxtrx lights and switches. """ - def __init__(self, device, device_id, event=None): + _device: rfxtrxmod.RFXtrxDevice + _event: rfxtrxmod.RFXtrxEvent | None + + def __init__( + self, + device: rfxtrxmod.RFXtrxDevice, + device_id: DeviceTuple, + event: rfxtrxmod.RFXtrxEvent | None = None, + ) -> None: """Initialize the device.""" self._name = f"{device.type_string} {device.id_string}" self._device = device @@ -405,21 +428,28 @@ class RfxtrxEntity(RestoreEntity): name=f"{self._device.type_string} {self._device.id_string}", ) - def _event_applies(self, event, device_id): + def _event_applies(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): """Check if event applies to me.""" - if "Command" in event.values and event.values["Command"] in COMMAND_GROUP_LIST: - (group_id, _, _) = event.device.id_string.partition(":") - return group_id == self._group_id + if isinstance(event, rfxtrxmod.ControlEvent): + if ( + "Command" in event.values + and event.values["Command"] in COMMAND_GROUP_LIST + ): + device: rfxtrxmod.RFXtrxDevice = event.device + (group_id, _, _) = device.id_string.partition(":") + return group_id == self._group_id # Otherwise, the event only applies to the matching device. return device_id == self._device_id - def _apply_event(self, event): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None: """Apply a received event.""" self._event = event @callback - def _handle_event(self, event, device_id): + def _handle_event( + self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple + ) -> None: """Handle a reception of data, overridden by other classes.""" @@ -429,11 +459,17 @@ class RfxtrxCommandEntity(RfxtrxEntity): Contains the common logic for Rfxtrx lights and switches. """ - def __init__(self, device, device_id, signal_repetitions=1, event=None): + def __init__( + self, + device: rfxtrxmod.RFXtrxDevice, + device_id: DeviceTuple, + signal_repetitions: int = 1, + event: rfxtrxmod.RFXtrxEvent | None = None, + ) -> None: """Initialzie a switch or light device.""" super().__init__(device, device_id, event=event) self.signal_repetitions = signal_repetitions - self._state = None + self._state: bool | None = None async def _async_send(self, fun, *args): rfx_object = self.hass.data[DOMAIN][DATA_RFXOBJECT] diff --git a/homeassistant/components/rfxtrx/binary_sensor.py b/homeassistant/components/rfxtrx/binary_sensor.py index cc11e94c526..d7c0ea306d8 100644 --- a/homeassistant/components/rfxtrx/binary_sensor.py +++ b/homeassistant/components/rfxtrx/binary_sensor.py @@ -17,10 +17,11 @@ from homeassistant.const import ( CONF_DEVICES, STATE_ON, ) -from homeassistant.core import callback +from homeassistant.core import CALLBACK_TYPE, callback from homeassistant.helpers import event as evt from . import ( + DeviceTuple, RfxtrxEntity, connect_auto_add, find_possible_pt2262_device, @@ -83,7 +84,7 @@ SENSOR_TYPES = ( SENSOR_TYPES_DICT = {desc.key: desc for desc in SENSOR_TYPES} -def supported(event): +def supported(event: rfxtrxmod.RFXtrxEvent): """Return whether an event supports binary_sensor.""" if isinstance(event, rfxtrxmod.ControlEvent): return True @@ -103,8 +104,8 @@ async def async_setup_entry( """Set up platform.""" sensors = [] - device_ids = set() - pt2262_devices = [] + device_ids: set[DeviceTuple] = set() + pt2262_devices: list[str] = [] discovery_info = config_entry.data @@ -127,25 +128,29 @@ async def async_setup_entry( continue device_ids.add(device_id) - if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4: - find_possible_pt2262_device(pt2262_devices, event.device.id_string) - pt2262_devices.append(event.device.id_string) + device: rfxtrxmod.RFXtrxDevice = event.device - device = RfxtrxBinarySensor( - event.device, + if device.packettype == DEVICE_PACKET_TYPE_LIGHTING4: + find_possible_pt2262_device(pt2262_devices, device.id_string) + pt2262_devices.append(device.id_string) + + entity = RfxtrxBinarySensor( + device, device_id, - get_sensor_description(event.device.type_string), + get_sensor_description(device.type_string), entity_info.get(CONF_OFF_DELAY), entity_info.get(CONF_DATA_BITS), entity_info.get(CONF_COMMAND_ON), entity_info.get(CONF_COMMAND_OFF), ) - sensors.append(device) + sensors.append(entity) async_add_entities(sensors) @callback - def binary_sensor_update(event, device_id): + def binary_sensor_update( + event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple + ) -> None: """Call for control updates from the RFXtrx gateway.""" if not supported(event): return @@ -179,22 +184,22 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): def __init__( self, - device, - device_id, - entity_description, - off_delay=None, - data_bits=None, - cmd_on=None, - cmd_off=None, - event=None, - ): + device: rfxtrxmod.RFXtrxDevice, + device_id: DeviceTuple, + entity_description: BinarySensorEntityDescription, + off_delay: float | None = None, + data_bits: int | None = None, + cmd_on: int | None = None, + cmd_off: int | None = None, + event: rfxtrxmod.RFXtrxEvent | None = None, + ) -> None: """Initialize the RFXtrx sensor.""" super().__init__(device, device_id, event=event) self.entity_description = entity_description self._data_bits = data_bits self._off_delay = off_delay - self._state = None - self._delay_listener = None + self._state: bool | None = None + self._delay_listener: CALLBACK_TYPE | None = None self._cmd_on = cmd_on self._cmd_off = cmd_off @@ -220,11 +225,12 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): """Return true if the sensor state is True.""" return self._state - def _apply_event_lighting4(self, event): + def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent): """Apply event for a lighting 4 device.""" if self._data_bits is not None: - cmd = get_pt2262_cmd(event.device.id_string, self._data_bits) - cmd = int(cmd, 16) + cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits) + assert cmdstr + cmd = int(cmdstr, 16) if cmd == self._cmd_on: self._state = True elif cmd == self._cmd_off: @@ -232,7 +238,8 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): else: self._state = True - def _apply_event_standard(self, event): + def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent): + assert isinstance(event, (rfxtrxmod.SensorEvent, rfxtrxmod.ControlEvent)) if event.values.get("Command") in COMMAND_ON_LIST: self._state = True elif event.values.get("Command") in COMMAND_OFF_LIST: @@ -242,7 +249,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): elif event.values.get("Sensor Status") in SENSOR_STATUS_OFF: self._state = False - def _apply_event(self, event): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): """Apply command from rfxtrx.""" super()._apply_event(event) if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4: @@ -251,7 +258,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity): self._apply_event_standard(event) @callback - def _handle_event(self, event, device_id): + def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): """Check if event applies to me and update.""" if not self._event_applies(event, device_id): return diff --git a/homeassistant/components/rfxtrx/config_flow.py b/homeassistant/components/rfxtrx/config_flow.py index cf1069a7907..1ec74c2415a 100644 --- a/homeassistant/components/rfxtrx/config_flow.py +++ b/homeassistant/components/rfxtrx/config_flow.py @@ -1,6 +1,9 @@ """Config flow for RFXCOM RFXtrx integration.""" +from __future__ import annotations + import copy import os +from typing import TypedDict, cast import RFXtrx as rfxtrxmod import serial @@ -22,6 +25,8 @@ from homeassistant.const import ( from homeassistant.core import callback from homeassistant.helpers import config_validation as cv from homeassistant.helpers.device_registry import ( + DeviceEntry, + DeviceRegistry, async_entries_for_config_entry, async_get_registry as async_get_device_registry, ) @@ -30,7 +35,7 @@ from homeassistant.helpers.entity_registry import ( async_get_registry as async_get_entity_registry, ) -from . import DOMAIN, get_device_id, get_rfx_object +from . import DOMAIN, DeviceTuple, get_device_id, get_rfx_object from .binary_sensor import supported as binary_supported from .const import ( CONF_AUTOMATIC_ADD, @@ -53,6 +58,13 @@ CONF_EVENT_CODE = "event_code" CONF_MANUAL_PATH = "Enter Manually" +class DeviceData(TypedDict): + """Dict data representing a device entry.""" + + event_code: str + device_id: DeviceTuple + + def none_or_int(value, base): """Check if strin is one otherwise convert to int.""" if value is None: @@ -63,16 +75,17 @@ def none_or_int(value, base): class OptionsFlow(config_entries.OptionsFlow): """Handle Rfxtrx options.""" + _device_registry: DeviceRegistry + _device_entries: list[DeviceEntry] + def __init__(self, config_entry: ConfigEntry) -> None: """Initialize rfxtrx options flow.""" self._config_entry = config_entry self._global_options = None self._selected_device = None - self._selected_device_entry_id = None - self._selected_device_event_code = None - self._selected_device_object = None - self._device_entries = None - self._device_registry = None + self._selected_device_entry_id: str | None = None + self._selected_device_event_code: str | None = None + self._selected_device_object: rfxtrxmod.RFXtrxEvent | None = None async def async_step_init(self, user_input=None): """Manage the options.""" @@ -173,6 +186,8 @@ class OptionsFlow(config_entries.OptionsFlow): errors = {} if user_input is not None: + assert self._selected_device_object + assert self._selected_device_event_code device_id = get_device_id( self._selected_device_object.device, data_bits=user_input.get(CONF_DATA_BITS), @@ -399,20 +414,18 @@ class OptionsFlow(config_entries.OptionsFlow): return data[CONF_EVENT_CODE] - def _get_device_data(self, entry_id): + def _get_device_data(self, entry_id) -> DeviceData: """Get event code based on device identifier.""" - event_code = None - device_id = None + event_code: str entry = self._device_registry.async_get(entry_id) - device_id = next(iter(entry.identifiers))[1:] + assert entry + device_id = cast(DeviceTuple, next(iter(entry.identifiers))[1:]) for packet_id, entity_info in self._config_entry.data[CONF_DEVICES].items(): if tuple(entity_info.get(CONF_DEVICE_ID)) == device_id: - event_code = packet_id + event_code = cast(str, packet_id) break - - data = {CONF_EVENT_CODE: event_code, CONF_DEVICE_ID: device_id} - - return data + assert event_code + return DeviceData(event_code=event_code, device_id=device_id) @callback def update_config_data(self, global_options=None, devices=None): diff --git a/homeassistant/components/rfxtrx/cover.py b/homeassistant/components/rfxtrx/cover.py index 4dc89577542..c1c009c930c 100644 --- a/homeassistant/components/rfxtrx/cover.py +++ b/homeassistant/components/rfxtrx/cover.py @@ -1,6 +1,10 @@ """Support for RFXtrx covers.""" +from __future__ import annotations + import logging +import RFXtrx as rfxtrxmod + from homeassistant.components.cover import ( SUPPORT_CLOSE, SUPPORT_CLOSE_TILT, @@ -15,6 +19,7 @@ from homeassistant.core import callback from . import ( DEFAULT_SIGNAL_REPETITIONS, + DeviceTuple, RfxtrxCommandEntity, connect_auto_add, get_device_id, @@ -33,7 +38,7 @@ from .const import ( _LOGGER = logging.getLogger(__name__) -def supported(event): +def supported(event: rfxtrxmod.RFXtrxEvent): """Return whether an event supports cover.""" return event.device.known_to_be_rollershutter @@ -45,7 +50,7 @@ async def async_setup_entry( ): """Set up config entry.""" discovery_info = config_entry.data - device_ids = set() + device_ids: set[DeviceTuple] = set() entities = [] for packet_id, entity_info in discovery_info[CONF_DEVICES].items(): @@ -73,7 +78,7 @@ async def async_setup_entry( async_add_entities(entities) @callback - def cover_update(event, device_id): + def cover_update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None: """Handle cover updates from the RFXtrx gateway.""" if not supported(event): return @@ -81,12 +86,13 @@ async def async_setup_entry( if device_id in device_ids: return device_ids.add(device_id) + device: rfxtrxmod.RFXtrxDevice = event.device _LOGGER.info( "Added cover (Device ID: %s Class: %s Sub: %s, Event: %s)", - event.device.id_string.lower(), - event.device.__class__.__name__, - event.device.subtype, + device.id_string.lower(), + device.__class__.__name__, + device.subtype, "".join(f"{x:02x}" for x in event.data), ) @@ -102,14 +108,16 @@ async def async_setup_entry( class RfxtrxCover(RfxtrxCommandEntity, CoverEntity): """Representation of a RFXtrx cover.""" + _device: rfxtrxmod.RollerTrolDevice | rfxtrxmod.RfyDevice | rfxtrxmod.LightingDevice + def __init__( self, - device, - device_id, - signal_repetitions, - event=None, - venetian_blind_mode=None, - ): + device: rfxtrxmod.RFXtrxDevice, + device_id: DeviceTuple, + signal_repetitions: int, + event: rfxtrxmod.RFXtrxEvent = None, + venetian_blind_mode: bool | None = None, + ) -> None: """Initialize the RFXtrx cover device.""" super().__init__(device, device_id, signal_repetitions, event) self._venetian_blind_mode = venetian_blind_mode @@ -191,8 +199,9 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity): self._state = True self.async_write_ha_state() - def _apply_event(self, event): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): """Apply command from rfxtrx.""" + assert isinstance(event, rfxtrxmod.ControlEvent) super()._apply_event(event) if event.values["Command"] in COMMAND_ON_LIST: self._state = True @@ -200,7 +209,7 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity): self._state = False @callback - def _handle_event(self, event, device_id): + def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): """Check if event applies to me and update.""" if device_id != self._device_id: return diff --git a/homeassistant/components/rfxtrx/light.py b/homeassistant/components/rfxtrx/light.py index c67213ed6f8..bf88ff86368 100644 --- a/homeassistant/components/rfxtrx/light.py +++ b/homeassistant/components/rfxtrx/light.py @@ -1,4 +1,6 @@ """Support for RFXtrx lights.""" +from __future__ import annotations + import logging import RFXtrx as rfxtrxmod @@ -13,6 +15,7 @@ from homeassistant.core import callback from . import ( DEFAULT_SIGNAL_REPETITIONS, + DeviceTuple, RfxtrxCommandEntity, connect_auto_add, get_device_id, @@ -30,7 +33,7 @@ _LOGGER = logging.getLogger(__name__) SUPPORT_RFXTRX = SUPPORT_BRIGHTNESS -def supported(event): +def supported(event: rfxtrxmod.RFXtrxEvent): """Return whether an event supports light.""" return ( isinstance(event.device, rfxtrxmod.LightingDevice) @@ -45,7 +48,7 @@ async def async_setup_entry( ): """Set up config entry.""" discovery_info = config_entry.data - device_ids = set() + device_ids: set[DeviceTuple] = set() # Add switch from config file entities = [] @@ -72,7 +75,7 @@ async def async_setup_entry( async_add_entities(entities) @callback - def light_update(event, device_id): + def light_update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple): """Handle light updates from the RFXtrx gateway.""" if not supported(event): return @@ -103,6 +106,7 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity): """Representation of a RFXtrx light.""" _brightness = 0 + _device: rfxtrxmod.LightingDevice async def async_added_to_hass(self): """Restore RFXtrx device state (ON/OFF).""" @@ -149,8 +153,9 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity): self._brightness = 0 self.async_write_ha_state() - def _apply_event(self, event): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent): """Apply command from rfxtrx.""" + assert isinstance(event, rfxtrxmod.ControlEvent) super()._apply_event(event) if event.values["Command"] in COMMAND_ON_LIST: self._state = True diff --git a/homeassistant/components/rfxtrx/sensor.py b/homeassistant/components/rfxtrx/sensor.py index c13e499bbf0..bf292adeee8 100644 --- a/homeassistant/components/rfxtrx/sensor.py +++ b/homeassistant/components/rfxtrx/sensor.py @@ -238,7 +238,7 @@ async def async_setup_entry( event.device, data_bits=entity_info.get(CONF_DATA_BITS) ) for data_type in set(event.values) & set(SENSOR_TYPES_DICT): - data_id = (*device_id, data_type) + data_id = (*device_id, str(data_type)) if data_id in data_ids: continue data_ids.add(data_id) diff --git a/homeassistant/components/rfxtrx/switch.py b/homeassistant/components/rfxtrx/switch.py index 21e9e06b802..d0ec83f3c66 100644 --- a/homeassistant/components/rfxtrx/switch.py +++ b/homeassistant/components/rfxtrx/switch.py @@ -1,4 +1,6 @@ """Support for RFXtrx switches.""" +from __future__ import annotations + import logging import RFXtrx as rfxtrxmod @@ -10,6 +12,7 @@ from homeassistant.core import callback from . import ( DEFAULT_SIGNAL_REPETITIONS, DOMAIN, + DeviceTuple, RfxtrxCommandEntity, connect_auto_add, get_device_id, @@ -44,7 +47,7 @@ async def async_setup_entry( ): """Set up config entry.""" discovery_info = config_entry.data - device_ids = set() + device_ids: set[DeviceTuple] = set() # Add switch from config file entities = [] @@ -79,16 +82,18 @@ async def async_setup_entry( return device_ids.add(device_id) + device: rfxtrxmod.RFXtrxDevice = event.device + _LOGGER.info( "Added switch (Device ID: %s Class: %s Sub: %s, Event: %s)", - event.device.id_string.lower(), - event.device.__class__.__name__, - event.device.subtype, + device.id_string.lower(), + device.__class__.__name__, + device.subtype, "".join(f"{x:02x}" for x in event.data), ) entity = RfxtrxSwitch( - event.device, device_id, DEFAULT_SIGNAL_REPETITIONS, event=event + device, device_id, DEFAULT_SIGNAL_REPETITIONS, event=event ) async_add_entities([entity]) @@ -108,8 +113,9 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity): if old_state is not None: self._state = old_state.state == STATE_ON - def _apply_event(self, event): + def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None: """Apply command from rfxtrx.""" + assert isinstance(event, rfxtrxmod.ControlEvent) super()._apply_event(event) if event.values["Command"] in COMMAND_ON_LIST: self._state = True @@ -117,7 +123,9 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity): self._state = False @callback - def _handle_event(self, event, device_id): + def _handle_event( + self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple + ) -> None: """Check if event applies to me and update.""" if self._event_applies(event, device_id): self._apply_event(event)