* Listen for config updates from DlnaDmrEntity.async_added_to_hass
Use `Entity.async_on_remove` for dealing with callback cancellation,
instead of re-inventing the wheel with `_remove_ssdp_callbacks`.
* Use async_write_ha_state within async methods
* Import YAML config from async_setup_platform
* Import flow prompts user when device is uncontactable during migration
When config flow is able to contact a device, or when it has information
from SSDP, it will create config entries without error. If the device is
uncontactable at this point then it will appear as unavailable in HA
until it is turned on again.
When import flow cannot migrate an entry because it needs to contact the
device and can't, it will notify the user with a config flow form.
* Don't del unused parameters, HA pylint doesn't care
* Remove unused imports from tests
* Abort config flow at earliest opportunity
* Return async_abort instead of raising AbortFlow
* Consolidate config entry test cleanup into a single function
* fixup! Consolidate config entry test cleanup into a single function
Revert "Consolidate config entry test cleanup into a single function"
This reverts commit 8220da7263
.
* Check resource acquisition/release in specific tests
* fixup! Check resource acquisition/release in specific tests
* Remove unused network dependency from manifest
* _on_event runs in async context
* Call async_write_ha_state directly (not via shedule_update)
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
377 lines
14 KiB
Python
377 lines
14 KiB
Python
"""Config flow for DLNA DMR."""
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
import logging
|
|
from pprint import pformat
|
|
from typing import Any, Mapping, Optional
|
|
from urllib.parse import urlparse
|
|
|
|
from async_upnp_client.client import UpnpError
|
|
from async_upnp_client.profiles.dlna import DmrDevice
|
|
from async_upnp_client.profiles.profile import find_device_of_type
|
|
import voluptuous as vol
|
|
|
|
from homeassistant import config_entries
|
|
from homeassistant.components import ssdp
|
|
from homeassistant.const import CONF_DEVICE_ID, CONF_NAME, CONF_TYPE, CONF_URL
|
|
from homeassistant.core import callback
|
|
from homeassistant.data_entry_flow import FlowResult
|
|
from homeassistant.exceptions import IntegrationError
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.typing import DiscoveryInfoType
|
|
|
|
from .const import (
|
|
CONF_CALLBACK_URL_OVERRIDE,
|
|
CONF_LISTEN_PORT,
|
|
CONF_POLL_AVAILABILITY,
|
|
DEFAULT_NAME,
|
|
DOMAIN,
|
|
)
|
|
from .data import get_domain_data
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
FlowInput = Optional[Mapping[str, Any]]
|
|
|
|
|
|
class ConnectError(IntegrationError):
|
|
"""Error occurred when trying to connect to a device."""
|
|
|
|
|
|
class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|
"""Handle a DLNA DMR config flow.
|
|
|
|
The Unique Device Name (UDN) of the DMR device is used as the unique_id for
|
|
config entries and for entities. This UDN may differ from the root UDN if
|
|
the DMR is an embedded device.
|
|
"""
|
|
|
|
VERSION = 1
|
|
|
|
def __init__(self) -> None:
|
|
"""Initialize flow."""
|
|
self._discoveries: list[Mapping[str, str]] = []
|
|
self._location: str | None = None
|
|
self._udn: str | None = None
|
|
self._device_type: str | None = None
|
|
self._name: str | None = None
|
|
self._options: dict[str, Any] = {}
|
|
|
|
@staticmethod
|
|
@callback
|
|
def async_get_options_flow(
|
|
config_entry: config_entries.ConfigEntry,
|
|
) -> config_entries.OptionsFlow:
|
|
"""Define the config flow to handle options."""
|
|
return DlnaDmrOptionsFlowHandler(config_entry)
|
|
|
|
async def async_step_user(self, user_input: FlowInput = None) -> FlowResult:
|
|
"""Handle a flow initialized by the user: manual URL entry.
|
|
|
|
Discovered devices will already be displayed, no need to prompt user
|
|
with them here.
|
|
"""
|
|
LOGGER.debug("async_step_user: user_input: %s", user_input)
|
|
|
|
# Device setup manually, assume we don't get SSDP broadcast notifications
|
|
self._options[CONF_POLL_AVAILABILITY] = True
|
|
|
|
errors = {}
|
|
if user_input is not None:
|
|
self._location = user_input[CONF_URL]
|
|
try:
|
|
await self._async_connect()
|
|
except ConnectError as err:
|
|
errors["base"] = err.args[0]
|
|
else:
|
|
return self._create_entry()
|
|
|
|
data_schema = vol.Schema({CONF_URL: str})
|
|
return self.async_show_form(
|
|
step_id="user", data_schema=data_schema, errors=errors
|
|
)
|
|
|
|
async def async_step_import(self, import_data: FlowInput = None) -> FlowResult:
|
|
"""Import a new DLNA DMR device from a config entry.
|
|
|
|
This flow is triggered by `async_setup_platform`. If the device has not
|
|
been migrated, and can be connected to, automatically import it. If it
|
|
cannot be connected to, prompt the user to turn it on. If it has already
|
|
been migrated, do nothing.
|
|
"""
|
|
LOGGER.debug("async_step_import: import_data: %s", import_data)
|
|
|
|
if not import_data or CONF_URL not in import_data:
|
|
LOGGER.debug("Entry not imported: incomplete_config")
|
|
return self.async_abort(reason="incomplete_config")
|
|
|
|
self._location = import_data[CONF_URL]
|
|
self._async_abort_entries_match({CONF_URL: self._location})
|
|
|
|
# Use the location as this config flow's unique ID until UDN is known
|
|
await self.async_set_unique_id(self._location)
|
|
|
|
# Set options from the import_data, except listen_ip which is no longer used
|
|
self._options[CONF_LISTEN_PORT] = import_data.get(CONF_LISTEN_PORT)
|
|
self._options[CONF_CALLBACK_URL_OVERRIDE] = import_data.get(
|
|
CONF_CALLBACK_URL_OVERRIDE
|
|
)
|
|
|
|
# Override device name if it's set in the YAML
|
|
self._name = import_data.get(CONF_NAME)
|
|
|
|
discoveries = await self._async_get_discoveries()
|
|
|
|
# Find the device in the list of unconfigured devices
|
|
for discovery in discoveries:
|
|
if discovery[ssdp.ATTR_SSDP_LOCATION] == self._location:
|
|
# Device found via SSDP, it shouldn't need polling
|
|
self._options[CONF_POLL_AVAILABILITY] = False
|
|
# Discovery info has everything required to create config entry
|
|
await self._async_set_info_from_discovery(discovery)
|
|
LOGGER.debug(
|
|
"Entry %s found via SSDP, with UDN %s",
|
|
self._location,
|
|
self._udn,
|
|
)
|
|
return self._create_entry()
|
|
|
|
# This device will need to be polled
|
|
self._options[CONF_POLL_AVAILABILITY] = True
|
|
|
|
# Device was not found via SSDP, connect directly for configuration
|
|
try:
|
|
await self._async_connect()
|
|
except ConnectError as err:
|
|
# This will require user action
|
|
LOGGER.debug("Entry %s not imported yet: %s", self._location, err.args[0])
|
|
return await self.async_step_import_turn_on()
|
|
|
|
LOGGER.debug("Entry %s ready for import", self._location)
|
|
return self._create_entry()
|
|
|
|
async def async_step_import_turn_on(
|
|
self, user_input: FlowInput = None
|
|
) -> FlowResult:
|
|
"""Request the user to turn on the device so that import can finish."""
|
|
LOGGER.debug("async_step_import_turn_on: %s", user_input)
|
|
|
|
self.context["title_placeholders"] = {"name": self._name or self._location}
|
|
|
|
errors = {}
|
|
if user_input is not None:
|
|
try:
|
|
await self._async_connect()
|
|
except ConnectError as err:
|
|
errors["base"] = err.args[0]
|
|
else:
|
|
return self._create_entry()
|
|
|
|
self._set_confirm_only()
|
|
return self.async_show_form(step_id="import_turn_on", errors=errors)
|
|
|
|
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
|
|
"""Handle a flow initialized by SSDP discovery."""
|
|
LOGGER.debug("async_step_ssdp: discovery_info %s", pformat(discovery_info))
|
|
|
|
await self._async_set_info_from_discovery(discovery_info)
|
|
|
|
# Abort if a migration flow for the device's location is in progress
|
|
for progress in self._async_in_progress(include_uninitialized=True):
|
|
if progress["context"].get("unique_id") == self._location:
|
|
LOGGER.debug(
|
|
"Aborting SSDP setup because migration for %s is in progress",
|
|
self._location,
|
|
)
|
|
return self.async_abort(reason="already_in_progress")
|
|
|
|
self.context["title_placeholders"] = {"name": self._name}
|
|
|
|
return await self.async_step_confirm()
|
|
|
|
async def async_step_confirm(self, user_input: FlowInput = None) -> FlowResult:
|
|
"""Allow the user to confirm adding the device."""
|
|
LOGGER.debug("async_step_confirm: %s", user_input)
|
|
|
|
if user_input is not None:
|
|
return self._create_entry()
|
|
|
|
self._set_confirm_only()
|
|
return self.async_show_form(step_id="confirm")
|
|
|
|
async def _async_connect(self) -> None:
|
|
"""Connect to a device to confirm it works and gather extra information.
|
|
|
|
Updates this flow's unique ID to the device UDN if not already done.
|
|
Raises ConnectError if something goes wrong.
|
|
"""
|
|
LOGGER.debug("_async_connect: location: %s", self._location)
|
|
assert self._location, "self._location has not been set before connect"
|
|
|
|
domain_data = get_domain_data(self.hass)
|
|
try:
|
|
device = await domain_data.upnp_factory.async_create_device(self._location)
|
|
except UpnpError as err:
|
|
raise ConnectError("could_not_connect") from err
|
|
|
|
try:
|
|
device = find_device_of_type(device, DmrDevice.DEVICE_TYPES)
|
|
except UpnpError as err:
|
|
raise ConnectError("not_dmr") from err
|
|
|
|
if not self._udn:
|
|
self._udn = device.udn
|
|
await self.async_set_unique_id(self._udn)
|
|
|
|
# Abort if already configured, but update the last-known location
|
|
self._abort_if_unique_id_configured(
|
|
updates={CONF_URL: self._location}, reload_on_update=False
|
|
)
|
|
|
|
if not self._device_type:
|
|
self._device_type = device.device_type
|
|
|
|
if not self._name:
|
|
self._name = device.name
|
|
|
|
def _create_entry(self) -> FlowResult:
|
|
"""Create a config entry, assuming all required information is now known."""
|
|
LOGGER.debug(
|
|
"_async_create_entry: location: %s, UDN: %s", self._location, self._udn
|
|
)
|
|
assert self._location
|
|
assert self._udn
|
|
assert self._device_type
|
|
|
|
title = self._name or urlparse(self._location).hostname or DEFAULT_NAME
|
|
data = {
|
|
CONF_URL: self._location,
|
|
CONF_DEVICE_ID: self._udn,
|
|
CONF_TYPE: self._device_type,
|
|
}
|
|
return self.async_create_entry(title=title, data=data, options=self._options)
|
|
|
|
async def _async_set_info_from_discovery(
|
|
self, discovery_info: Mapping[str, Any], abort_if_configured: bool = True
|
|
) -> None:
|
|
"""Set information required for a config entry from the SSDP discovery."""
|
|
LOGGER.debug(
|
|
"_async_set_info_from_discovery: location: %s, UDN: %s",
|
|
discovery_info[ssdp.ATTR_SSDP_LOCATION],
|
|
discovery_info[ssdp.ATTR_SSDP_UDN],
|
|
)
|
|
|
|
if not self._location:
|
|
self._location = discovery_info[ssdp.ATTR_SSDP_LOCATION]
|
|
assert isinstance(self._location, str)
|
|
|
|
self._udn = discovery_info[ssdp.ATTR_SSDP_UDN]
|
|
await self.async_set_unique_id(self._udn)
|
|
|
|
if abort_if_configured:
|
|
# Abort if already configured, but update the last-known location
|
|
self._abort_if_unique_id_configured(
|
|
updates={CONF_URL: self._location}, reload_on_update=False
|
|
)
|
|
|
|
self._device_type = (
|
|
discovery_info.get(ssdp.ATTR_SSDP_NT) or discovery_info[ssdp.ATTR_SSDP_ST]
|
|
)
|
|
self._name = (
|
|
discovery_info.get(ssdp.ATTR_UPNP_FRIENDLY_NAME)
|
|
or urlparse(self._location).hostname
|
|
or DEFAULT_NAME
|
|
)
|
|
|
|
async def _async_get_discoveries(self) -> list[Mapping[str, str]]:
|
|
"""Get list of unconfigured DLNA devices discovered by SSDP."""
|
|
LOGGER.debug("_get_discoveries")
|
|
|
|
# Get all compatible devices from ssdp's cache
|
|
discoveries: list[Mapping[str, str]] = []
|
|
for udn_st in DmrDevice.DEVICE_TYPES:
|
|
st_discoveries = await ssdp.async_get_discovery_info_by_st(
|
|
self.hass, udn_st
|
|
)
|
|
discoveries.extend(st_discoveries)
|
|
|
|
# Filter out devices already configured
|
|
current_unique_ids = {
|
|
entry.unique_id for entry in self._async_current_entries()
|
|
}
|
|
discoveries = [
|
|
disc
|
|
for disc in discoveries
|
|
if disc[ssdp.ATTR_SSDP_UDN] not in current_unique_ids
|
|
]
|
|
|
|
return discoveries
|
|
|
|
|
|
class DlnaDmrOptionsFlowHandler(config_entries.OptionsFlow):
|
|
"""Handle a DLNA DMR options flow.
|
|
|
|
Configures the single instance and updates the existing config entry.
|
|
"""
|
|
|
|
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
|
|
"""Initialize."""
|
|
self.config_entry = config_entry
|
|
|
|
async def async_step_init(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> FlowResult:
|
|
"""Manage the options."""
|
|
errors: dict[str, str] = {}
|
|
# Don't modify existing (read-only) options -- copy and update instead
|
|
options = dict(self.config_entry.options)
|
|
|
|
if user_input is not None:
|
|
LOGGER.debug("user_input: %s", user_input)
|
|
listen_port = user_input.get(CONF_LISTEN_PORT) or None
|
|
callback_url_override = user_input.get(CONF_CALLBACK_URL_OVERRIDE) or None
|
|
|
|
try:
|
|
# Cannot use cv.url validation in the schema itself so apply
|
|
# extra validation here
|
|
if callback_url_override:
|
|
cv.url(callback_url_override)
|
|
except vol.Invalid:
|
|
errors["base"] = "invalid_url"
|
|
|
|
options[CONF_LISTEN_PORT] = listen_port
|
|
options[CONF_CALLBACK_URL_OVERRIDE] = callback_url_override
|
|
options[CONF_POLL_AVAILABILITY] = user_input[CONF_POLL_AVAILABILITY]
|
|
|
|
# Save if there's no errors, else fall through and show the form again
|
|
if not errors:
|
|
return self.async_create_entry(title="", data=options)
|
|
|
|
fields = {}
|
|
|
|
def _add_with_suggestion(key: str, validator: Callable) -> None:
|
|
"""Add a field to with a suggested, not default, value."""
|
|
suggested_value = options.get(key)
|
|
if suggested_value is None:
|
|
fields[vol.Optional(key)] = validator
|
|
else:
|
|
fields[
|
|
vol.Optional(key, description={"suggested_value": suggested_value})
|
|
] = validator
|
|
|
|
# listen_port can be blank or 0 for "bind any free port"
|
|
_add_with_suggestion(CONF_LISTEN_PORT, cv.port)
|
|
_add_with_suggestion(CONF_CALLBACK_URL_OVERRIDE, str)
|
|
fields[
|
|
vol.Required(
|
|
CONF_POLL_AVAILABILITY,
|
|
default=options.get(CONF_POLL_AVAILABILITY, False),
|
|
)
|
|
] = bool
|
|
|
|
return self.async_show_form(
|
|
step_id="init",
|
|
data_schema=vol.Schema(fields),
|
|
errors=errors,
|
|
)
|