Bump to Arcam 1.0.1 and make strictly typed (#82487)

* Make arcam_fmj strictly typed

* Add test for invalid UDN
This commit is contained in:
Joakim Plate 2022-11-23 16:23:25 +01:00 committed by GitHub
parent 32d68f375b
commit a55fb445b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 63 additions and 33 deletions

View file

@ -2,6 +2,7 @@
import asyncio
from contextlib import suppress
import logging
from typing import Any
from arcam.fmj import ConnectionFailed
from arcam.fmj.client import Client
@ -31,7 +32,7 @@ CONFIG_SCHEMA = cv.removed(DOMAIN, raise_if_present=False)
PLATFORMS = [Platform.MEDIA_PLAYER]
async def _await_cancel(task):
async def _await_cancel(task: asyncio.Task) -> None:
task.cancel()
with suppress(asyncio.CancelledError):
await task
@ -42,7 +43,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
hass.data[DOMAIN_DATA_ENTRIES] = {}
hass.data[DOMAIN_DATA_TASKS] = {}
async def _stop(_):
async def _stop(_: Any) -> None:
asyncio.gather(
*(_await_cancel(task) for task in hass.data[DOMAIN_DATA_TASKS].values())
)
@ -80,8 +81,8 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return unload_ok
async def _run_client(hass, client, interval):
def _listen(_):
async def _run_client(hass: HomeAssistant, client: Client, interval: float) -> None:
def _listen(_: Any) -> None:
async_dispatcher_send(hass, SIGNAL_CLIENT_DATA, client.host)
while True:

View file

@ -1,4 +1,7 @@
"""Config flow to configure the Arcam FMJ component."""
from __future__ import annotations
from typing import Any
from urllib.parse import urlparse
from arcam.fmj.client import Client, ConnectionFailed
@ -8,15 +11,17 @@ import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DEFAULT_NAME, DEFAULT_PORT, DOMAIN, DOMAIN_DATA_ENTRIES
def get_entry_client(hass, entry):
def get_entry_client(hass: HomeAssistant, entry: config_entries.ConfigEntry) -> Client:
"""Retrieve client associated with a config entry."""
return hass.data[DOMAIN_DATA_ENTRIES][entry.entry_id]
client: Client = hass.data[DOMAIN_DATA_ENTRIES][entry.entry_id]
return client
class ArcamFmjFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@ -24,11 +29,13 @@ class ArcamFmjFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
async def _async_set_unique_id_and_update(self, host, port, uuid):
async def _async_set_unique_id_and_update(
self, host: str, port: int, uuid: str
) -> None:
await self.async_set_unique_id(uuid)
self._abort_if_unique_id_configured({CONF_HOST: host, CONF_PORT: port})
async def _async_check_and_create(self, host, port):
async def _async_check_and_create(self, host: str, port: int) -> FlowResult:
client = Client(host, port)
try:
await client.start()
@ -42,9 +49,11 @@ class ArcamFmjFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
data={CONF_HOST: host, CONF_PORT: port},
)
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a discovered device."""
errors = {}
errors: dict[str, str] = {}
if user_input is not None:
uuid = await get_uniqueid_from_host(
@ -68,7 +77,9 @@ class ArcamFmjFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
step_id="user", data_schema=vol.Schema(fields), errors=errors
)
async def async_step_confirm(self, user_input=None):
async def async_step_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle user-confirmation of discovered node."""
context = self.context
placeholders = {
@ -87,9 +98,11 @@ class ArcamFmjFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a discovered device."""
host = urlparse(discovery_info.ssdp_location).hostname
host = str(urlparse(discovery_info.ssdp_location).hostname)
port = DEFAULT_PORT
uuid = get_uniqueid_from_udn(discovery_info.upnp[ssdp.ATTR_UPNP_UDN])
if not uuid:
return self.async_abort(reason="cannot_connect")
await self._async_set_unique_id_and_update(host, port, uuid)

View file

@ -65,7 +65,7 @@ async def async_attach_trigger(
entity_id = config[CONF_ENTITY_ID]
@callback
def _handle_event(event: Event):
def _handle_event(event: Event) -> None:
if event.data[ATTR_ENTITY_ID] == entity_id:
hass.async_run_hass_job(
job,

View file

@ -3,7 +3,7 @@
"name": "Arcam FMJ Receivers",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/arcam_fmj",
"requirements": ["arcam-fmj==0.12.0"],
"requirements": ["arcam-fmj==1.0.1"],
"ssdp": [
{
"deviceType": "urn:schemas-upnp-org:device:MediaRenderer:1",

View file

@ -64,10 +64,10 @@ class ArcamFmj(MediaPlayerEntity):
def __init__(
self,
device_name,
device_name: str,
state: State,
uuid: str,
):
) -> None:
"""Initialize device."""
self._state = state
self._device_name = device_name
@ -96,12 +96,12 @@ class ArcamFmj(MediaPlayerEntity):
return MediaPlayerState.OFF
@property
def device_info(self):
def device_info(self) -> DeviceInfo:
"""Return a device description for device registry."""
return DeviceInfo(
identifiers={
(DOMAIN, self._uuid),
(DOMAIN, self._state.client.host, self._state.client.port),
(DOMAIN, self._state.client.host, self._state.client.port), # type: ignore[arg-type]
},
manufacturer="Arcam",
model="Arcam FMJ AVR",
@ -114,17 +114,17 @@ class ArcamFmj(MediaPlayerEntity):
await self._state.update()
@callback
def _data(host):
def _data(host: str) -> None:
if host == self._state.client.host:
self.async_write_ha_state()
@callback
def _started(host):
def _started(host: str) -> None:
if host == self._state.client.host:
self.async_schedule_update_ha_state(force_refresh=True)
@callback
def _stopped(host):
def _stopped(host: str) -> None:
if host == self._state.client.host:
self.async_schedule_update_ha_state(force_refresh=True)
@ -249,40 +249,40 @@ class ArcamFmj(MediaPlayerEntity):
return
@property
def source(self):
def source(self) -> str | None:
"""Return the current input source."""
if (value := self._state.get_source()) is None:
return None
return value.name
@property
def source_list(self):
def source_list(self) -> list[str]:
"""List of available input sources."""
return [x.name for x in self._state.get_source_list()]
@property
def sound_mode(self):
def sound_mode(self) -> str | None:
"""Name of the current sound mode."""
if (value := self._state.get_decode_mode()) is None:
return None
return value.name
@property
def sound_mode_list(self):
def sound_mode_list(self) -> list[str] | None:
"""List of available sound modes."""
if (values := self._state.get_decode_modes()) is None:
return None
return [x.name for x in values]
@property
def is_volume_muted(self):
def is_volume_muted(self) -> bool | None:
"""Boolean if volume is currently muted."""
if (value := self._state.get_mute()) is None:
return None
return value
@property
def volume_level(self):
def volume_level(self) -> float | None:
"""Volume level of device."""
if (value := self._state.get_volume()) is None:
return None
@ -301,7 +301,7 @@ class ArcamFmj(MediaPlayerEntity):
return value
@property
def media_content_id(self):
def media_content_id(self) -> str | None:
"""Content type of current playing media."""
source = self._state.get_source()
if source in (SourceCodes.DAB, SourceCodes.FM):
@ -315,7 +315,7 @@ class ArcamFmj(MediaPlayerEntity):
return value
@property
def media_channel(self):
def media_channel(self) -> str | None:
"""Channel currently playing."""
source = self._state.get_source()
if source == SourceCodes.DAB:
@ -327,7 +327,7 @@ class ArcamFmj(MediaPlayerEntity):
return value
@property
def media_artist(self):
def media_artist(self) -> str | None:
"""Artist of current playing media, music track only."""
if self._state.get_source() == SourceCodes.DAB:
value = self._state.get_dls_pdt()
@ -336,7 +336,7 @@ class ArcamFmj(MediaPlayerEntity):
return value
@property
def media_title(self):
def media_title(self) -> str | None:
"""Title of current playing media."""
if (source := self._state.get_source()) is None:
return None

View file

@ -345,7 +345,7 @@ aqualogic==2.6
aranet4==2.1.3
# homeassistant.components.arcam_fmj
arcam-fmj==0.12.0
arcam-fmj==1.0.1
# homeassistant.components.arris_tg2492lg
arris-tg2492lg==1.2.1

View file

@ -308,7 +308,7 @@ aprslib==0.7.0
aranet4==2.1.3
# homeassistant.components.arcam_fmj
arcam-fmj==0.12.0
arcam-fmj==1.0.1
# homeassistant.components.dlna_dmr
# homeassistant.components.dlna_dms

View file

@ -1,5 +1,6 @@
"""Tests for the Arcam FMJ config flow module."""
from dataclasses import replace
from unittest.mock import AsyncMock, patch
from arcam.fmj.client import ConnectionFailed
@ -107,6 +108,21 @@ async def test_ssdp_unable_to_connect(hass, dummy_client):
assert result["reason"] == "cannot_connect"
async def test_ssdp_invalid_id(hass, dummy_client):
"""Test a ssdp with invalid UDN."""
discover = replace(
MOCK_DISCOVER, upnp=MOCK_DISCOVER.upnp | {ssdp.ATTR_UPNP_UDN: "invalid"}
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={CONF_SOURCE: SOURCE_SSDP},
data=discover,
)
assert result["type"] == data_entry_flow.FlowResultType.ABORT
assert result["reason"] == "cannot_connect"
async def test_ssdp_update(hass):
"""Test a ssdp import flow."""
entry = MockConfigEntry(