diff --git a/homeassistant/components/twitch/__init__.py b/homeassistant/components/twitch/__init__.py index 40a744684b9..6979a016447 100644 --- a/homeassistant/components/twitch/__init__.py +++ b/homeassistant/components/twitch/__init__.py @@ -17,7 +17,8 @@ from homeassistant.helpers.config_entry_oauth2_flow import ( async_get_config_entry_implementation, ) -from .const import CLIENT, DOMAIN, OAUTH_SCOPES, PLATFORMS, SESSION +from .const import DOMAIN, OAUTH_SCOPES, PLATFORMS +from .coordinator import TwitchCoordinator async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: @@ -46,10 +47,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: client.auto_refresh_auth = False await client.set_user_authentication(access_token, scope=OAUTH_SCOPES) - hass.data.setdefault(DOMAIN, {})[entry.entry_id] = { - CLIENT: client, - SESSION: session, - } + coordinator = TwitchCoordinator(hass, client, session) + + await coordinator.async_config_entry_first_refresh() + + hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) diff --git a/homeassistant/components/twitch/const.py b/homeassistant/components/twitch/const.py index b46bf8113b4..fc7c2f73487 100644 --- a/homeassistant/components/twitch/const.py +++ b/homeassistant/components/twitch/const.py @@ -17,7 +17,5 @@ CONF_REFRESH_TOKEN = "refresh_token" DOMAIN = "twitch" CONF_CHANNELS = "channels" -CLIENT = "client" -SESSION = "session" OAUTH_SCOPES = [AuthScope.USER_READ_SUBSCRIPTIONS, AuthScope.USER_READ_FOLLOWS] diff --git a/homeassistant/components/twitch/coordinator.py b/homeassistant/components/twitch/coordinator.py new file mode 100644 index 00000000000..5788df7df13 --- /dev/null +++ b/homeassistant/components/twitch/coordinator.py @@ -0,0 +1,116 @@ +"""Define a class to manage fetching Twitch data.""" + +from dataclasses import dataclass +from datetime import datetime, timedelta + +from twitchAPI.helper import first +from twitchAPI.object.api import FollowedChannelsResult, TwitchUser, UserSubscription +from twitchAPI.twitch import Twitch +from twitchAPI.type import TwitchAPIException, TwitchResourceNotFound + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed + +from .const import CONF_CHANNELS, DOMAIN, LOGGER, OAUTH_SCOPES + + +def chunk_list(lst: list, chunk_size: int) -> list[list]: + """Split a list into chunks of chunk_size.""" + return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)] + + +@dataclass +class TwitchUpdate: + """Class for holding Twitch data.""" + + name: str + followers: int + views: int + is_streaming: bool + game: str | None + title: str | None + started_at: datetime | None + stream_picture: str | None + picture: str + subscribed: bool | None + subscription_gifted: bool | None + follows: bool + following_since: datetime | None + + +class TwitchCoordinator(DataUpdateCoordinator[dict[str, TwitchUpdate]]): + """Class to manage fetching Twitch data.""" + + config_entry: ConfigEntry + users: list[TwitchUser] + current_user: TwitchUser + + def __init__( + self, hass: HomeAssistant, twitch: Twitch, session: OAuth2Session + ) -> None: + """Initialize the coordinator.""" + self.twitch = twitch + super().__init__( + hass, + LOGGER, + name=DOMAIN, + update_interval=timedelta(minutes=5), + ) + self.session = session + + async def _async_setup(self) -> None: + channels = self.config_entry.options[CONF_CHANNELS] + self.users = [] + # Split channels into chunks of 100 to avoid hitting the rate limit + for chunk in chunk_list(channels, 100): + self.users.extend( + [channel async for channel in self.twitch.get_users(logins=chunk)] + ) + if not (user := await first(self.twitch.get_users())): + raise UpdateFailed("Logged in user not found") + self.current_user = user + + async def _async_update_data(self) -> dict[str, TwitchUpdate]: + await self.session.async_ensure_token_valid() + await self.twitch.set_user_authentication( + self.session.token["access_token"], + OAUTH_SCOPES, + self.session.token["refresh_token"], + False, + ) + data = {} + for channel in self.users: + followers = await self.twitch.get_channel_followers(channel.id) + stream = await first(self.twitch.get_streams(user_id=[channel.id], first=1)) + sub: UserSubscription | None = None + follows: FollowedChannelsResult | None = None + try: + sub = await self.twitch.check_user_subscription( + user_id=self.current_user.id, broadcaster_id=channel.id + ) + except TwitchResourceNotFound: + LOGGER.debug("User is not subscribed to %s", channel.display_name) + except TwitchAPIException as exc: + LOGGER.error("Error response on check_user_subscription: %s", exc) + else: + follows = await self.twitch.get_followed_channels( + self.current_user.id, broadcaster_id=channel.id + ) + data[channel.id] = TwitchUpdate( + channel.display_name, + followers.total, + channel.view_count, + bool(stream), + stream.game_name if stream else None, + stream.title if stream else None, + stream.started_at if stream else None, + stream.thumbnail_url if stream else None, + channel.profile_image_url, + sub is not None if sub else None, + sub.is_gift if sub else None, + follows is not None and follows.total > 0, + follows.data[0].followed_at if follows and follows.total else None, + ) + return data diff --git a/homeassistant/components/twitch/sensor.py b/homeassistant/components/twitch/sensor.py index a6e2f4e04af..636f94114a4 100644 --- a/homeassistant/components/twitch/sensor.py +++ b/homeassistant/components/twitch/sensor.py @@ -2,22 +2,18 @@ from __future__ import annotations -from twitchAPI.helper import first -from twitchAPI.twitch import ( - AuthType, - Twitch, - TwitchAPIException, - TwitchResourceNotFound, - TwitchUser, -) +from typing import Any from homeassistant.components.sensor import SensorEntity from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant -from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import StateType +from homeassistant.helpers.update_coordinator import CoordinatorEntity -from .const import CLIENT, CONF_CHANNELS, DOMAIN, LOGGER, OAUTH_SCOPES, SESSION +from . import TwitchCoordinator +from .const import DOMAIN +from .coordinator import TwitchUpdate ATTR_GAME = "game" ATTR_TITLE = "title" @@ -36,109 +32,70 @@ STATE_STREAMING = "streaming" PARALLEL_UPDATES = 1 -def chunk_list(lst: list, chunk_size: int) -> list[list]: - """Split a list into chunks of chunk_size.""" - return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)] - - async def async_setup_entry( hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: """Initialize entries.""" - client = hass.data[DOMAIN][entry.entry_id][CLIENT] - session = hass.data[DOMAIN][entry.entry_id][SESSION] + coordinator = hass.data[DOMAIN][entry.entry_id] - channels = entry.options[CONF_CHANNELS] - - entities: list[TwitchSensor] = [] - - # Split channels into chunks of 100 to avoid hitting the rate limit - for chunk in chunk_list(channels, 100): - entities.extend( - [ - TwitchSensor(channel, session, client) - async for channel in client.get_users(logins=chunk) - ] - ) - - async_add_entities(entities, True) + async_add_entities( + TwitchSensor(coordinator, channel_id) for channel_id in coordinator.data + ) -class TwitchSensor(SensorEntity): +class TwitchSensor(CoordinatorEntity[TwitchCoordinator], SensorEntity): """Representation of a Twitch channel.""" _attr_translation_key = "channel" - def __init__( - self, channel: TwitchUser, session: OAuth2Session, client: Twitch - ) -> None: + def __init__(self, coordinator: TwitchCoordinator, channel_id: str) -> None: """Initialize the sensor.""" - self._session = session - self._client = client - self._channel = channel - self._enable_user_auth = client.has_required_auth(AuthType.USER, OAUTH_SCOPES) - self._attr_name = channel.display_name - self._attr_unique_id = channel.id + super().__init__(coordinator) + self.channel_id = channel_id + self._attr_unique_id = channel_id + self._attr_name = self.channel.name - async def async_update(self) -> None: - """Update device state.""" - await self._session.async_ensure_token_valid() - await self._client.set_user_authentication( - self._session.token["access_token"], - OAUTH_SCOPES, - self._session.token["refresh_token"], - False, - ) - followers = await self._client.get_channel_followers(self._channel.id) + @property + def available(self) -> bool: + """Return if entity is available.""" + return super().available and self.channel_id in self.coordinator.data - self._attr_extra_state_attributes = { - ATTR_FOLLOWING: followers.total, - ATTR_VIEWS: self._channel.view_count, + @property + def channel(self) -> TwitchUpdate: + """Return the channel data.""" + return self.coordinator.data[self.channel_id] + + @property + def native_value(self) -> StateType: + """Return the state of the sensor.""" + return STATE_STREAMING if self.channel.is_streaming else STATE_OFFLINE + + @property + def extra_state_attributes(self) -> dict[str, Any]: + """Return the state attributes.""" + channel = self.channel + resp = { + ATTR_FOLLOWING: channel.followers, + ATTR_VIEWS: channel.views, + ATTR_GAME: channel.game, + ATTR_TITLE: channel.title, + ATTR_STARTED_AT: channel.started_at, } - if self._enable_user_auth: - await self._async_add_user_attributes() - if stream := ( - await first(self._client.get_streams(user_id=[self._channel.id], first=1)) - ): - self._attr_native_value = STATE_STREAMING - self._attr_extra_state_attributes[ATTR_GAME] = stream.game_name - self._attr_extra_state_attributes[ATTR_TITLE] = stream.title - self._attr_extra_state_attributes[ATTR_STARTED_AT] = stream.started_at - self._attr_entity_picture = stream.thumbnail_url - if self._attr_entity_picture is not None: - self._attr_entity_picture = self._attr_entity_picture.format( - height=24, - width=24, - ) - else: - self._attr_native_value = STATE_OFFLINE - self._attr_extra_state_attributes[ATTR_GAME] = None - self._attr_extra_state_attributes[ATTR_TITLE] = None - self._attr_extra_state_attributes[ATTR_STARTED_AT] = None - self._attr_entity_picture = self._channel.profile_image_url + resp[ATTR_SUBSCRIPTION] = False + if channel.subscribed is not None: + resp[ATTR_SUBSCRIPTION] = channel.subscribed + resp[ATTR_SUBSCRIPTION_GIFTED] = channel.subscription_gifted + resp[ATTR_FOLLOW] = channel.follows + if channel.follows: + resp[ATTR_FOLLOW_SINCE] = channel.following_since + return resp - async def _async_add_user_attributes(self) -> None: - if not (user := await first(self._client.get_users())): - return - self._attr_extra_state_attributes[ATTR_SUBSCRIPTION] = False - try: - sub = await self._client.check_user_subscription( - user_id=user.id, broadcaster_id=self._channel.id - ) - self._attr_extra_state_attributes[ATTR_SUBSCRIPTION] = True - self._attr_extra_state_attributes[ATTR_SUBSCRIPTION_GIFTED] = sub.is_gift - except TwitchResourceNotFound: - LOGGER.debug("User is not subscribed to %s", self._channel.display_name) - except TwitchAPIException as exc: - LOGGER.error("Error response on check_user_subscription: %s", exc) - - follows = await self._client.get_followed_channels( - user.id, broadcaster_id=self._channel.id - ) - self._attr_extra_state_attributes[ATTR_FOLLOW] = follows.total > 0 - if follows.total: - self._attr_extra_state_attributes[ATTR_FOLLOW_SINCE] = follows.data[ - 0 - ].followed_at + @property + def entity_picture(self) -> str | None: + """Return the picture of the sensor.""" + if self.channel.is_streaming: + assert self.channel.stream_picture is not None + return self.channel.stream_picture + return self.channel.picture