"""Shared class to maintain Plex server instances.""" from functools import partial, wraps import logging import ssl from urllib.parse import urlparse import plexapi.myplex import plexapi.playqueue import plexapi.server from requests import Session import requests.exceptions from homeassistant.components.media_player import DOMAIN as MP_DOMAIN from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_VERIFY_SSL from homeassistant.helpers.dispatcher import dispatcher_send from homeassistant.helpers.event import async_call_later from .const import ( CONF_CLIENT_IDENTIFIER, CONF_IGNORE_NEW_SHARED_USERS, CONF_MONITORED_USERS, CONF_SERVER, CONF_USE_EPISODE_ART, DEBOUNCE_TIMEOUT, DEFAULT_VERIFY_SSL, PLEX_NEW_MP_SIGNAL, PLEX_UPDATE_MEDIA_PLAYER_SIGNAL, PLEX_UPDATE_SENSOR_SIGNAL, X_PLEX_DEVICE_NAME, X_PLEX_PLATFORM, X_PLEX_PRODUCT, X_PLEX_VERSION, ) from .errors import NoServersFound, ServerNotSpecified, ShouldUpdateConfigEntry _LOGGER = logging.getLogger(__name__) # Set default headers sent by plexapi plexapi.X_PLEX_DEVICE_NAME = X_PLEX_DEVICE_NAME plexapi.X_PLEX_PLATFORM = X_PLEX_PLATFORM plexapi.X_PLEX_PRODUCT = X_PLEX_PRODUCT plexapi.X_PLEX_VERSION = X_PLEX_VERSION def debounce(func): """Decorate function to debounce callbacks from Plex websocket.""" unsub = None async def call_later_listener(self, _): """Handle call_later callback.""" nonlocal unsub unsub = None await self.hass.async_add_executor_job(func, self) @wraps(func) def wrapper(self): """Schedule async callback.""" nonlocal unsub if unsub: _LOGGER.debug("Throttling update of %s", self.friendly_name) unsub() # pylint: disable=not-callable unsub = async_call_later( self.hass, DEBOUNCE_TIMEOUT, partial(call_later_listener, self), ) return wrapper class PlexServer: """Manages a single Plex server connection.""" def __init__(self, hass, server_config, known_server_id=None, options=None): """Initialize a Plex server instance.""" self.hass = hass self._plex_server = None self._known_clients = set() self._known_idle = set() self._url = server_config.get(CONF_URL) self._token = server_config.get(CONF_TOKEN) self._server_name = server_config.get(CONF_SERVER) self._verify_ssl = server_config.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL) self._server_id = known_server_id self.options = options self.server_choice = None self._accounts = [] self._owner_username = None self._version = None # Header conditionally added as it is not available in config entry v1 if CONF_CLIENT_IDENTIFIER in server_config: plexapi.X_PLEX_IDENTIFIER = server_config[CONF_CLIENT_IDENTIFIER] plexapi.myplex.BASE_HEADERS = plexapi.reset_base_headers() plexapi.server.BASE_HEADERS = plexapi.reset_base_headers() def connect(self): """Connect to a Plex server directly, obtaining direct URL if necessary.""" config_entry_update_needed = False def _connect_with_token(): account = plexapi.myplex.MyPlexAccount(token=self._token) available_servers = [ (x.name, x.clientIdentifier) for x in account.resources() if "server" in x.provides ] if not available_servers: raise NoServersFound if not self._server_name and len(available_servers) > 1: raise ServerNotSpecified(available_servers) self.server_choice = ( self._server_name if self._server_name else available_servers[0][0] ) self._plex_server = account.resource(self.server_choice).connect(timeout=10) def _connect_with_url(): session = None if self._url.startswith("https") and not self._verify_ssl: session = Session() session.verify = False self._plex_server = plexapi.server.PlexServer( self._url, self._token, session ) def _update_plexdirect_hostname(): account = plexapi.myplex.MyPlexAccount(token=self._token) matching_server = [ x.name for x in account.resources() if x.clientIdentifier == self._server_id ][0] self._plex_server = account.resource(matching_server).connect(timeout=10) if self._url: try: _connect_with_url() except requests.exceptions.SSLError as error: while error and not isinstance(error, ssl.SSLCertVerificationError): error = error.__context__ # pylint: disable=no-member if isinstance(error, ssl.SSLCertVerificationError): domain = urlparse(self._url).netloc.split(":")[0] if domain.endswith("plex.direct") and error.args[0].startswith( f"hostname '{domain}' doesn't match" ): _LOGGER.warning( "Plex SSL certificate's hostname changed, updating." ) _update_plexdirect_hostname() config_entry_update_needed = True else: raise else: _connect_with_token() self._accounts = [ account.name for account in self._plex_server.systemAccounts() if account.name ] owner_account = [ account.name for account in self._plex_server.systemAccounts() if account.accountID == 1 ] if owner_account: self._owner_username = owner_account[0] self._version = self._plex_server.version if config_entry_update_needed: raise ShouldUpdateConfigEntry def refresh_entity(self, machine_identifier, device, session): """Forward refresh dispatch to media_player.""" unique_id = f"{self.machine_identifier}:{machine_identifier}" _LOGGER.debug("Refreshing %s", unique_id) dispatcher_send( self.hass, PLEX_UPDATE_MEDIA_PLAYER_SIGNAL.format(unique_id), device, session, ) @debounce def update_platforms(self): """Update the platform entities.""" _LOGGER.debug("Updating devices") available_clients = {} ignored_clients = set() new_clients = set() monitored_users = self.accounts known_accounts = set(self.option_monitored_users) if known_accounts: monitored_users = { user for user in self.option_monitored_users if self.option_monitored_users[user]["enabled"] } if not self.option_ignore_new_shared_users: for new_user in self.accounts - known_accounts: monitored_users.add(new_user) try: devices = self._plex_server.clients() sessions = self._plex_server.sessions() except plexapi.exceptions.BadRequest: _LOGGER.exception("Error requesting Plex client data from server") return except requests.exceptions.RequestException as ex: _LOGGER.warning( "Could not connect to Plex server: %s (%s)", self.friendly_name, ex ) return for device in devices: self._known_idle.discard(device.machineIdentifier) available_clients[device.machineIdentifier] = {"device": device} if device.machineIdentifier not in self._known_clients: new_clients.add(device.machineIdentifier) _LOGGER.debug("New device: %s", device.machineIdentifier) for session in sessions: if session.TYPE == "photo": _LOGGER.debug("Photo session detected, skipping: %s", session) continue session_username = session.usernames[0] for player in session.players: if session_username and session_username not in monitored_users: ignored_clients.add(player.machineIdentifier) _LOGGER.debug("Ignoring Plex client owned by %s", session_username) continue self._known_idle.discard(player.machineIdentifier) available_clients.setdefault( player.machineIdentifier, {"device": player} ) available_clients[player.machineIdentifier]["session"] = session if player.machineIdentifier not in self._known_clients: new_clients.add(player.machineIdentifier) _LOGGER.debug("New session: %s", player.machineIdentifier) new_entity_configs = [] for client_id, client_data in available_clients.items(): if client_id in ignored_clients: continue if client_id in new_clients: new_entity_configs.append(client_data) else: self.refresh_entity( client_id, client_data["device"], client_data.get("session") ) self._known_clients.update(new_clients | ignored_clients) idle_clients = ( self._known_clients - self._known_idle - ignored_clients ).difference(available_clients) for client_id in idle_clients: self.refresh_entity(client_id, None, None) self._known_idle.add(client_id) if new_entity_configs: dispatcher_send( self.hass, PLEX_NEW_MP_SIGNAL.format(self.machine_identifier), new_entity_configs, ) dispatcher_send( self.hass, PLEX_UPDATE_SENSOR_SIGNAL.format(self.machine_identifier), sessions, ) @property def plex_server(self): """Return the plexapi PlexServer instance.""" return self._plex_server @property def accounts(self): """Return accounts associated with the Plex server.""" return set(self._accounts) @property def owner(self): """Return the Plex server owner username.""" return self._owner_username @property def version(self): """Return the version of the Plex server.""" return self._version @property def friendly_name(self): """Return name of connected Plex server.""" return self._plex_server.friendlyName @property def machine_identifier(self): """Return unique identifier of connected Plex server.""" return self._plex_server.machineIdentifier @property def url_in_use(self): """Return URL used for connected Plex server.""" return self._plex_server._baseurl # pylint: disable=protected-access @property def option_ignore_new_shared_users(self): """Return ignore_new_shared_users option.""" return self.options[MP_DOMAIN].get(CONF_IGNORE_NEW_SHARED_USERS, False) @property def option_use_episode_art(self): """Return use_episode_art option.""" return self.options[MP_DOMAIN][CONF_USE_EPISODE_ART] @property def option_monitored_users(self): """Return dict of monitored users option.""" return self.options[MP_DOMAIN].get(CONF_MONITORED_USERS, {}) @property def library(self): """Return library attribute from server object.""" return self._plex_server.library def playlist(self, title): """Return playlist from server object.""" return self._plex_server.playlist(title) def create_playqueue(self, media, **kwargs): """Create playqueue on Plex server.""" return plexapi.playqueue.PlayQueue.create(self._plex_server, media, **kwargs)