Add coordinator to Twitch (#127724)

This commit is contained in:
Joost Lekkerkerker 2024-10-19 10:59:37 +02:00 committed by GitHub
parent 391f278ee5
commit 061ece55f3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 178 additions and 105 deletions

View file

@ -17,7 +17,8 @@ from homeassistant.helpers.config_entry_oauth2_flow import (
async_get_config_entry_implementation,
)
from .const import CLIENT, DOMAIN, OAUTH_SCOPES, PLATFORMS, SESSION
from .const import DOMAIN, OAUTH_SCOPES, PLATFORMS
from .coordinator import TwitchCoordinator
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
@ -46,10 +47,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
client.auto_refresh_auth = False
await client.set_user_authentication(access_token, scope=OAUTH_SCOPES)
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = {
CLIENT: client,
SESSION: session,
}
coordinator = TwitchCoordinator(hass, client, session)
await coordinator.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

View file

@ -17,7 +17,5 @@ CONF_REFRESH_TOKEN = "refresh_token"
DOMAIN = "twitch"
CONF_CHANNELS = "channels"
CLIENT = "client"
SESSION = "session"
OAUTH_SCOPES = [AuthScope.USER_READ_SUBSCRIPTIONS, AuthScope.USER_READ_FOLLOWS]

View file

@ -0,0 +1,116 @@
"""Define a class to manage fetching Twitch data."""
from dataclasses import dataclass
from datetime import datetime, timedelta
from twitchAPI.helper import first
from twitchAPI.object.api import FollowedChannelsResult, TwitchUser, UserSubscription
from twitchAPI.twitch import Twitch
from twitchAPI.type import TwitchAPIException, TwitchResourceNotFound
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import CONF_CHANNELS, DOMAIN, LOGGER, OAUTH_SCOPES
def chunk_list(lst: list, chunk_size: int) -> list[list]:
"""Split a list into chunks of chunk_size."""
return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]
@dataclass
class TwitchUpdate:
"""Class for holding Twitch data."""
name: str
followers: int
views: int
is_streaming: bool
game: str | None
title: str | None
started_at: datetime | None
stream_picture: str | None
picture: str
subscribed: bool | None
subscription_gifted: bool | None
follows: bool
following_since: datetime | None
class TwitchCoordinator(DataUpdateCoordinator[dict[str, TwitchUpdate]]):
"""Class to manage fetching Twitch data."""
config_entry: ConfigEntry
users: list[TwitchUser]
current_user: TwitchUser
def __init__(
self, hass: HomeAssistant, twitch: Twitch, session: OAuth2Session
) -> None:
"""Initialize the coordinator."""
self.twitch = twitch
super().__init__(
hass,
LOGGER,
name=DOMAIN,
update_interval=timedelta(minutes=5),
)
self.session = session
async def _async_setup(self) -> None:
channels = self.config_entry.options[CONF_CHANNELS]
self.users = []
# Split channels into chunks of 100 to avoid hitting the rate limit
for chunk in chunk_list(channels, 100):
self.users.extend(
[channel async for channel in self.twitch.get_users(logins=chunk)]
)
if not (user := await first(self.twitch.get_users())):
raise UpdateFailed("Logged in user not found")
self.current_user = user
async def _async_update_data(self) -> dict[str, TwitchUpdate]:
await self.session.async_ensure_token_valid()
await self.twitch.set_user_authentication(
self.session.token["access_token"],
OAUTH_SCOPES,
self.session.token["refresh_token"],
False,
)
data = {}
for channel in self.users:
followers = await self.twitch.get_channel_followers(channel.id)
stream = await first(self.twitch.get_streams(user_id=[channel.id], first=1))
sub: UserSubscription | None = None
follows: FollowedChannelsResult | None = None
try:
sub = await self.twitch.check_user_subscription(
user_id=self.current_user.id, broadcaster_id=channel.id
)
except TwitchResourceNotFound:
LOGGER.debug("User is not subscribed to %s", channel.display_name)
except TwitchAPIException as exc:
LOGGER.error("Error response on check_user_subscription: %s", exc)
else:
follows = await self.twitch.get_followed_channels(
self.current_user.id, broadcaster_id=channel.id
)
data[channel.id] = TwitchUpdate(
channel.display_name,
followers.total,
channel.view_count,
bool(stream),
stream.game_name if stream else None,
stream.title if stream else None,
stream.started_at if stream else None,
stream.thumbnail_url if stream else None,
channel.profile_image_url,
sub is not None if sub else None,
sub.is_gift if sub else None,
follows is not None and follows.total > 0,
follows.data[0].followed_at if follows and follows.total else None,
)
return data

View file

