Support dynamic Google Cast groups (#44484)
* Re-add support for dynamic groups * Add tests * Add support for manufacturer * Refactor support for dynamic groups * Bump pychromecast to 7.7.0 * Bump pychromecast to 7.7.1 * Tweak tests * Apply review suggestion
This commit is contained in:
parent
f18880686c
commit
02bfc68842
7 changed files with 489 additions and 59 deletions
|
@ -1,7 +1,8 @@
|
|||
"""Helpers to deal with Cast devices."""
|
||||
from typing import Optional, Tuple
|
||||
from typing import Optional
|
||||
|
||||
import attr
|
||||
from pychromecast import dial
|
||||
from pychromecast.const import CAST_MANUFACTURERS
|
||||
|
||||
from .const import DEFAULT_PORT
|
||||
|
@ -20,8 +21,10 @@ class ChromecastInfo:
|
|||
uuid: Optional[str] = attr.ib(
|
||||
converter=attr.converters.optional(str), default=None
|
||||
) # always convert UUID to string if not None
|
||||
_manufacturer = attr.ib(type=Optional[str], default=None)
|
||||
model_name: str = attr.ib(default="")
|
||||
friendly_name: Optional[str] = attr.ib(default=None)
|
||||
is_dynamic_group = attr.ib(type=Optional[bool], default=None)
|
||||
|
||||
@property
|
||||
def is_audio_group(self) -> bool:
|
||||
|
@ -29,17 +32,84 @@ class ChromecastInfo:
|
|||
return self.port != DEFAULT_PORT
|
||||
|
||||
@property
|
||||
def host_port(self) -> Tuple[str, int]:
|
||||
"""Return the host+port tuple."""
|
||||
return self.host, self.port
|
||||
def is_information_complete(self) -> bool:
|
||||
"""Return if all information is filled out."""
|
||||
want_dynamic_group = self.is_audio_group
|
||||
have_dynamic_group = self.is_dynamic_group is not None
|
||||
have_all_except_dynamic_group = all(
|
||||
attr.astuple(
|
||||
self,
|
||||
filter=attr.filters.exclude(
|
||||
attr.fields(ChromecastInfo).is_dynamic_group
|
||||
),
|
||||
)
|
||||
)
|
||||
return have_all_except_dynamic_group and (
|
||||
not want_dynamic_group or have_dynamic_group
|
||||
)
|
||||
|
||||
@property
|
||||
def manufacturer(self) -> str:
|
||||
"""Return the manufacturer."""
|
||||
if self._manufacturer:
|
||||
return self._manufacturer
|
||||
if not self.model_name:
|
||||
return None
|
||||
return CAST_MANUFACTURERS.get(self.model_name.lower(), "Google Inc.")
|
||||
|
||||
def fill_out_missing_chromecast_info(self) -> "ChromecastInfo":
|
||||
"""Return a new ChromecastInfo object with missing attributes filled in.
|
||||
|
||||
Uses blocking HTTP / HTTPS.
|
||||
"""
|
||||
if self.is_information_complete:
|
||||
# We have all information, no need to check HTTP API.
|
||||
return self
|
||||
|
||||
# Fill out missing group information via HTTP API.
|
||||
if self.is_audio_group:
|
||||
is_dynamic_group = False
|
||||
http_group_status = None
|
||||
if self.uuid:
|
||||
http_group_status = dial.get_multizone_status(
|
||||
self.host,
|
||||
services=self.services,
|
||||
zconf=ChromeCastZeroconf.get_zeroconf(),
|
||||
)
|
||||
if http_group_status is not None:
|
||||
is_dynamic_group = any(
|
||||
str(g.uuid) == self.uuid
|
||||
for g in http_group_status.dynamic_groups
|
||||
)
|
||||
|
||||
return ChromecastInfo(
|
||||
services=self.services,
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
uuid=self.uuid,
|
||||
friendly_name=self.friendly_name,
|
||||
model_name=self.model_name,
|
||||
is_dynamic_group=is_dynamic_group,
|
||||
)
|
||||
|
||||
# Fill out some missing information (friendly_name, uuid) via HTTP dial.
|
||||
http_device_status = dial.get_device_status(
|
||||
self.host, services=self.services, zconf=ChromeCastZeroconf.get_zeroconf()
|
||||
)
|
||||
if http_device_status is None:
|
||||
# HTTP dial didn't give us any new information.
|
||||
return self
|
||||
|
||||
return ChromecastInfo(
|
||||
services=self.services,
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
uuid=(self.uuid or http_device_status.uuid),
|
||||
friendly_name=(self.friendly_name or http_device_status.friendly_name),
|
||||
manufacturer=(self.manufacturer or http_device_status.manufacturer),
|
||||
model_name=(self.model_name or http_device_status.model_name),
|
||||
)
|
||||
|
||||
|
||||
class ChromeCastZeroconf:
|
||||
"""Class to hold a zeroconf instance."""
|
||||
|
@ -65,19 +135,22 @@ class CastStatusListener:
|
|||
potentially arrive. This class allows invalidating past chromecast objects.
|
||||
"""
|
||||
|
||||
def __init__(self, cast_device, chromecast, mz_mgr):
|
||||
def __init__(self, cast_device, chromecast, mz_mgr, mz_only=False):
|
||||
"""Initialize the status listener."""
|
||||
self._cast_device = cast_device
|
||||
self._uuid = chromecast.uuid
|
||||
self._valid = True
|
||||
self._mz_mgr = mz_mgr
|
||||
|
||||
if cast_device._cast_info.is_audio_group:
|
||||
self._mz_mgr.add_multizone(chromecast)
|
||||
if mz_only:
|
||||
return
|
||||
|
||||
chromecast.register_status_listener(self)
|
||||
chromecast.socket_client.media_controller.register_status_listener(self)
|
||||
chromecast.register_connection_listener(self)
|
||||
if cast_device._cast_info.is_audio_group:
|
||||
self._mz_mgr.add_multizone(chromecast)
|
||||
else:
|
||||
if not cast_device._cast_info.is_audio_group:
|
||||
self._mz_mgr.register_listener(chromecast.uuid, self)
|
||||
|
||||
def new_cast_status(self, cast_status):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue