Add LG webOS Smart TV config flow support (#64117)

* Add webOS Smart TV config flow support (#53256)

* Add Webostv config flow

* Fix tests mocks and apply review comments

* Apply review comments

* Change config flow to use ssdp UDN as unique_id

* Fix device info

* More review comments

* Fix _async_check_configured_entry

* Remove turn on script

* Add webOS Smart TV device triggers (#53752)

* Add webOS Smart TV config flow support (#53256)

* Add Webostv config flow

* Fix tests mocks and apply review comments

* Apply review comments

* Change config flow to use ssdp UDN as unique_id

* Fix device info

* More review comments

* Fix _async_check_configured_entry

* Remove turn on script

* Add webOS Smart TV device triggers (#53752)

* Fix webOS Smart TV mypy and pylint errors (#62620)

* Change webOS Smart TV PyPi aiopylgtv package to bscpylgtv (#62633)

* Change webOS Smart TV PyPi aiopylgtv package to bscpylgtv

* Update bscpylgtv to 0.2.8 (revised websockets requirment)

* Change webOS Smart TV PyPi package to aiowebostv (#63759)

* Change webOS Smart TV PyPi package to aiowebostv

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* webOS TV check UUID for user added device (#63817)

* webOS TV check uuid when for user added device

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Add test for form abort and host update

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Rework webOS Smart TV device trigger to custom trigger platform (#63950)

* Rework webOS Smart TV device trigger to custom trigger platform

* Review comments and add tests

* Fix webOS TV import from YAML (#63996)

* Fix webOS TV import from YAML

* Fix requirements

* Migrate YAML entities unique id to UUID

* Add backoff to migration task delay

* Assert result data and unique_id

* Apply suggestions from code review

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* Add codeowner

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Shay Levy 2022-01-14 23:48:45 +02:00 committed by GitHub
parent 043b9c66b0
commit dee843bf6e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 1668 additions and 324 deletions

View file

@ -1283,7 +1283,9 @@ omit =
homeassistant/components/waze_travel_time/__init__.py
homeassistant/components/waze_travel_time/helpers.py
homeassistant/components/waze_travel_time/sensor.py
homeassistant/components/webostv/*
homeassistant/components/webostv/__init__.py
homeassistant/components/webostv/media_player.py
homeassistant/components/webostv/notify.py
homeassistant/components/whois/__init__.py
homeassistant/components/whois/sensor.py
homeassistant/components/wiffi/*

View file

@ -1,42 +1,56 @@
"""Support for LG webOS Smart TV."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from contextlib import suppress
import json
import logging
import os
from pickle import loads
from typing import Any
from aiopylgtv import PyLGTVCmdException, PyLGTVPairException, WebOsClient
from sqlitedict import SqliteDict
from aiowebostv import WebOsClient, WebOsTvPairError
import sqlalchemy as db
import voluptuous as vol
from websockets.exceptions import ConnectionClosed
from homeassistant.components import notify as hass_notify
from homeassistant.components.automation import AutomationActionType
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import (
ATTR_COMMAND,
ATTR_ENTITY_ID,
CONF_CLIENT_SECRET,
CONF_CUSTOMIZE,
CONF_HOST,
CONF_ICON,
CONF_NAME,
CONF_UNIQUE_ID,
EVENT_HOMEASSISTANT_STOP,
Platform,
)
from homeassistant.core import HomeAssistant, ServiceCall
import homeassistant.helpers.config_validation as cv
from homeassistant.core import Context, HassJob, HomeAssistant, ServiceCall, callback
from homeassistant.helpers import config_validation as cv, discovery, entity_registry
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.typing import ConfigType
from .const import (
ATTR_BUTTON,
ATTR_CONFIG_ENTRY_ID,
ATTR_PAYLOAD,
ATTR_SOUND_OUTPUT,
CONF_ON_ACTION,
CONF_SOURCES,
DATA_CONFIG_ENTRY,
DATA_HASS_CONFIG,
DEFAULT_NAME,
DOMAIN,
PLATFORMS,
SERVICE_BUTTON,
SERVICE_COMMAND,
SERVICE_SELECT_SOUND_OUTPUT,
WEBOSTV_CONFIG_FILE,
WEBOSTV_EXCEPTIONS,
)
CUSTOMIZE_SCHEMA = vol.Schema(
@ -44,6 +58,8 @@ CUSTOMIZE_SCHEMA = vol.Schema(
)
CONFIG_SCHEMA = vol.Schema(
vol.All(
cv.deprecated(DOMAIN),
{
DOMAIN: vol.All(
cv.ensure_list,
@ -60,6 +76,7 @@ CONFIG_SCHEMA = vol.Schema(
],
)
},
),
extra=vol.ALLOW_EXTRA,
)
@ -85,9 +102,122 @@ SERVICE_TO_METHOD = {
_LOGGER = logging.getLogger(__name__)
def read_client_keys(config_file):
"""Read legacy client keys from file."""
if not os.path.isfile(config_file):
return {}
# Try to parse the file as being JSON
with open(config_file, encoding="utf8") as json_file:
try:
client_keys = json.load(json_file)
return client_keys
except (json.JSONDecodeError, UnicodeDecodeError):
pass
# If the file is not JSON, read it as Sqlite DB
engine = db.create_engine(f"sqlite:///{config_file}")
table = db.Table("unnamed", db.MetaData(), autoload=True, autoload_with=engine)
results = engine.connect().execute(db.select([table])).fetchall()
client_keys = {k: loads(v) for k, v in results}
return client_keys
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the LG WebOS TV platform."""
hass.data[DOMAIN] = {}
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN].setdefault(DATA_CONFIG_ENTRY, {})
hass.data[DOMAIN][DATA_HASS_CONFIG] = config
if DOMAIN not in config:
return True
config_file = hass.config.path(WEBOSTV_CONFIG_FILE)
if not (
client_keys := await hass.async_add_executor_job(read_client_keys, config_file)
):
_LOGGER.debug("No pairing keys, Not importing webOS Smart TV YAML config")
return True
async def async_migrate_task(entity_id, conf, key):
_LOGGER.debug("Migrating webOS Smart TV entity %s unique_id", entity_id)
client = WebOsClient(conf[CONF_HOST], key)
tries = 0
while not client.is_connected():
try:
await client.connect()
except WEBOSTV_EXCEPTIONS:
wait_time = 2 ** min(tries, 4) * 5
tries += 1
await asyncio.sleep(wait_time)
except WebOsTvPairError:
return
else:
break
uuid = client.hello_info["deviceUUID"]
ent_reg.async_update_entity(entity_id, new_unique_id=uuid)
await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
**conf,
CONF_CLIENT_SECRET: key,
CONF_UNIQUE_ID: uuid,
},
)
ent_reg = entity_registry.async_get(hass)
tasks = []
for conf in config[DOMAIN]:
host = conf[CONF_HOST]
if (key := client_keys.get(host)) is None:
_LOGGER.debug(
"Not importing webOS Smart TV host %s without pairing key", host
)
continue
if entity_id := ent_reg.async_get_entity_id(Platform.MEDIA_PLAYER, DOMAIN, key):
tasks.append(asyncio.create_task(async_migrate_task(entity_id, conf, key)))
async def async_tasks_cancel(_event):
"""Cancel config flow import tasks."""
for task in tasks:
if not task.done():
task.cancel()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_tasks_cancel)
return True
def _async_migrate_options_from_data(hass, config_entry):
"""Migrate options from data."""
if config_entry.options:
return
config = config_entry.data
options = {}
# Get Preferred Sources
if sources := config.get(CONF_CUSTOMIZE, {}).get(CONF_SOURCES):
options[CONF_SOURCES] = sources
if not isinstance(sources, list):
options[CONF_SOURCES] = sources.split(",")
hass.config_entries.async_update_entry(config_entry, options=options)
async def async_setup_entry(hass, config_entry):
"""Set the config entry up."""
_async_migrate_options_from_data(hass, config_entry)
host = config_entry.data[CONF_HOST]
key = config_entry.data[CONF_CLIENT_SECRET]
wrapper = WebOsClientWrapper(host, client_key=key)
await wrapper.connect()
async def async_service_handler(service: ServiceCall) -> None:
method = SERVICE_TO_METHOD[service.service]
@ -101,124 +231,125 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
DOMAIN, service, async_service_handler, schema=schema
)
tasks = [async_setup_tv(hass, config, conf) for conf in config[DOMAIN]]
if tasks:
await asyncio.gather(*tasks)
hass.data[DOMAIN][DATA_CONFIG_ENTRY][config_entry.entry_id] = wrapper
hass.config_entries.async_setup_platforms(config_entry, PLATFORMS)
# set up notify platform, no entry support for notify component yet,
# have to use discovery to load platform.
hass.async_create_task(
discovery.async_load_platform(
hass,
"notify",
DOMAIN,
{
CONF_NAME: config_entry.title,
ATTR_CONFIG_ENTRY_ID: config_entry.entry_id,
},
hass.data[DOMAIN][DATA_HASS_CONFIG],
)
)
if not config_entry.update_listeners:
config_entry.async_on_unload(
config_entry.add_update_listener(async_update_options)
)
async def async_on_stop(_event):
"""Unregister callbacks and disconnect."""
await wrapper.shutdown()
config_entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_on_stop)
)
return True
def convert_client_keys(config_file):
"""In case the config file contains JSON, convert it to a Sqlite config file."""
# Return early if config file is non-existing
if not os.path.isfile(config_file):
return
# Try to parse the file as being JSON
with open(config_file, encoding="utf8") as json_file:
try:
json_conf = json.load(json_file)
except (json.JSONDecodeError, UnicodeDecodeError):
json_conf = None
# If the file contains JSON, convert it to an Sqlite DB
if json_conf:
_LOGGER.warning("LG webOS TV client-key file is being migrated to Sqlite!")
# Clean the JSON file
os.remove(config_file)
# Write the data to the Sqlite DB
with SqliteDict(config_file) as conf:
for host, key in json_conf.items():
conf[host] = key
conf.commit()
async def async_update_options(hass, config_entry):
"""Update options."""
await hass.config_entries.async_reload(config_entry.entry_id)
async def async_setup_tv(hass, config, conf):
"""Set up a LG WebOS TV based on host parameter."""
host = conf[CONF_HOST]
config_file = hass.config.path(WEBOSTV_CONFIG_FILE)
await hass.async_add_executor_job(convert_client_keys, config_file)
client = await WebOsClient.create(host, config_file)
hass.data[DOMAIN][host] = {"client": client}
if client.is_registered():
await async_setup_tv_finalize(hass, config, conf, client)
else:
_LOGGER.warning("LG webOS TV %s needs to be paired", host)
await async_request_configuration(hass, config, conf, client)
async def async_connect(client):
"""Attempt a connection, but fail gracefully if tv is off for example."""
with suppress(
OSError,
ConnectionClosed,
ConnectionRefusedError,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVPairException,
PyLGTVCmdException,
):
await client.connect()
async def async_setup_tv_finalize(hass, config, conf, client):
"""Make initial connection attempt and call platform setup."""
async def async_on_stop(event):
"""Unregister callbacks and disconnect."""
client.clear_state_update_callbacks()
await client.disconnect()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_on_stop)
await async_connect(client)
hass.async_create_task(
hass.helpers.discovery.async_load_platform(
Platform.MEDIA_PLAYER, DOMAIN, conf, config
)
)
hass.async_create_task(
hass.helpers.discovery.async_load_platform(
Platform.NOTIFY, DOMAIN, conf, config
)
)
async def async_request_configuration(hass, config, conf, client):
"""Request configuration steps from the user."""
host = conf.get(CONF_HOST)
name = conf.get(CONF_NAME)
configurator = hass.components.configurator
async def lgtv_configuration_callback(data):
"""Handle actions when configuration callback is called."""
async def async_control_connect(host: str, key: str | None) -> WebOsClient:
"""LG Connection."""
client = WebOsClient(host, key)
try:
await client.connect()
except PyLGTVPairException:
except WebOsTvPairError:
_LOGGER.warning("Connected to LG webOS TV %s but not paired", host)
return
except (
OSError,
ConnectionClosed,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVCmdException,
):
_LOGGER.error("Unable to connect to host %s", host)
return
raise
await async_setup_tv_finalize(hass, config, conf, client)
configurator.async_request_done(request_id)
return client
request_id = configurator.async_request_config(
name,
lgtv_configuration_callback,
description="Click start and accept the pairing request on your TV.",
description_image="/static/images/config_webos.png",
submit_caption="Start pairing request",
)
async def async_unload_entry(hass, entry):
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
client = hass.data[DOMAIN][DATA_CONFIG_ENTRY].pop(entry.entry_id)
await hass_notify.async_reload(hass, DOMAIN)
await client.shutdown()
# unregister service calls, check if this is the last entry to unload
if unload_ok and not hass.data[DOMAIN][DATA_CONFIG_ENTRY]:
for service in SERVICE_TO_METHOD:
hass.services.async_remove(DOMAIN, service)
return unload_ok
class PluggableAction:
"""A pluggable action handler."""
def __init__(self) -> None:
"""Initialize."""
self._actions: dict[Callable[[], None], tuple[HassJob, dict[str, Any]]] = {}
def __bool__(self):
"""Return if we have something attached."""
return bool(self._actions)
@callback
def async_attach(
self, action: AutomationActionType, variables: dict[str, Any]
) -> Callable[[], None]:
"""Attach a device trigger for turn on."""
@callback
def _remove() -> None:
del self._actions[_remove]
job = HassJob(action)
self._actions[_remove] = (job, variables)
return _remove
@callback
def async_run(self, hass: HomeAssistant, context: Context | None = None) -> None:
"""Run all turn on triggers."""
for job, variables in self._actions.values():
hass.async_run_hass_job(job, variables, context)
class WebOsClientWrapper:
"""Wrapper for a WebOS TV client with Home Assistant specific functions."""
def __init__(self, host: str, client_key: str) -> None:
"""Set up the client."""
self.host = host
self.client_key = client_key
self.turn_on = PluggableAction()
self.client: WebOsClient | None = None
async def connect(self) -> None:
"""Attempt a connection, but fail gracefully if tv is off for example."""
self.client = WebOsClient(self.host, self.client_key)
with suppress(*WEBOSTV_EXCEPTIONS, WebOsTvPairError):
await self.client.connect()
async def shutdown(self) -> None:
"""Unregister callbacks and disconnect."""
assert self.client
self.client.clear_state_update_callbacks()
await self.client.disconnect()

View file

@ -0,0 +1,184 @@
"""Config flow to configure webostv component."""
from __future__ import annotations
import logging
from urllib.parse import urlparse
from aiowebostv import WebOsTvPairError
import voluptuous as vol
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import ssdp
from homeassistant.const import CONF_CLIENT_SECRET, CONF_HOST, CONF_NAME, CONF_UNIQUE_ID
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv
from . import async_control_connect
from .const import CONF_SOURCES, DEFAULT_NAME, DOMAIN, WEBOSTV_EXCEPTIONS
DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
},
extra=vol.ALLOW_EXTRA,
)
_LOGGER = logging.getLogger(__name__)
class FlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""WebosTV configuration flow."""
VERSION = 1
def __init__(self):
"""Initialize workflow."""
self._host = None
self._name = None
self._uuid = None
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return OptionsFlowHandler(config_entry)
async def async_step_import(self, import_info):
"""Set the config entry up from yaml."""
self._host = import_info[CONF_HOST]
self._name = import_info.get(CONF_NAME) or import_info[CONF_HOST]
await self.async_set_unique_id(
import_info[CONF_UNIQUE_ID], raise_on_progress=False
)
data = {
CONF_HOST: self._host,
CONF_CLIENT_SECRET: import_info[CONF_CLIENT_SECRET],
}
self._abort_if_unique_id_configured()
_LOGGER.debug("WebOS Smart TV host %s imported from YAML config", self._host)
return self.async_create_entry(title=self._name, data=data)
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
errors = {}
if user_input is not None:
self._host = user_input[CONF_HOST]
self._name = user_input[CONF_NAME]
return await self.async_step_pairing()
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA, errors=errors
)
@callback
def _async_check_configured_entry(self):
"""Check if entry is configured, update unique_id if needed."""
for entry in self._async_current_entries(include_ignore=False):
if entry.data[CONF_HOST] != self._host:
continue
if self._uuid and not entry.unique_id:
_LOGGER.debug(
"Updating unique_id for host %s, unique_id: %s",
self._host,
self._uuid,
)
self.hass.config_entries.async_update_entry(entry, unique_id=self._uuid)
raise data_entry_flow.AbortFlow("already_configured")
async def async_step_pairing(self, user_input=None):
"""Display pairing form."""
self._async_check_configured_entry()
self.context[CONF_HOST] = self._host
self.context["title_placeholders"] = {"name": self._name}
errors = {}
if (
self.context["source"] == config_entries.SOURCE_IMPORT
or user_input is not None
):
try:
client = await async_control_connect(self._host, None)
except WebOsTvPairError:
return self.async_abort(reason="error_pairing")
except WEBOSTV_EXCEPTIONS:
errors["base"] = "cannot_connect"
else:
await self.async_set_unique_id(
client.hello_info["deviceUUID"], raise_on_progress=False
)
self._abort_if_unique_id_configured({CONF_HOST: self._host})
data = {CONF_HOST: self._host, CONF_CLIENT_SECRET: client.client_key}
return self.async_create_entry(title=self._name, data=data)
return self.async_show_form(step_id="pairing", errors=errors)
async def async_step_ssdp(self, discovery_info):
"""Handle a flow initialized by discovery."""
self._host = urlparse(discovery_info[ssdp.ATTR_SSDP_LOCATION]).hostname
self._name = discovery_info[ssdp.ATTR_UPNP_FRIENDLY_NAME]
uuid = discovery_info[ssdp.ATTR_UPNP_UDN]
if uuid.startswith("uuid:"):
uuid = uuid[5:]
await self.async_set_unique_id(uuid)
self._abort_if_unique_id_configured({CONF_HOST: self._host})
for progress in self._async_in_progress():
if progress.get("context", {}).get(CONF_HOST) == self._host:
return self.async_abort(reason="already_in_progress")
self._uuid = uuid
return await self.async_step_pairing()
class OptionsFlowHandler(config_entries.OptionsFlow):
"""Handle options."""
def __init__(self, config_entry):
"""Initialize options flow."""
self.config_entry = config_entry
self.options = config_entry.options
self.host = config_entry.data[CONF_HOST]
self.key = config_entry.data[CONF_CLIENT_SECRET]
async def async_step_init(self, user_input=None):
"""Manage the options."""
errors = {}
if user_input is not None:
options_input = {CONF_SOURCES: user_input[CONF_SOURCES]}
return self.async_create_entry(title="", data=options_input)
# Get sources
sources = self.options.get(CONF_SOURCES, "")
sources_list = await async_get_sources(self.host, self.key)
if sources_list is None:
errors["base"] = "cannot_retrieve"
options_schema = vol.Schema(
{
vol.Optional(
CONF_SOURCES,
description={"suggested_value": sources},
): cv.multi_select(sources_list),
}
)
return self.async_show_form(
step_id="init", data_schema=options_schema, errors=errors
)
async def async_get_sources(host, key):
"""Construct sources list."""
try:
client = await async_control_connect(host, key)
except WEBOSTV_EXCEPTIONS:
return None
return [
*(app["title"] for app in client.apps.values()),
*(app["label"] for app in client.inputs.values()),
]

View file

@ -1,9 +1,17 @@
"""Constants used for LG webOS Smart TV."""
DOMAIN = "webostv"
import asyncio
from aiowebostv import WebOsTvCommandError
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
DOMAIN = "webostv"
PLATFORMS = ["media_player"]
DATA_CONFIG_ENTRY = "config_entry"
DATA_HASS_CONFIG = "hass_config"
DEFAULT_NAME = "LG webOS Smart TV"
ATTR_BUTTON = "button"
ATTR_CONFIG_ENTRY_ID = "entry_id"
ATTR_PAYLOAD = "payload"
ATTR_SOUND_OUTPUT = "sound_output"
@ -16,4 +24,14 @@ SERVICE_SELECT_SOUND_OUTPUT = "select_sound_output"
LIVE_TV_APP_ID = "com.webos.app.livetv"
WEBOSTV_EXCEPTIONS = (
OSError,
ConnectionClosed,
ConnectionClosedOK,
ConnectionRefusedError,
WebOsTvCommandError,
asyncio.TimeoutError,
asyncio.CancelledError,
)
WEBOSTV_CONFIG_FILE = "webostv.conf"

View file

@ -0,0 +1,96 @@
"""Provides device automations for control of LG webOS Smart TV."""
from __future__ import annotations
import voluptuous as vol
from homeassistant.components.automation import (
AutomationActionType,
AutomationTriggerInfo,
)
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
from homeassistant.components.device_automation.exceptions import (
InvalidDeviceAutomationConfig,
)
from homeassistant.const import CONF_DEVICE_ID, CONF_DOMAIN, CONF_PLATFORM, CONF_TYPE
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.typing import ConfigType
from . import trigger
from .const import DOMAIN
from .helpers import (
async_get_client_wrapper_by_device_entry,
async_get_device_entry_by_device_id,
async_is_device_config_entry_not_loaded,
)
from .triggers.turn_on import PLATFORM_TYPE as TURN_ON_PLATFORM_TYPE
TRIGGER_TYPES = {TURN_ON_PLATFORM_TYPE}
TRIGGER_SCHEMA = DEVICE_TRIGGER_BASE_SCHEMA.extend(
{
vol.Required(CONF_TYPE): vol.In(TRIGGER_TYPES),
}
)
async def async_validate_trigger_config(
hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
config = TRIGGER_SCHEMA(config)
try:
if async_is_device_config_entry_not_loaded(hass, config[CONF_DEVICE_ID]):
return config
except ValueError as err:
raise InvalidDeviceAutomationConfig(err) from err
if config[CONF_TYPE] == TURN_ON_PLATFORM_TYPE:
device_id = config[CONF_DEVICE_ID]
try:
device = async_get_device_entry_by_device_id(hass, device_id)
async_get_client_wrapper_by_device_entry(hass, device)
except ValueError as err:
raise InvalidDeviceAutomationConfig(err) from err
return config
async def async_get_triggers(
_hass: HomeAssistant, device_id: str
) -> list[dict[str, str]]:
"""List device triggers for device."""
triggers = []
base_trigger = {
CONF_PLATFORM: "device",
CONF_DEVICE_ID: device_id,
CONF_DOMAIN: DOMAIN,
}
triggers.append({**base_trigger, CONF_TYPE: TURN_ON_PLATFORM_TYPE})
return triggers
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: AutomationActionType,
automation_info: AutomationTriggerInfo,
) -> CALLBACK_TYPE | None:
"""Attach a trigger."""
trigger_type = config[CONF_TYPE]
if trigger_type == TURN_ON_PLATFORM_TYPE:
trigger_config = {
CONF_PLATFORM: trigger_type,
CONF_DEVICE_ID: config[CONF_DEVICE_ID],
}
trigger_config = await trigger.async_validate_trigger_config(
hass, trigger_config
)
return await trigger.async_attach_trigger(
hass, trigger_config, action, automation_info
)
raise HomeAssistantError(f"Unhandled trigger type {trigger_type}")

View file

@ -0,0 +1,82 @@
"""Helper functions for webOS Smart TV."""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.device_registry import DeviceEntry
from . import WebOsClientWrapper
from .const import DATA_CONFIG_ENTRY, DOMAIN
@callback
def async_get_device_entry_by_device_id(
hass: HomeAssistant, device_id: str
) -> DeviceEntry:
"""
Get Device Entry from Device Registry by device ID.
Raises ValueError if device ID is invalid.
"""
device_reg = dr.async_get(hass)
device = device_reg.async_get(device_id)
if device is None:
raise ValueError(f"Device {device_id} is not a valid {DOMAIN} device.")
return device
@callback
def async_is_device_config_entry_not_loaded(
hass: HomeAssistant, device_id: str
) -> bool:
"""Return whether device's config entries are not loaded."""
device = async_get_device_entry_by_device_id(hass, device_id)
return any(
(entry := hass.config_entries.async_get_entry(entry_id))
and entry.state != ConfigEntryState.LOADED
for entry_id in device.config_entries
)
@callback
def async_get_device_id_from_entity_id(hass: HomeAssistant, entity_id: str) -> str:
"""
Get device ID from an entity ID.
Raises ValueError if entity or device ID is invalid.
"""
ent_reg = er.async_get(hass)
entity_entry = ent_reg.async_get(entity_id)
if (
entity_entry is None
or entity_entry.device_id is None
or entity_entry.platform != DOMAIN
):
raise ValueError(f"Entity {entity_id} is not a valid {DOMAIN} entity.")
return entity_entry.device_id
@callback
def async_get_client_wrapper_by_device_entry(
hass: HomeAssistant, device: DeviceEntry
) -> WebOsClientWrapper:
"""
Get WebOsClientWrapper from Device Registry by device entry.
Raises ValueError if client wrapper is not found.
"""
for config_entry_id in device.config_entries:
if wrapper := hass.data[DOMAIN][DATA_CONFIG_ENTRY].get(config_entry_id):
break
if not wrapper:
raise ValueError(
f"Device {device.id} is not from an existing {DOMAIN} config entry"
)
return wrapper

View file

@ -1,9 +1,10 @@
{
"domain": "webostv",
"name": "LG webOS Smart TV",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/webostv",
"requirements": ["aiopylgtv==0.4.0"],
"dependencies": ["configurator"],
"requirements": ["aiowebostv==0.1.1", "sqlalchemy==1.4.27"],
"codeowners": ["@bendavid", "@thecode"],
"ssdp": [{"st": "urn:lge-com:service:webos-second-screen:1"}],
"iot_class": "local_polling"
}

View file

@ -1,14 +1,12 @@
"""Support for interface with an LG webOS Smart TV."""
from __future__ import annotations
import asyncio
from contextlib import suppress
from datetime import timedelta
from functools import wraps
import logging
from aiopylgtv import PyLGTVCmdException, PyLGTVPairException, WebOsClient
from websockets.exceptions import ConnectionClosed
from aiowebostv import WebOsClient, WebOsTvPairError
from homeassistant import util
from homeassistant.components.media_player import (
@ -29,11 +27,9 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_CUSTOMIZE,
CONF_HOST,
CONF_NAME,
ENTITY_MATCH_ALL,
ENTITY_MATCH_NONE,
STATE_OFF,
@ -42,16 +38,16 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.script import Script
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import WebOsClientWrapper
from .const import (
ATTR_PAYLOAD,
ATTR_SOUND_OUTPUT,
CONF_ON_ACTION,
CONF_SOURCES,
DATA_CONFIG_ENTRY,
DOMAIN,
LIVE_TV_APP_ID,
WEBOSTV_EXCEPTIONS,
)
_LOGGER = logging.getLogger(__name__)
@ -73,43 +69,29 @@ MIN_TIME_BETWEEN_FORCED_SCANS = timedelta(seconds=1)
SCAN_INTERVAL = timedelta(seconds=10)
async def async_setup_platform(
async def async_setup_entry(
hass: HomeAssistant,
config: ConfigType,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the LG webOS Smart TV platform."""
unique_id = config_entry.unique_id
name = config_entry.title
sources = config_entry.options.get(CONF_SOURCES)
wrapper = hass.data[DOMAIN][DATA_CONFIG_ENTRY][config_entry.entry_id]
if discovery_info is None:
return
host = discovery_info[CONF_HOST]
name = discovery_info[CONF_NAME]
customize = discovery_info[CONF_CUSTOMIZE]
turn_on_action = discovery_info.get(CONF_ON_ACTION)
client = hass.data[DOMAIN][host]["client"]
on_script = Script(hass, turn_on_action, name, DOMAIN) if turn_on_action else None
entity = LgWebOSMediaPlayerEntity(client, name, customize, on_script)
async_add_entities([entity], update_before_add=False)
async_add_entities([LgWebOSMediaPlayerEntity(wrapper, name, sources, unique_id)])
def cmd(func):
"""Catch command exceptions."""
@wraps(func)
async def wrapper(obj, *args, **kwargs):
async def cmd_wrapper(obj, *args, **kwargs):
"""Wrap all command methods."""
try:
await func(obj, *args, **kwargs)
except (
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVCmdException,
) as exc:
except WEBOSTV_EXCEPTIONS as exc:
# If TV is off, we expect calls to fail.
if obj.state == STATE_OFF:
level = logging.INFO
@ -123,19 +105,21 @@ def cmd(func):
exc,
)
return wrapper
return cmd_wrapper
class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
"""Representation of a LG webOS Smart TV."""
def __init__(self, client: WebOsClient, name: str, customize, on_script=None):
def __init__(
self, wrapper: WebOsClientWrapper, name: str, sources, unique_id
) -> None:
"""Initialize the webos device."""
self._client = client
self._wrapper = wrapper
self._client = wrapper.client
self._name = name
self._unique_id = client.client_key
self._customize = customize
self._on_script = on_script
self._unique_id = unique_id
self._sources = sources
# Assume that the TV is not paused
self._paused = False
@ -145,7 +129,9 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
async def async_added_to_hass(self):
"""Connect and subscribe to dispatcher signals and state updates."""
self.async_on_remove(
async_dispatcher_connect(self.hass, DOMAIN, self.async_signal_handler)
)
await self._client.register_state_update_callback(
self.async_handle_state_update
@ -168,23 +154,22 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
}
await getattr(self, data["method"])(**params)
async def async_handle_state_update(self):
async def async_handle_state_update(self, _client: WebOsClient):
"""Update state from WebOsClient."""
self.update_sources()
self.async_write_ha_state()
def update_sources(self):
"""Update list of sources from current source, apps, inputs and configured list."""
source_list = self._source_list
self._source_list = {}
conf_sources = self._customize[CONF_SOURCES]
conf_sources = self._sources
found_live_tv = False
for app in self._client.apps.values():
if app["id"] == LIVE_TV_APP_ID:
found_live_tv = True
if app["id"] == self._client.current_appId:
if app["id"] == self._client.current_app_id:
self._current_source = app["title"]
self._source_list[app["title"]] = app
elif (
@ -198,7 +183,7 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
for source in self._client.inputs.values():
if source["appId"] == LIVE_TV_APP_ID:
found_live_tv = True
if source["appId"] == self._client.current_appId:
if source["appId"] == self._client.current_app_id:
self._current_source = source["label"]
self._source_list[source["label"]] = source
elif (
@ -208,10 +193,11 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
):
self._source_list[source["label"]] = source
# special handling of live tv since this might not appear in the app or input lists in some cases
# special handling of live tv since this might
# not appear in the app or input lists in some cases
if not found_live_tv:
app = {"id": LIVE_TV_APP_ID, "title": "Live TV"}
if LIVE_TV_APP_ID == self._client.current_appId:
if LIVE_TV_APP_ID == self._client.current_app_id:
self._current_source = app["title"]
self._source_list["Live TV"] = app
elif (
@ -227,16 +213,10 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
@util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS)
async def async_update(self):
"""Connect."""
if not self._client.is_connected():
with suppress(
OSError,
ConnectionClosed,
ConnectionRefusedError,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVPairException,
PyLGTVCmdException,
):
if self._client.is_connected():
return
with suppress(*WEBOSTV_EXCEPTIONS, WebOsTvPairError):
await self._client.connect()
@property
@ -288,7 +268,7 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
@property
def media_content_type(self):
"""Content type of current playing media."""
if self._client.current_appId == LIVE_TV_APP_ID:
if self._client.current_app_id == LIVE_TV_APP_ID:
return MEDIA_TYPE_CHANNEL
return None
@ -296,7 +276,7 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
@property
def media_title(self):
"""Title of current playing media."""
if (self._client.current_appId == LIVE_TV_APP_ID) and (
if (self._client.current_app_id == LIVE_TV_APP_ID) and (
self._client.current_channel is not None
):
return self._client.current_channel.get("channelName")
@ -305,10 +285,10 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
@property
def media_image_url(self):
"""Image url of current playing media."""
if self._client.current_appId in self._client.apps:
icon = self._client.apps[self._client.current_appId]["largeIcon"]
if self._client.current_app_id in self._client.apps:
icon = self._client.apps[self._client.current_app_id]["largeIcon"]
if not icon.startswith("http"):
icon = self._client.apps[self._client.current_appId]["icon"]
icon = self._client.apps[self._client.current_app_id]["icon"]
return icon
return None
@ -322,11 +302,34 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
elif self._client.sound_output != "lineout":
supported = supported | SUPPORT_WEBOSTV_VOLUME | SUPPORT_VOLUME_SET
if self._on_script:
supported = supported | SUPPORT_TURN_ON
if self._wrapper.turn_on:
supported |= SUPPORT_TURN_ON
return supported
@property
def device_info(self):
"""Return device information."""
device_info = {
"identifiers": {(DOMAIN, self._unique_id)},
"manufacturer": "LG",
"name": self._name,
}
if self._client.system_info is None and self.state == STATE_OFF:
return device_info
maj_v = self._client.software_info.get("major_ver")
min_v = self._client.software_info.get("minor_ver")
if maj_v and min_v:
device_info["sw_version"] = f"{maj_v}.{min_v}"
model = self._client.system_info.get("modelName")
if model:
device_info["model"] = model
return device_info
@property
def extra_state_attributes(self):
"""Return device specific state attributes."""
@ -340,9 +343,8 @@ class LgWebOSMediaPlayerEntity(MediaPlayerEntity):
await self._client.power_off()
async def async_turn_on(self):
"""Turn on the media player."""
if self._on_script:
await self._on_script.async_run(context=self._context)
"""Turn on media player."""
self._wrapper.turn_on.async_run(self.hass, self._context)
@cmd
async def async_volume_up(self):

View file

@ -1,41 +1,37 @@
"""Support for LG WebOS TV notification service."""
import asyncio
import logging
from aiopylgtv import PyLGTVCmdException, PyLGTVPairException
from websockets.exceptions import ConnectionClosed
from aiowebostv import WebOsTvPairError
from homeassistant.components.notify import ATTR_DATA, BaseNotificationService
from homeassistant.const import CONF_HOST, CONF_ICON
from homeassistant.const import CONF_ICON, CONF_NAME
from . import DOMAIN
from .const import ATTR_CONFIG_ENTRY_ID, DATA_CONFIG_ENTRY, DOMAIN, WEBOSTV_EXCEPTIONS
_LOGGER = logging.getLogger(__name__)
async def async_get_service(hass, config, discovery_info=None):
async def async_get_service(hass, _config, discovery_info=None):
"""Return the notify service."""
if discovery_info is None:
return None
host = discovery_info.get(CONF_HOST)
icon_path = discovery_info.get(CONF_ICON)
name = discovery_info.get(CONF_NAME)
client = hass.data[DOMAIN][DATA_CONFIG_ENTRY][
discovery_info[ATTR_CONFIG_ENTRY_ID]
].client
client = hass.data[DOMAIN][host]["client"]
svc = LgWebOSNotificationService(client, icon_path)
return svc
return LgWebOSNotificationService(client, name)
class LgWebOSNotificationService(BaseNotificationService):
"""Implement the notification service for LG WebOS TV."""
def __init__(self, client, icon_path):
def __init__(self, client, name):
"""Initialize the service."""
self._name = name
self._client = client
self._icon_path = icon_path
async def async_send_message(self, message="", **kwargs):
"""Send a message to the tv."""
@ -44,19 +40,11 @@ class LgWebOSNotificationService(BaseNotificationService):
await self._client.connect()
data = kwargs.get(ATTR_DATA)
icon_path = (
data.get(CONF_ICON, self._icon_path) if data else self._icon_path
)
icon_path = data.get(CONF_ICON, "") if data else None
await self._client.send_message(message, icon_path=icon_path)
except PyLGTVPairException:
except WebOsTvPairError:
_LOGGER.error("Pairing with TV failed")
except FileNotFoundError:
_LOGGER.error("Icon %s not found", icon_path)
except (
OSError,
ConnectionClosed,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVCmdException,
):
except WEBOSTV_EXCEPTIONS:
_LOGGER.error("TV unreachable")

View file

@ -0,0 +1,47 @@
{
"config": {
"flow_title": "LG webOS Smart TV",
"step": {
"user": {
"title": "Connect to webOS TV",
"description": "Turn on TV, fill the following fields click submit",
"data": {
"host": "[%key:common::config_flow::data::host%]",
"name": "[%key:common::config_flow::data::name%]"
}
},
"pairing": {
"title": "webOS TV Pairing",
"description": "Click submit and accept the pairing request on your TV.\n\n![Image](/static/images/config_webos.png)"
}
},
"error": {
"cannot_connect": "Failed to connect, please turn on your TV or check ip address"
},
"abort": {
"error_pairing": "Connected to LG webOS TV but not paired",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
},
"options": {
"step": {
"init": {
"title": "Options for webOS Smart TV",
"description": "Select enabled sources",
"data": {
"sources": "Sources list"
}
}
},
"error": {
"script_not_found": "Script not found",
"cannot_retrieve": "Unable to retrieve the list of sources. Make sure device is switched on"
}
},
"device_automation": {
"trigger_type": {
"webostv.turn_on": "Device is requested to turn on"
}
}
}

View file

@ -0,0 +1,47 @@
{
"config": {
"flow_title": "LG webOS Smart TV",
"step": {
"user": {
"title": "Connect to webOS TV",
"description": "Turn on TV, fill the following fields click submit",
"data": {
"host": "Hostname",
"name": "Name"
}
},
"pairing": {
"title": "webOS TV Pairing",
"description": "Click submit and accept the pairing request on your TV.\n\n![Image](/static/images/config_webos.png)"
}
},
"error": {
"cannot_connect": "Failed to connect, please turn on your TV or check ip address"
},
"abort": {
"error_pairing": "Connected to LG webOS TV but not paired",
"already_in_progress": "Configuration flow is already in progress",
"already_configured": "Device is already configured"
}
},
"options": {
"step": {
"init": {
"title": "Options for Webos Smart TV",
"description": "Select enabled sources",
"data": {
"sources": "Sources list"
}
}
},
"error": {
"script_not_found": "Script not found",
"cannot_retrieve": "Unable to retrieve the list of sources. Make sure device is switched on"
}
},
"device_automation": {
"trigger_type": {
"webostv.turn_on": "Device is requested to turn on"
}
}
}

View file

@ -0,0 +1,53 @@
"""webOS Smart TV trigger dispatcher."""
from __future__ import annotations
from typing import cast
from homeassistant.components.automation import (
AutomationActionType,
AutomationTriggerInfo,
)
from homeassistant.const import CONF_PLATFORM
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.helpers.typing import ConfigType
from .triggers import TriggersPlatformModule, turn_on
TRIGGERS = {
"turn_on": turn_on,
}
def _get_trigger_platform(config: ConfigType) -> TriggersPlatformModule:
"""Return trigger platform."""
platform_split = config[CONF_PLATFORM].split(".", maxsplit=1)
if len(platform_split) < 2 or platform_split[1] not in TRIGGERS:
raise ValueError(
f"Unknown webOS Smart TV trigger platform {config[CONF_PLATFORM]}"
)
return cast(TriggersPlatformModule, TRIGGERS[platform_split[1]])
async def async_validate_trigger_config(
hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
platform = _get_trigger_platform(config)
return platform.TRIGGER_SCHEMA(config)
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: AutomationActionType,
automation_info: AutomationTriggerInfo,
) -> CALLBACK_TYPE:
"""Attach trigger of specified platform."""
platform = _get_trigger_platform(config)
assert hasattr(platform, "async_attach_trigger")
return cast(
CALLBACK_TYPE,
await getattr(platform, "async_attach_trigger")(
hass, config, action, automation_info
),
)

View file

@ -0,0 +1,12 @@
"""webOS Smart TV triggers."""
from __future__ import annotations
from typing import Protocol
import voluptuous as vol
class TriggersPlatformModule(Protocol):
"""Protocol type for the triggers platform."""
TRIGGER_SCHEMA: vol.Schema

View file

@ -0,0 +1,88 @@
"""webOS Smart TV device turn on trigger."""
from __future__ import annotations
import voluptuous as vol
from homeassistant.components.automation import (
AutomationActionType,
AutomationTriggerInfo,
)
from homeassistant.const import ATTR_DEVICE_ID, ATTR_ENTITY_ID, CONF_PLATFORM
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import ConfigType
from ..const import DOMAIN
from ..helpers import (
async_get_client_wrapper_by_device_entry,
async_get_device_entry_by_device_id,
async_get_device_id_from_entity_id,
)
# Platform type should be <DOMAIN>.<SUBMODULE_NAME>
PLATFORM_TYPE = f"{DOMAIN}.{__name__.rsplit('.', maxsplit=1)[-1]}"
TRIGGER_TYPE_TURN_ON = "turn_on"
TRIGGER_SCHEMA = vol.All(
cv.TRIGGER_BASE_SCHEMA.extend(
{
vol.Required(CONF_PLATFORM): PLATFORM_TYPE,
vol.Optional(ATTR_DEVICE_ID): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
},
),
cv.has_at_least_one_key(ATTR_ENTITY_ID, ATTR_DEVICE_ID),
)
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: AutomationActionType,
automation_info: AutomationTriggerInfo,
*,
platform_type: str = PLATFORM_TYPE,
) -> CALLBACK_TYPE | None:
"""Attach a trigger."""
device_ids = set()
if ATTR_DEVICE_ID in config:
device_ids.update(config.get(ATTR_DEVICE_ID, []))
if ATTR_ENTITY_ID in config:
device_ids.update(
{
async_get_device_id_from_entity_id(hass, entity_id)
for entity_id in config.get(ATTR_ENTITY_ID, [])
}
)
trigger_data = automation_info["trigger_data"]
unsubs = []
for device_id in device_ids:
device = async_get_device_entry_by_device_id(hass, device_id)
device_name = device.name_by_user or device.name
variables = {
**trigger_data,
CONF_PLATFORM: platform_type,
ATTR_DEVICE_ID: device_id,
"description": f"webostv turn on trigger for {device_name}",
}
client_wrapper = async_get_client_wrapper_by_device_entry(hass, device)
unsubs.append(
client_wrapper.turn_on.async_attach(action, {"trigger": variables})
)
@callback
def async_remove() -> None:
"""Remove state listeners async."""
for unsub in unsubs:
unsub()
unsubs.clear()
return async_remove

View file

@ -352,6 +352,7 @@ FLOWS = [
"wallbox",
"watttime",
"waze_travel_time",
"webostv",
"wemo",
"whirlpool",
"whois",

View file

@ -246,6 +246,11 @@ SSDP = {
"st": "urn:schemas-upnp-org:device:InternetGatewayDevice:2"
}
],
"webostv": [
{
"st": "urn:lge-com:service:webos-second-screen:1"
}
],
"wemo": [
{
"manufacturer": "Belkin International Inc."

View file

@ -244,9 +244,6 @@ aiopvapi==1.6.19
# homeassistant.components.pvpc_hourly_pricing
aiopvpc==3.0.0
# homeassistant.components.webostv
aiopylgtv==0.4.0
# homeassistant.components.recollect_waste
aiorecollect==1.0.8
@ -280,6 +277,9 @@ aiovlc==0.1.0
# homeassistant.components.watttime
aiowatttime==0.1.1
# homeassistant.components.webostv
aiowebostv==0.1.1
# homeassistant.components.yandex_transport
aioymaps==1.2.2
@ -2259,6 +2259,7 @@ spotipy==2.19.0
# homeassistant.components.recorder
# homeassistant.components.sql
# homeassistant.components.webostv
sqlalchemy==1.4.27
# homeassistant.components.srp_energy

View file

@ -176,9 +176,6 @@ aiopvapi==1.6.19
# homeassistant.components.pvpc_hourly_pricing
aiopvpc==3.0.0
# homeassistant.components.webostv
aiopylgtv==0.4.0
# homeassistant.components.recollect_waste
aiorecollect==1.0.8
@ -212,6 +209,9 @@ aiovlc==0.1.0
# homeassistant.components.watttime
aiowatttime==0.1.1
# homeassistant.components.webostv
aiowebostv==0.1.1
# homeassistant.components.yandex_transport
aioymaps==1.2.2
@ -1377,6 +1377,7 @@ spotipy==2.19.0
# homeassistant.components.recorder
# homeassistant.components.sql
# homeassistant.components.webostv
sqlalchemy==1.4.27
# homeassistant.components.srp_energy

View file

@ -1 +1,40 @@
"""Tests for the WebOS TV integration."""
from unittest.mock import patch
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.webostv.const import DOMAIN
from homeassistant.const import CONF_CLIENT_SECRET, CONF_HOST
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry
TV_NAME = "fake"
ENTITY_ID = f"{MP_DOMAIN}.{TV_NAME}"
MOCK_CLIENT_KEYS = {"1.2.3.4": "some-secret"}
async def setup_webostv(hass, unique_id=None):
"""Initialize webostv and media_player for tests."""
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "1.2.3.4",
CONF_CLIENT_SECRET: "0123456789",
},
title=TV_NAME,
unique_id=unique_id,
)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.webostv.read_client_keys",
return_value=MOCK_CLIENT_KEYS,
):
await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {CONF_HOST: "1.2.3.4"}},
)
await hass.async_block_till_done()
return entry

View file

@ -0,0 +1,29 @@
"""Common fixtures and objects for the LG webOS integration tests."""
from unittest.mock import patch
import pytest
from tests.common import async_mock_service
@pytest.fixture
def calls(hass):
"""Track calls to a mock service."""
return async_mock_service(hass, "test", "automation")
@pytest.fixture(name="client")
def client_fixture():
"""Patch of client library for tests."""
with patch(
"homeassistant.components.webostv.WebOsClient", autospec=True
) as mock_client_class:
client = mock_client_class.return_value
client.hello_info = {"deviceUUID": "some-fake-uuid"}
client.software_info = {"device_id": "00:01:02:03:04:05"}
client.system_info = {"modelName": "TVFAKE"}
client.client_key = "0123456789"
client.apps = {0: {"title": "Applicaiton01"}}
client.inputs = {0: {"label": "Input01"}, 1: {"label": "Input02"}}
yield client

View file

@ -0,0 +1,305 @@
"""Test the WebOS Tv config flow."""
from unittest.mock import Mock, patch
from aiowebostv import WebOsTvPairError
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.components.webostv.const import CONF_SOURCES, DOMAIN
from homeassistant.config_entries import SOURCE_SSDP
from homeassistant.const import (
CONF_CLIENT_SECRET,
CONF_HOST,
CONF_ICON,
CONF_NAME,
CONF_SOURCE,
CONF_UNIQUE_ID,
)
from homeassistant.data_entry_flow import (
RESULT_TYPE_ABORT,
RESULT_TYPE_CREATE_ENTRY,
RESULT_TYPE_FORM,
)
from . import setup_webostv
MOCK_YAML_CONFIG = {
CONF_HOST: "1.2.3.4",
CONF_NAME: "fake",
CONF_ICON: "mdi:test",
CONF_CLIENT_SECRET: "some-secret",
CONF_UNIQUE_ID: "fake-uuid",
}
MOCK_DISCOVERY_INFO = {
ssdp.ATTR_SSDP_LOCATION: "http://1.2.3.4",
ssdp.ATTR_UPNP_FRIENDLY_NAME: "LG Webostv",
ssdp.ATTR_UPNP_UDN: "uuid:some-fake-uuid",
}
async def test_import(hass, client):
"""Test we can import yaml config."""
assert client
with patch("homeassistant.components.webostv.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_IMPORT},
data=MOCK_YAML_CONFIG,
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "fake"
assert result["data"][CONF_HOST] == MOCK_YAML_CONFIG[CONF_HOST]
assert result["data"][CONF_CLIENT_SECRET] == MOCK_YAML_CONFIG[CONF_CLIENT_SECRET]
assert result["result"].unique_id == MOCK_YAML_CONFIG[CONF_UNIQUE_ID]
with patch("homeassistant.components.webostv.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_IMPORT},
data=MOCK_YAML_CONFIG,
)
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
async def test_form(hass, client):
"""Test we get the form."""
assert client
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
data=MOCK_YAML_CONFIG,
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "pairing"
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
data=MOCK_YAML_CONFIG,
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "pairing"
with patch("homeassistant.components.webostv.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "fake"
async def test_options_flow(hass, client):
"""Test options config flow."""
entry = await setup_webostv(hass)
hass.states.async_set("script.test", "off", {"domain": "script"})
result = await hass.config_entries.options.async_init(entry.entry_id)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "init"
result2 = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={CONF_SOURCES: ["Input01", "Input02"]},
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_CREATE_ENTRY
assert result2["data"][CONF_SOURCES] == ["Input01", "Input02"]
client.connect = Mock(side_effect=ConnectionRefusedError())
result3 = await hass.config_entries.options.async_init(entry.entry_id)
await hass.async_block_till_done()
assert result3["type"] == RESULT_TYPE_FORM
assert result3["errors"] == {"base": "cannot_retrieve"}
async def test_form_cannot_connect(hass, client):
"""Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
data=MOCK_YAML_CONFIG,
)
client.connect = Mock(side_effect=ConnectionRefusedError())
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_FORM
assert result2["errors"] == {"base": "cannot_connect"}
async def test_form_pairexception(hass, client):
"""Test pairing exception."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
data=MOCK_YAML_CONFIG,
)
client.connect = Mock(side_effect=WebOsTvPairError("error"))
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_ABORT
assert result2["reason"] == "error_pairing"
async def test_entry_already_configured(hass, client):
"""Test entry already configured."""
await setup_webostv(hass)
assert client
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
data=MOCK_YAML_CONFIG,
)
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
async def test_form_ssdp(hass, client):
"""Test that the ssdp confirmation form is served."""
assert client
with patch("homeassistant.components.webostv.async_setup_entry", return_value=True):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={CONF_SOURCE: SOURCE_SSDP}, data=MOCK_DISCOVERY_INFO
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "pairing"
async def test_ssdp_in_progress(hass, client):
"""Test abort if ssdp paring is already in progress."""
assert client
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
data=MOCK_YAML_CONFIG,
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "pairing"
result2 = await hass.config_entries.flow.async_init(
DOMAIN, context={CONF_SOURCE: SOURCE_SSDP}, data=MOCK_DISCOVERY_INFO
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_ABORT
assert result2["reason"] == "already_in_progress"
async def test_ssdp_update_uuid(hass, client):
"""Test that ssdp updates existing host entry uuid."""
entry = await setup_webostv(hass)
assert client
assert entry.unique_id is None
result = await hass.config_entries.flow.async_init(
DOMAIN, context={CONF_SOURCE: SOURCE_SSDP}, data=MOCK_DISCOVERY_INFO
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert entry.unique_id == MOCK_DISCOVERY_INFO[ssdp.ATTR_UPNP_UDN][5:]
async def test_ssdp_not_update_uuid(hass, client):
"""Test that ssdp not updates different host."""
entry = await setup_webostv(hass)
assert client
assert entry.unique_id is None
discovery_info = MOCK_DISCOVERY_INFO.copy()
discovery_info.update({ssdp.ATTR_SSDP_LOCATION: "http://1.2.3.5"})
result2 = await hass.config_entries.flow.async_init(
DOMAIN, context={CONF_SOURCE: SOURCE_SSDP}, data=discovery_info
)
await hass.async_block_till_done()
assert result2["type"] == RESULT_TYPE_FORM
assert result2["step_id"] == "pairing"
assert entry.unique_id is None
async def test_form_abort_uuid_configured(hass, client):
"""Test abort if uuid is already configured, verify host update."""
entry = await setup_webostv(hass, MOCK_DISCOVERY_INFO[ssdp.ATTR_UPNP_UDN][5:])
assert client
assert entry.unique_id == MOCK_DISCOVERY_INFO[ssdp.ATTR_UPNP_UDN][5:]
assert entry.data[CONF_HOST] == "1.2.3.4"
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "user"
user_config = {
CONF_HOST: "new_host",
CONF_NAME: "fake",
}
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_USER},
data=user_config,
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_FORM
assert result["step_id"] == "pairing"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result["type"] == RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert entry.data[CONF_HOST] == "new_host"

View file

@ -0,0 +1,174 @@
"""The tests for WebOS TV device triggers."""
import pytest
from homeassistant.components import automation
from homeassistant.components.device_automation.exceptions import (
InvalidDeviceAutomationConfig,
)
from homeassistant.components.webostv import DOMAIN, device_trigger
from homeassistant.config_entries import ConfigEntryState
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import async_get as get_dev_reg
from homeassistant.setup import async_setup_component
from . import ENTITY_ID, setup_webostv
from tests.common import MockConfigEntry, async_get_device_automations
async def test_get_triggers(hass, client):
"""Test we get the expected triggers."""
await setup_webostv(hass, "fake-uuid")
device_reg = get_dev_reg(hass)
device = device_reg.async_get_device(identifiers={(DOMAIN, "fake-uuid")})
turn_on_trigger = {
"platform": "device",
"domain": DOMAIN,
"type": "webostv.turn_on",
"device_id": device.id,
}
triggers = await async_get_device_automations(hass, "trigger", device.id)
assert turn_on_trigger in triggers
async def test_if_fires_on_turn_on_request(hass, calls, client):
"""Test for turn_on and turn_off triggers firing."""
await setup_webostv(hass, "fake-uuid")
device_reg = get_dev_reg(hass)
device = device_reg.async_get_device(identifiers={(DOMAIN, "fake-uuid")})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": device.id,
"type": "webostv.turn_on",
},
"action": {
"service": "test.automation",
"data_template": {
"some": "{{ trigger.device_id }}",
"id": "{{ trigger.id }}",
},
},
},
{
"trigger": {
"platform": "webostv.turn_on",
"entity_id": ENTITY_ID,
},
"action": {
"service": "test.automation",
"data_template": {
"some": ENTITY_ID,
"id": "{{ trigger.id }}",
},
},
},
],
},
)
await hass.services.async_call(
"media_player",
"turn_on",
{"entity_id": ENTITY_ID},
blocking=True,
)
await hass.async_block_till_done()
assert len(calls) == 2
assert calls[0].data["some"] == device.id
assert calls[0].data["id"] == 0
assert calls[1].data["some"] == ENTITY_ID
assert calls[1].data["id"] == 0
async def test_get_triggers_for_invalid_device_id(hass, caplog):
"""Test error raised for invalid shelly device_id."""
await async_setup_component(hass, "persistent_notification", {})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "device",
"domain": DOMAIN,
"device_id": "invalid_device_id",
"type": "webostv.turn_on",
},
"action": {
"service": "test.automation",
"data_template": {
"some": "{{ trigger.invalid_device }}",
"id": "{{ trigger.id }}",
},
},
}
]
},
)
await hass.async_block_till_done()
assert (
"Invalid config for [automation]: Device invalid_device_id is not a valid webostv device"
in caplog.text
)
async def test_failure_scenarios(hass, client):
"""Test failure scenarios."""
await setup_webostv(hass, "fake-uuid")
# Test wrong trigger platform type
with pytest.raises(HomeAssistantError):
await device_trigger.async_attach_trigger(
hass, {"type": "wrong.type", "device_id": "invalid_device_id"}, None, {}
)
# Test invalid device id
with pytest.raises(InvalidDeviceAutomationConfig):
await device_trigger.async_validate_trigger_config(
hass,
{
"platform": "device",
"domain": DOMAIN,
"type": "webostv.turn_on",
"device_id": "invalid_device_id",
},
)
entry = MockConfigEntry(domain="fake", state=ConfigEntryState.LOADED, data={})
entry.add_to_hass(hass)
device_reg = get_dev_reg(hass)
device = device_reg.async_get_or_create(
config_entry_id=entry.entry_id, identifiers={("fake", "fake")}
)
config = {
"platform": "device",
"domain": DOMAIN,
"device_id": device.id,
"type": "webostv.turn_on",
}
# Test that device id from non webostv domain raises exception
with pytest.raises(InvalidDeviceAutomationConfig):
await device_trigger.async_validate_trigger_config(hass, config)
# Test no exception if device is not loaded
await hass.config_entries.async_unload(entry.entry_id)
assert await device_trigger.async_validate_trigger_config(hass, config) == config

View file

@ -1,12 +1,5 @@
"""The tests for the LG webOS media player platform."""
import json
import os
from unittest.mock import patch
import pytest
from sqlitedict import SqliteDict
from homeassistant.components import media_player
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.media_player.const import (
ATTR_INPUT_SOURCE,
ATTR_MEDIA_VOLUME_MUTED,
@ -18,61 +11,22 @@ from homeassistant.components.webostv.const import (
DOMAIN,
SERVICE_BUTTON,
SERVICE_COMMAND,
WEBOSTV_CONFIG_FILE,
)
from homeassistant.const import (
ATTR_COMMAND,
ATTR_ENTITY_ID,
CONF_HOST,
CONF_NAME,
SERVICE_VOLUME_MUTE,
)
from homeassistant.setup import async_setup_component
from homeassistant.const import ATTR_COMMAND, ATTR_ENTITY_ID, SERVICE_VOLUME_MUTE
NAME = "fake"
ENTITY_ID = f"{media_player.DOMAIN}.{NAME}"
@pytest.fixture(name="client")
def client_fixture():
"""Patch of client library for tests."""
with patch(
"homeassistant.components.webostv.WebOsClient", autospec=True
) as mock_client_class:
mock_client_class.create.return_value = mock_client_class.return_value
client = mock_client_class.return_value
client.software_info = {"device_id": "a1:b1:c1:d1:e1:f1"}
client.client_key = "0123456789"
yield client
async def setup_webostv(hass):
"""Initialize webostv and media_player for tests."""
assert await async_setup_component(
hass,
DOMAIN,
{DOMAIN: {CONF_HOST: "fake", CONF_NAME: NAME}},
)
await hass.async_block_till_done()
@pytest.fixture
def cleanup_config(hass):
"""Test cleanup, remove the config file."""
yield
os.remove(hass.config.path(WEBOSTV_CONFIG_FILE))
from . import ENTITY_ID, setup_webostv
async def test_mute(hass, client):
"""Test simple service call."""
await setup_webostv(hass)
data = {
ATTR_ENTITY_ID: ENTITY_ID,
ATTR_MEDIA_VOLUME_MUTED: True,
}
await hass.services.async_call(media_player.DOMAIN, SERVICE_VOLUME_MUTE, data)
assert await hass.services.async_call(MP_DOMAIN, SERVICE_VOLUME_MUTE, data, True)
await hass.async_block_till_done()
client.set_mute.assert_called_once()
@ -80,14 +34,13 @@ async def test_mute(hass, client):
async def test_select_source_with_empty_source_list(hass, client):
"""Ensure we don't call client methods when we don't have sources."""
await setup_webostv(hass)
data = {
ATTR_ENTITY_ID: ENTITY_ID,
ATTR_INPUT_SOURCE: "nonexistent",
}
await hass.services.async_call(media_player.DOMAIN, SERVICE_SELECT_SOURCE, data)
await hass.services.async_call(MP_DOMAIN, SERVICE_SELECT_SOURCE, data)
await hass.async_block_till_done()
client.launch_app.assert_not_called()
@ -96,7 +49,6 @@ async def test_select_source_with_empty_source_list(hass, client):
async def test_button(hass, client):
"""Test generic button functionality."""
await setup_webostv(hass)
data = {
@ -139,37 +91,3 @@ async def test_command_with_optional_arg(hass, client):
client.request.assert_called_with(
"test", payload={"target": "https://www.google.com"}
)
async def test_migrate_keyfile_to_sqlite(hass, client, cleanup_config):
"""Test migration from JSON key-file to Sqlite based one."""
key = "3d5b1aeeb98e"
# Create config file with JSON content
config_file = hass.config.path(WEBOSTV_CONFIG_FILE)
with open(config_file, "w+") as file:
json.dump({"host": key}, file)
# Run the component setup
await setup_webostv(hass)
# Assert that the config file is a Sqlite database which contains the key
with SqliteDict(config_file) as conf:
assert conf.get("host") == key
async def test_dont_migrate_sqlite_keyfile(hass, client, cleanup_config):
"""Test that migration is not performed and setup still succeeds when config file is already an Sqlite DB."""
key = "3d5b1aeeb98e"
# Create config file with Sqlite DB
config_file = hass.config.path(WEBOSTV_CONFIG_FILE)
with SqliteDict(config_file) as conf:
conf["host"] = key
conf.commit()
# Run the component setup
await setup_webostv(hass)
# Assert that the config file is still an Sqlite database and setup didn't fail
with SqliteDict(config_file) as conf:
assert conf.get("host") == key

View file

@ -0,0 +1,120 @@
"""The tests for WebOS TV automation triggers."""
from homeassistant.components import automation
from homeassistant.components.webostv import DOMAIN
from homeassistant.helpers.device_registry import async_get as get_dev_reg
from homeassistant.setup import async_setup_component
from . import ENTITY_ID, setup_webostv
async def test_webostv_turn_on_trigger_device_id(hass, calls, client):
"""Test for turn_on triggers by device_id firing."""
await setup_webostv(hass, "fake-uuid")
device_reg = get_dev_reg(hass)
device = device_reg.async_get_device(identifiers={(DOMAIN, "fake-uuid")})
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "webostv.turn_on",
"device_id": device.id,
},
"action": {
"service": "test.automation",
"data_template": {
"some": device.id,
"id": "{{ trigger.id }}",
},
},
},
],
},
)
await hass.services.async_call(
"media_player",
"turn_on",
{"entity_id": ENTITY_ID},
blocking=True,
)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["some"] == device.id
assert calls[0].data["id"] == 0
async def test_webostv_turn_on_trigger_entity_id(hass, calls, client):
"""Test for turn_on triggers by entity_id firing."""
await setup_webostv(hass, "fake-uuid")
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "webostv.turn_on",
"entity_id": ENTITY_ID,
},
"action": {
"service": "test.automation",
"data_template": {
"some": ENTITY_ID,
"id": "{{ trigger.id }}",
},
},
},
],
},
)
await hass.services.async_call(
"media_player",
"turn_on",
{"entity_id": ENTITY_ID},
blocking=True,
)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["some"] == ENTITY_ID
assert calls[0].data["id"] == 0
async def test_wrong_trigger_platform_type(hass, caplog, client):
"""Test wrong trigger platform type."""
await setup_webostv(hass, "fake-uuid")
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {
"platform": "webostv.wrong_type",
"entity_id": ENTITY_ID,
},
"action": {
"service": "test.automation",
"data_template": {
"some": ENTITY_ID,
"id": "{{ trigger.id }}",
},
},
},
],
},
)
assert (
"ValueError: Unknown webOS Smart TV trigger platform webostv.wrong_type"
in caplog.text
)