@ -2,22 +2,18 @@
from __future__ import annotations
from twitchAPI.helper import first
from twitchAPI.twitch import (
AuthType,
Twitch,
TwitchAPIException,
TwitchResourceNotFound,
TwitchUser,
)
from typing import Any
from homeassistant.components.sensor import SensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import StateType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import CLIENT, CONF_CHANNELS, DOMAIN, LOGGER, OAUTH_SCOPES, SESSION
from . import TwitchCoordinator
from .const import DOMAIN
from .coordinator import TwitchUpdate
ATTR_GAME = "game"
ATTR_TITLE = "title"
@ -36,109 +32,70 @@ STATE_STREAMING = "streaming"
PARALLEL_UPDATES = 1
def chunk_list(lst: list, chunk_size: int) -> list[list]:
"""Split a list into chunks of chunk_size."""
return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Initialize entries."""
client = hass.data[DOMAIN][entry.entry_id][CLIENT]
session = hass.data[DOMAIN][entry.entry_id][SESSION]
coordinator = hass.data[DOMAIN][entry.entry_id]
channels = entry.options[CONF_CHANNELS]
entities: list[TwitchSensor] = []
# Split channels into chunks of 100 to avoid hitting the rate limit
for chunk in chunk_list(channels, 100):
entities.extend(
[
TwitchSensor(channel, session, client)
async for channel in client.get_users(logins=chunk)
]
)
async_add_entities(entities, True)
async_add_entities(
TwitchSensor(coordinator, channel_id) for channel_id in coordinator.data
)
class TwitchSensor(SensorEntity):
class TwitchSensor(CoordinatorEntity[TwitchCoordinator], SensorEntity):
"""Representation of a Twitch channel."""
_attr_translation_key = "channel"
def __init__(
self, channel: TwitchUser, session: OAuth2Session, client: Twitch
) -> None:
def __init__(self, coordinator: TwitchCoordinator, channel_id: str) -> None:
"""Initialize the sensor."""
self._session = session
self._client = client
self._channel = channel
self._enable_user_auth = client.has_required_auth(AuthType.USER, OAUTH_SCOPES)
self._attr_name = channel.display_name
self._attr_unique_id = channel.id
super().__init__(coordinator)
self.channel_id = channel_id
self._attr_unique_id = channel_id
self._attr_name = self.channel.name
async def async_update(self) -> None:
"""Update device state."""
await self._session.async_ensure_token_valid()
await self._client.set_user_authentication(
self._session.token["access_token"],
OAUTH_SCOPES,
self._session.token["refresh_token"],
False,
)
followers = await self._client.get_channel_followers(self._channel.id)
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and self.channel_id in self.coordinator.data
self._attr_extra_state_attributes = {
ATTR_FOLLOWING: followers.total,
ATTR_VIEWS: self._channel.view_count,
@property
def channel(self) -> TwitchUpdate:
"""Return the channel data."""
return self.coordinator.data[self.channel_id]
@property
def native_value(self) -> StateType:
"""Return the state of the sensor."""
return STATE_STREAMING if self.channel.is_streaming else STATE_OFFLINE
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes."""
channel = self.channel
resp = {
ATTR_FOLLOWING: channel.followers,
ATTR_VIEWS: channel.views,
ATTR_GAME: channel.game,
ATTR_TITLE: channel.title,
ATTR_STARTED_AT: channel.started_at,
}
if self._enable_user_auth:
await self._async_add_user_attributes()
if stream := (
await first(self._client.get_streams(user_id=[self._channel.id], first=1))
):
self._attr_native_value = STATE_STREAMING
self._attr_extra_state_attributes[ATTR_GAME] = stream.game_name
self._attr_extra_state_attributes[ATTR_TITLE] = stream.title
self._attr_extra_state_attributes[ATTR_STARTED_AT] = stream.started_at
self._attr_entity_picture = stream.thumbnail_url
if self._attr_entity_picture is not None:
self._attr_entity_picture = self._attr_entity_picture.format(
height=24,
width=24,
)
else:
self._attr_native_value = STATE_OFFLINE
self._attr_extra_state_attributes[ATTR_GAME] = None
self._attr_extra_state_attributes[ATTR_TITLE] = None
self._attr_extra_state_attributes[ATTR_STARTED_AT] = None
self._attr_entity_picture = self._channel.profile_image_url
resp[ATTR_SUBSCRIPTION] = False
if channel.subscribed is not None:
resp[ATTR_SUBSCRIPTION] = channel.subscribed
resp[ATTR_SUBSCRIPTION_GIFTED] = channel.subscription_gifted
resp[ATTR_FOLLOW] = channel.follows
if channel.follows:
resp[ATTR_FOLLOW_SINCE] = channel.following_since
return resp
async def _async_add_user_attributes(self) -> None:
if not (user := await first(self._client.get_users())):
return
self._attr_extra_state_attributes[ATTR_SUBSCRIPTION] = False
try:
sub = await self._client.check_user_subscription(
user_id=user.id, broadcaster_id=self._channel.id
)
self._attr_extra_state_attributes[ATTR_SUBSCRIPTION] = True
self._attr_extra_state_attributes[ATTR_SUBSCRIPTION_GIFTED] = sub.is_gift
except TwitchResourceNotFound:
LOGGER.debug("User is not subscribed to %s", self._channel.display_name)
except TwitchAPIException as exc:
LOGGER.error("Error response on check_user_subscription: %s", exc)
follows = await self._client.get_followed_channels(
user.id, broadcaster_id=self._channel.id
)
self._attr_extra_state_attributes[ATTR_FOLLOW] = follows.total > 0
if follows.total:
self._attr_extra_state_attributes[ATTR_FOLLOW_SINCE] = follows.data[
0
].followed_at
@property
def entity_picture(self) -> str | None:
"""Return the picture of the sensor."""
if self.channel.is_streaming:
assert self.channel.stream_picture is not None
return self.channel.stream_picture
return self.channel.picture