Improve typing of Tasmota (1/3) (#52746)

This commit is contained in:
Erik Montnemery 2021-07-12 20:14:03 +02:00 committed by GitHub
parent 5caf170c78
commit 5e472f2c06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 32 deletions

View file

@ -275,8 +275,16 @@ async def async_start( # noqa: C901
if key not in hass.data[INTEGRATION_UNSUBSCRIBE]:
return
data = {
"topic": msg.topic,
"payload": msg.payload,
"qos": msg.qos,
"retain": msg.retain,
"subscribed_topic": msg.subscribed_topic,
"timestamp": msg.timestamp,
}
result = await hass.config_entries.flow.async_init(
integration, context={"source": DOMAIN}, data=msg
integration, context={"source": DOMAIN}, data=data
)
if (
result

View file

@ -1,4 +1,6 @@
"""The Tasmota integration."""
from __future__ import annotations
import asyncio
import logging
@ -10,6 +12,7 @@ from hatasmota.const import (
CONF_SW_VERSION,
)
from hatasmota.discovery import clear_discovery_topic
from hatasmota.models import TasmotaDeviceConfig
from hatasmota.mqtt import TasmotaMQTTClient
import voluptuous as vol
@ -18,10 +21,13 @@ from homeassistant.components.mqtt.subscription import (
async_subscribe_topics,
async_unsubscribe_topics,
)
from homeassistant.core import callback
from homeassistant.components.websocket_api.connection import ActiveConnection
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers.device_registry import (
CONNECTION_NETWORK_MAC,
EVENT_DEVICE_REGISTRY_UPDATED,
DeviceRegistry,
async_entries_for_config_entry,
)
@ -37,33 +43,38 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, entry):
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Tasmota from a config entry."""
websocket_api.async_register_command(hass, websocket_remove_device)
hass.data[DATA_UNSUB] = []
def _publish(*args, **kwds):
mqtt.async_publish(hass, *args, **kwds)
def _publish(
topic: str,
payload: mqtt.PublishPayloadType,
qos: int | None = None,
retain: bool | None = None,
) -> None:
mqtt.async_publish(hass, topic, payload, qos, retain)
async def _subscribe_topics(sub_state, topics):
async def _subscribe_topics(sub_state: dict | None, topics: dict) -> dict:
# Optionally mark message handlers as callback
for topic in topics.values():
if "msg_callback" in topic and "event_loop_safe" in topic:
topic["msg_callback"] = callback(topic["msg_callback"])
return await async_subscribe_topics(hass, sub_state, topics)
async def _unsubscribe_topics(sub_state):
async def _unsubscribe_topics(sub_state: dict | None) -> dict:
return await async_unsubscribe_topics(hass, sub_state)
tasmota_mqtt = TasmotaMQTTClient(_publish, _subscribe_topics, _unsubscribe_topics)
device_registry = await hass.helpers.device_registry.async_get_registry()
def async_discover_device(config, mac):
def async_discover_device(config: TasmotaDeviceConfig, mac: str) -> None:
"""Discover and add a Tasmota device."""
async_setup_device(hass, mac, config, entry, tasmota_mqtt, device_registry)
async def async_device_removed(event):
async def async_device_removed(event: Event) -> None:
"""Handle the removal of a device."""
device_registry = await hass.helpers.device_registry.async_get_registry()
if event.data["action"] != "remove":
@ -82,7 +93,7 @@ async def async_setup_entry(hass, entry):
hass.bus.async_listen(EVENT_DEVICE_REGISTRY_UPDATED, async_device_removed)
)
async def start_platforms():
async def start_platforms() -> None:
await device_automation.async_setup_entry(hass, entry)
await asyncio.gather(
*[
@ -100,7 +111,7 @@ async def async_setup_entry(hass, entry):
return True
async def async_unload_entry(hass, entry):
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
# cleanup platforms
@ -127,7 +138,13 @@ async def async_unload_entry(hass, entry):
return True
def _remove_device(hass, config_entry, mac, tasmota_mqtt, device_registry):
def _remove_device(
hass: HomeAssistant,
config_entry: ConfigEntry,
mac: str,
tasmota_mqtt: TasmotaMQTTClient,
device_registry: DeviceRegistry,
) -> None:
"""Remove device from device registry."""
device = device_registry.async_get_device(set(), {(CONNECTION_NETWORK_MAC, mac)})
@ -139,22 +156,32 @@ def _remove_device(hass, config_entry, mac, tasmota_mqtt, device_registry):
clear_discovery_topic(mac, config_entry.data[CONF_DISCOVERY_PREFIX], tasmota_mqtt)
def _update_device(hass, config_entry, config, device_registry):
def _update_device(
hass: HomeAssistant,
config_entry: ConfigEntry,
config: TasmotaDeviceConfig,
device_registry: DeviceRegistry,
) -> None:
"""Add or update device registry."""
config_entry_id = config_entry.entry_id
device_info = {
"connections": {(CONNECTION_NETWORK_MAC, config[CONF_MAC])},
"manufacturer": config[CONF_MANUFACTURER],
"model": config[CONF_MODEL],
"name": config[CONF_NAME],
"sw_version": config[CONF_SW_VERSION],
"config_entry_id": config_entry_id,
}
_LOGGER.debug("Adding or updating tasmota device %s", config[CONF_MAC])
device_registry.async_get_or_create(**device_info)
device_registry.async_get_or_create(
connections={(CONNECTION_NETWORK_MAC, config[CONF_MAC])},
manufacturer=config[CONF_MANUFACTURER],
model=config[CONF_MODEL],
name=config[CONF_NAME],
sw_version=config[CONF_SW_VERSION],
config_entry_id=config_entry.entry_id,
)
def async_setup_device(hass, mac, config, config_entry, tasmota_mqtt, device_registry):
def async_setup_device(
hass: HomeAssistant,
mac: str,
config: TasmotaDeviceConfig,
config_entry: ConfigEntry,
tasmota_mqtt: TasmotaMQTTClient,
device_registry: DeviceRegistry,
) -> None:
"""Set up the Tasmota device."""
if not config:
_remove_device(hass, config_entry, mac, tasmota_mqtt, device_registry)
@ -166,7 +193,9 @@ def async_setup_device(hass, mac, config, config_entry, tasmota_mqtt, device_reg
{vol.Required("type"): "tasmota/device/remove", vol.Required("device_id"): str}
)
@websocket_api.async_response
async def websocket_remove_device(hass, connection, msg):
async def websocket_remove_device(
hass: HomeAssistant, connection: ActiveConnection, msg: dict
) -> None:
"""Delete device."""
device_id = msg["device_id"]
dev_registry = await hass.helpers.device_registry.async_get_registry()

View file

@ -1,8 +1,14 @@
"""Config flow for Tasmota."""
from __future__ import annotations
from typing import Any, cast
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.mqtt import valid_subscribe_topic
from homeassistant.components.mqtt import ReceiveMessage, valid_subscribe_topic
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.typing import DiscoveryInfoType
from .const import CONF_DISCOVERY_PREFIX, DEFAULT_PREFIX, DOMAIN
@ -12,11 +18,11 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize flow."""
self._prefix = DEFAULT_PREFIX
async def async_step_mqtt(self, discovery_info=None):
async def async_step_mqtt(self, discovery_info: DiscoveryInfoType) -> FlowResult:
"""Handle a flow initialized by MQTT discovery."""
if self._async_in_progress() or self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
@ -24,7 +30,7 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
await self.async_set_unique_id(DOMAIN)
# Validate the topic, will throw if it fails
prefix = discovery_info.subscribed_topic
prefix = cast(ReceiveMessage, discovery_info).subscribed_topic
if prefix.endswith("/#"):
prefix = prefix[:-2]
try:
@ -36,7 +42,9 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return await self.async_step_confirm()
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a flow initialized by the user."""
if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
@ -45,7 +53,9 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return await self.async_step_config()
return await self.async_step_confirm()
async def async_step_config(self, user_input=None):
async def async_step_config(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Confirm the setup."""
errors = {}
data = {CONF_DISCOVERY_PREFIX: self._prefix}
@ -72,7 +82,9 @@ class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
step_id="config", data_schema=vol.Schema(fields), errors=errors
)
async def async_step_confirm(self, user_input=None):
async def async_step_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Confirm the setup."""
data = {CONF_DISCOVERY_PREFIX: self._prefix}