hass-core/homeassistant/components/shelly/utils.py
RadekHvizdos 8549af3855
Add missing Shelly Cover sensors bugfix (#71831)
Switching Shelly Plus 2PM from switch to cover mode results in missing sensors for Power, Voltage, Energy and Temperature.

These parameters are still available in the API, but need to be accessed via "cover" key instead of "switch" key.

This change adds the missing sensors.
2022-05-15 12:29:35 +03:00

384 lines
12 KiB
Python

"""Shelly helpers functions."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any, cast
from aioshelly.block_device import BLOCK_VALUE_UNIT, COAP, Block, BlockDevice
from aioshelly.const import MODEL_NAMES
from aioshelly.rpc_device import RpcDevice
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, TEMP_CELSIUS, TEMP_FAHRENHEIT
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry, singleton
from homeassistant.helpers.typing import EventType
from homeassistant.util.dt import utcnow
from .const import (
BASIC_INPUTS_EVENTS_TYPES,
CONF_COAP_PORT,
DEFAULT_COAP_PORT,
DOMAIN,
LOGGER,
MAX_RPC_KEY_INSTANCES,
RPC_INPUTS_EVENTS_TYPES,
SHBTN_INPUTS_EVENTS_TYPES,
SHBTN_MODELS,
SHIX3_1_INPUTS_EVENTS_TYPES,
UPTIME_DEVIATION,
)
async def async_remove_shelly_entity(
hass: HomeAssistant, domain: str, unique_id: str
) -> None:
"""Remove a Shelly entity."""
entity_reg = await hass.helpers.entity_registry.async_get_registry()
entity_id = entity_reg.async_get_entity_id(domain, DOMAIN, unique_id)
if entity_id:
LOGGER.debug("Removing entity: %s", entity_id)
entity_reg.async_remove(entity_id)
def temperature_unit(block_info: dict[str, Any]) -> str:
"""Detect temperature unit."""
if block_info[BLOCK_VALUE_UNIT] == "F":
return TEMP_FAHRENHEIT
return TEMP_CELSIUS
def get_block_device_name(device: BlockDevice) -> str:
"""Naming for device."""
return cast(str, device.settings["name"] or device.settings["device"]["hostname"])
def get_rpc_device_name(device: RpcDevice) -> str:
"""Naming for device."""
return cast(str, device.config["sys"]["device"].get("name") or device.hostname)
def get_number_of_channels(device: BlockDevice, block: Block) -> int:
"""Get number of channels for block type."""
assert isinstance(device.shelly, dict)
channels = None
if block.type == "input":
# Shelly Dimmer/1L has two input channels and missing "num_inputs"
if device.settings["device"]["type"] in ["SHDM-1", "SHDM-2", "SHSW-L"]:
channels = 2
else:
channels = device.shelly.get("num_inputs")
elif block.type == "emeter":
channels = device.shelly.get("num_emeters")
elif block.type in ["relay", "light"]:
channels = device.shelly.get("num_outputs")
elif block.type in ["roller", "device"]:
channels = 1
return channels or 1
def get_block_entity_name(
device: BlockDevice,
block: Block | None,
description: str | None = None,
) -> str:
"""Naming for block based switch and sensors."""
channel_name = get_block_channel_name(device, block)
if description:
return f"{channel_name} {description}"
return channel_name
def get_block_channel_name(device: BlockDevice, block: Block | None) -> str:
"""Get name based on device and channel name."""
entity_name = get_block_device_name(device)
if (
not block
or block.type == "device"
or get_number_of_channels(device, block) == 1
):
return entity_name
assert block.channel
channel_name: str | None = None
mode = cast(str, block.type) + "s"
if mode in device.settings:
channel_name = device.settings[mode][int(block.channel)].get("name")
if channel_name:
return channel_name
if device.settings["device"]["type"] == "SHEM-3":
base = ord("A")
else:
base = ord("1")
return f"{entity_name} channel {chr(int(block.channel)+base)}"
def is_block_momentary_input(
settings: dict[str, Any], block: Block, include_detached: bool = False
) -> bool:
"""Return true if block input button settings is set to a momentary type."""
momentary_types = ["momentary", "momentary_on_release"]
if include_detached:
momentary_types.append("detached")
# Shelly Button type is fixed to momentary and no btn_type
if settings["device"]["type"] in SHBTN_MODELS:
return True
if settings.get("mode") == "roller":
button_type = settings["rollers"][0]["button_type"]
return button_type in momentary_types
button = settings.get("relays") or settings.get("lights") or settings.get("inputs")
if button is None:
return False
# Shelly 1L has two button settings in the first channel
if settings["device"]["type"] == "SHSW-L":
channel = int(block.channel or 0) + 1
button_type = button[0].get("btn" + str(channel) + "_type")
else:
# Some devices has only one channel in settings
channel = min(int(block.channel or 0), len(button) - 1)
button_type = button[channel].get("btn_type")
return button_type in momentary_types
def get_device_uptime(uptime: float, last_uptime: datetime | None) -> datetime:
"""Return device uptime string, tolerate up to 5 seconds deviation."""
delta_uptime = utcnow() - timedelta(seconds=uptime)
if (
not last_uptime
or abs((delta_uptime - last_uptime).total_seconds()) > UPTIME_DEVIATION
):
return delta_uptime
return last_uptime
def get_block_input_triggers(
device: BlockDevice, block: Block
) -> list[tuple[str, str]]:
"""Return list of input triggers for block."""
if "inputEvent" not in block.sensor_ids or "inputEventCnt" not in block.sensor_ids:
return []
if not is_block_momentary_input(device.settings, block, True):
return []
triggers = []
if block.type == "device" or get_number_of_channels(device, block) == 1:
subtype = "button"
else:
assert block.channel
subtype = f"button{int(block.channel)+1}"
if device.settings["device"]["type"] in SHBTN_MODELS:
trigger_types = SHBTN_INPUTS_EVENTS_TYPES
elif device.settings["device"]["type"] == "SHIX3-1":
trigger_types = SHIX3_1_INPUTS_EVENTS_TYPES
else:
trigger_types = BASIC_INPUTS_EVENTS_TYPES
for trigger_type in trigger_types:
triggers.append((trigger_type, subtype))
return triggers
def get_shbtn_input_triggers() -> list[tuple[str, str]]:
"""Return list of input triggers for SHBTN models."""
triggers = []
for trigger_type in SHBTN_INPUTS_EVENTS_TYPES:
triggers.append((trigger_type, "button"))
return triggers
@singleton.singleton("shelly_coap")
async def get_coap_context(hass: HomeAssistant) -> COAP:
"""Get CoAP context to be used in all Shelly devices."""
context = COAP()
if DOMAIN in hass.data:
port = hass.data[DOMAIN].get(CONF_COAP_PORT, DEFAULT_COAP_PORT)
else:
port = DEFAULT_COAP_PORT
LOGGER.info("Starting CoAP context with UDP port %s", port)
await context.initialize(port)
@callback
def shutdown_listener(ev: EventType) -> None:
context.close()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown_listener)
return context
def get_block_device_sleep_period(settings: dict[str, Any]) -> int:
"""Return the device sleep period in seconds or 0 for non sleeping devices."""
sleep_period = 0
if settings.get("sleep_mode", False):
sleep_period = settings["sleep_mode"]["period"]
if settings["sleep_mode"]["unit"] == "h":
sleep_period *= 60 # hours to minutes
return sleep_period * 60 # minutes to seconds
def get_info_auth(info: dict[str, Any]) -> bool:
"""Return true if device has authorization enabled."""
return cast(bool, info.get("auth") or info.get("auth_en"))
def get_info_gen(info: dict[str, Any]) -> int:
"""Return the device generation from shelly info."""
return int(info.get("gen", 1))
def get_model_name(info: dict[str, Any]) -> str:
"""Return the device model name."""
if get_info_gen(info) == 2:
return cast(str, MODEL_NAMES.get(info["model"], info["model"]))
return cast(str, MODEL_NAMES.get(info["type"], info["type"]))
def get_rpc_channel_name(device: RpcDevice, key: str) -> str:
"""Get name based on device and channel name."""
if device.config.get("switch:0"):
key = key.replace("input", "switch")
device_name = get_rpc_device_name(device)
entity_name: str | None = None
if key in device.config:
entity_name = device.config[key].get("name", device_name)
if entity_name is None:
return f"{device_name} {key.replace(':', '_')}"
return entity_name
def get_rpc_entity_name(
device: RpcDevice, key: str, description: str | None = None
) -> str:
"""Naming for RPC based switch and sensors."""
channel_name = get_rpc_channel_name(device, key)
if description:
return f"{channel_name} {description}"
return channel_name
def get_device_entry_gen(entry: ConfigEntry) -> int:
"""Return the device generation from config entry."""
return entry.data.get("gen", 1)
def get_rpc_key_instances(keys_dict: dict[str, Any], key: str) -> list[str]:
"""Return list of key instances for RPC device from a dict."""
if key in keys_dict:
return [key]
if key == "switch" and "cover:0" in keys_dict:
key = "cover"
keys_list: list[str] = []
for i in range(MAX_RPC_KEY_INSTANCES):
key_inst = f"{key}:{i}"
if key_inst not in keys_dict:
return keys_list
keys_list.append(key_inst)
return keys_list
def get_rpc_key_ids(keys_dict: dict[str, Any], key: str) -> list[int]:
"""Return list of key ids for RPC device from a dict."""
key_ids: list[int] = []
for i in range(MAX_RPC_KEY_INSTANCES):
key_inst = f"{key}:{i}"
if key_inst not in keys_dict:
return key_ids
key_ids.append(i)
return key_ids
def is_rpc_momentary_input(config: dict[str, Any], key: str) -> bool:
"""Return true if rpc input button settings is set to a momentary type."""
return cast(bool, config[key]["type"] == "button")
def is_block_channel_type_light(settings: dict[str, Any], channel: int) -> bool:
"""Return true if block channel appliance type is set to light."""
app_type = settings["relays"][channel].get("appliance_type")
return app_type is not None and app_type.lower().startswith("light")
def is_rpc_channel_type_light(config: dict[str, Any], channel: int) -> bool:
"""Return true if rpc channel consumption type is set to light."""
con_types = config["sys"]["ui_data"].get("consumption_types")
return con_types is not None and con_types[channel].lower().startswith("light")
def get_rpc_input_triggers(device: RpcDevice) -> list[tuple[str, str]]:
"""Return list of input triggers for RPC device."""
triggers = []
key_ids = get_rpc_key_ids(device.config, "input")
for id_ in key_ids:
key = f"input:{id_}"
if not is_rpc_momentary_input(device.config, key):
continue
for trigger_type in RPC_INPUTS_EVENTS_TYPES:
subtype = f"button{id_+1}"
triggers.append((trigger_type, subtype))
return triggers
@callback
def device_update_info(
hass: HomeAssistant, shellydevice: BlockDevice | RpcDevice, entry: ConfigEntry
) -> None:
"""Update device registry info."""
LOGGER.debug("Updating device registry info for %s", entry.title)
assert entry.unique_id
dev_registry = device_registry.async_get(hass)
if device := dev_registry.async_get_device(
identifiers={(DOMAIN, entry.entry_id)},
connections={
(
device_registry.CONNECTION_NETWORK_MAC,
device_registry.format_mac(entry.unique_id),
)
},
):
dev_registry.async_update_device(
device.id, sw_version=shellydevice.firmware_version
)