Wait for Sonos regrouping in service calls (#22006)
This commit is contained in:
parent fe5e4b5b9b
commit de2c7a9567
1 changed file with 66 additions and 25 deletions
@@ -6,6 +6,7 @@ import socket
 import asyncio
 import urllib
 
+import async_timeout
 import requests
 import voluptuous as vol
 
@@ -115,7 +116,7 @@ class SonosData:
         """Initialize the data."""
         self.uids = set()
         self.entities = []
-        self.topology_lock = asyncio.Lock(loop=hass.loop)
+        self.topology_condition = asyncio.Condition(loop=hass.loop)
 
 
 def setup_platform(hass, config, add_entities, discovery_info=None):
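The hunk above is the heart of the change: the plain asyncio.Lock in SonosData becomes an asyncio.Condition. A Condition wraps a lock of its own, so the existing mutual exclusion between the service helpers and the topology event handler is kept, but it additionally lets a coroutine release that lock while it waits to be woken by notify_all() from another coroutine. A minimal, self-contained sketch of that wait/notify pattern (plain asyncio, not Home Assistant code; the waiter/notifier names and the "regrouped" flag are only illustrative):

import asyncio

async def waiter(condition, state):
    """Block until another coroutine reports that regrouping happened."""
    async with condition:              # acquire the condition's lock
        while not state["regrouped"]:  # re-check the predicate after every wakeup
            await condition.wait()     # releases the lock while suspended

async def notifier(condition, state):
    """Flip the flag and wake every coroutine blocked in wait()."""
    async with condition:
        state["regrouped"] = True
        condition.notify_all()

async def main():
    condition = asyncio.Condition()
    state = {"regrouped": False}
    await asyncio.gather(waiter(condition, state), notifier(condition, state))

asyncio.run(main())

In this commit the waiter role is played by the service helpers (via wait_for_groups, added in the last hunk) and the notifier role by the topology event handler, which calls topology_condition.notify_all() after regrouping.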
@@ -364,7 +365,6 @@ class SonosEntity(MediaPlayerDevice):
         self._favorites = None
         self._soco_snapshot = None
         self._snapshot_group = None
-        self._restore_pending = False
 
         self._set_basic_information()
 
@@ -752,15 +752,14 @@ class SonosEntity(MediaPlayerDevice):
 
         async def _async_handle_group_event(event):
             """Get async lock and handle event."""
-            async with self.hass.data[DATA_SONOS].topology_lock:
+            async with self.hass.data[DATA_SONOS].topology_condition:
                 group = await _async_extract_group(event)
 
                 if self.unique_id == group[0]:
-                    if self._restore_pending:
-                        await self.hass.async_add_executor_job(self.restore)
-
                     _async_regroup(group)
 
+                    self.hass.data[DATA_SONOS].topology_condition.notify_all()
+
         if event:
             self._receives_events = True
 
@@ -988,18 +987,26 @@ class SonosEntity(MediaPlayerDevice):
         """Form a group with other players."""
         if self._coordinator:
             self.unjoin()
+            group = [self]
+        else:
+            group = self._sonos_group.copy()
 
         for slave in slaves:
             if slave.unique_id != self.unique_id:
                 slave.soco.join(self.soco)
                 # pylint: disable=protected-access
                 slave._coordinator = self
+                if slave not in group:
+                    group.append(slave)
+
+        return group
 
     @staticmethod
     async def join_multi(hass, master, entities):
         """Form a group with other players."""
-        async with hass.data[DATA_SONOS].topology_lock:
-            await hass.async_add_executor_job(master.join, entities)
+        async with hass.data[DATA_SONOS].topology_condition:
+            group = await hass.async_add_executor_job(master.join, entities)
+            await SonosEntity.wait_for_groups(hass, [group])
 
     @soco_error()
     def unjoin(self):
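join_multi now captures the group layout returned by the blocking join() call (run in the executor) and only returns once wait_for_groups has seen that layout reported back by a Sonos topology event. A rough stdlib-only sketch of that shape, with placeholder names (send_command stands in for the blocking SoCo call, group_is_ready for the _test_groups check; loop.run_in_executor plays the part of hass.async_add_executor_job):

import asyncio

async def regroup_and_wait(condition, send_command, group_is_ready):
    """Issue a blocking regroup command, then wait for the topology event."""
    loop = asyncio.get_running_loop()
    async with condition:
        # The blocking network call runs in a worker thread.
        await loop.run_in_executor(None, send_command)
        # wait() releases the condition's lock, so the event handler can
        # acquire it, update the group state and call notify_all().
        while not group_is_ready():
            await condition.wait()

unjoin_multi (next hunk) follows the same shape; its target is one single-player group per entity, hence the [[e] for e in entities] passed to wait_for_groups.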
@@ -1019,8 +1026,9 @@ class SonosEntity(MediaPlayerDevice):
             for entity in slaves + coordinators:
                 entity.unjoin()
 
-        async with hass.data[DATA_SONOS].topology_lock:
+        async with hass.data[DATA_SONOS].topology_condition:
             await hass.async_add_executor_job(_unjoin_all, entities)
+            await SonosEntity.wait_for_groups(hass, [[e] for e in entities])
 
     @soco_error()
     def snapshot(self, with_group):
@@ -1050,7 +1058,7 @@ class SonosEntity(MediaPlayerDevice):
             for entity in list(entities):
                 entities.update(entity._sonos_group)
 
-        async with hass.data[DATA_SONOS].topology_lock:
+        async with hass.data[DATA_SONOS].topology_condition:
             await hass.async_add_executor_job(_snapshot_all, entities)
 
     @soco_error()
@@ -1060,7 +1068,6 @@ class SonosEntity(MediaPlayerDevice):
 
         try:
             # pylint: disable=protected-access
-            self.soco._zgs_cache.clear()
             self._soco_snapshot.restore()
         except (TypeError, AttributeError, SoCoException) as ex:
             # Can happen if restoring a coordinator onto a current slave
@@ -1068,20 +1075,20 @@ class SonosEntity(MediaPlayerDevice):
 
         self._soco_snapshot = None
         self._snapshot_group = None
-        self._restore_pending = False
 
     @staticmethod
     async def restore_multi(hass, entities, with_group):
         """Restore snapshots for all the entities."""
         # pylint: disable=protected-access
 
-        def _restore_all(entities):
-            """Sync helper."""
-            # Pause all current coordinators
+        def _restore_groups(entities, with_group):
+            """Pause all current coordinators and restore groups."""
             for entity in (e for e in entities if e.is_coordinator):
                 if entity.state == STATE_PLAYING:
                     entity.media_pause()
 
+            groups = []
+
             if with_group:
                 # Unjoin slaves first to prevent inheritance of queues
                 for entity in [e for e in entities if not e.is_coordinator]:
@@ -1092,19 +1099,17 @@ class SonosEntity(MediaPlayerDevice):
                 for entity in (e for e in entities if e._snapshot_group):
                     if entity._snapshot_group[0] == entity:
                         entity.join(entity._snapshot_group)
+                        groups.append(entity._snapshot_group.copy())
 
-            # Restore slaves
+            return groups
+
+        def _restore_players(entities):
+            """Restore state of all players."""
             for entity in (e for e in entities if not e.is_coordinator):
                 entity.restore()
 
-            # Restore coordinators (or delay if moving from slave)
             for entity in (e for e in entities if e.is_coordinator):
-                if entity._sonos_group[0] == entity:
-                    # Was already coordinator
-                    entity.restore()
-                else:
-                    # Await coordinator role
-                    entity._restore_pending = True
+                entity.restore()
 
         # Find all affected players
         entities = set(e for e in entities if e._soco_snapshot)
@@ -1112,8 +1117,44 @@ class SonosEntity(MediaPlayerDevice):
             for entity in [e for e in entities if e._snapshot_group]:
                 entities.update(entity._snapshot_group)
 
-        async with hass.data[DATA_SONOS].topology_lock:
-            await hass.async_add_executor_job(_restore_all, entities)
+        async with hass.data[DATA_SONOS].topology_condition:
+            groups = await hass.async_add_executor_job(
+                _restore_groups, entities, with_group)
+
+            await SonosEntity.wait_for_groups(hass, groups)
+
+            await hass.async_add_executor_job(_restore_players, entities)
+
+    @staticmethod
+    async def wait_for_groups(hass, groups):
+        """Wait until all groups are present, or timeout."""
+        # pylint: disable=protected-access
+
+        def _test_groups(groups):
+            """Return whether all groups exist now."""
+            for group in groups:
+                coordinator = group[0]
+
+                # Test that coordinator is coordinating
+                current_group = coordinator._sonos_group
+                if coordinator != current_group[0]:
+                    return False
+
+                # Test that slaves match
+                if set(group[1:]) != set(current_group[1:]):
+                    return False
+
+            return True
+
+        try:
+            with async_timeout.timeout(5):
+                while not _test_groups(groups):
+                    await hass.data[DATA_SONOS].topology_condition.wait()
+        except asyncio.TimeoutError:
+            _LOGGER.warning("Timeout waiting for target groups %s", groups)
+
+        for entity in hass.data[DATA_SONOS].entities:
+            entity.soco._zgs_cache.clear()
 
     @soco_error()
     @soco_coordinator
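wait_for_groups bounds the wait with async_timeout.timeout(5) and re-checks _test_groups after every notification, since one topology event may establish only part of the target layout. An equivalent sketch using only the standard library (asyncio.wait_for in place of async_timeout; predicate stands in for _test_groups):

import asyncio

async def wait_until(condition, predicate, timeout=5.0):
    """Wait on the condition until predicate() is true, or give up."""
    async def _wait():
        async with condition:
            while not predicate():
                await condition.wait()

    try:
        await asyncio.wait_for(_wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False

One difference from the sketch: wait_for_groups is always called with topology_condition already held (inside the async with blocks in join_multi, unjoin_multi and restore_multi), which is why it can call wait() directly without acquiring the condition itself.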