Add Plex config flow support (#26548)

* Add config flow support

* Log error on failed connection

* Review comments

* Unused errors

* Move form to step

* Use instance var instead of passing argument

* Only share servers created by component

* Return errors early to avoid try:else

* Separate debug for validation vs setup

* Unnecessary

* Unnecessary checks

* Combine import flows, move logic to component

* Use config entry discovery handler

* Temporary lint fix

* Filter out servers already configured

* Remove manual config flow

* Skip discovery if a config exists

* Swap conditional to reduce indenting

* Only discover when no configs created or creating

* Un-nest function

* Proper async use

* Move legacy file import to discovery

* Fix, bad else

* Separate validate step

* Unused without manual setup step

* Async oops

* First attempt at tests

* Test cleanup

* Full test coverage for config_flow, enable tests

* Lint

* Fix lint vs black

* Add test init

* Add test package requirement

* Actually run script

* Use 'not None' convention

* Group exceptions by result

* Improve logic, add new error and test

* Test cleanup

* Add more asserts
This commit is contained in:
jjlawren 2019-09-19 16:29:26 -05:00 committed by Martin Hjelmare
parent 246a611a7c
commit 2d12bac0e2
17 changed files with 836 additions and 172 deletions

View file

@ -479,7 +479,10 @@ omit =
homeassistant/components/pioneer/media_player.py
homeassistant/components/pjlink/media_player.py
homeassistant/components/plaato/*
homeassistant/components/plex/*
homeassistant/components/plex/__init__.py
homeassistant/components/plex/media_player.py
homeassistant/components/plex/sensor.py
homeassistant/components/plex/server.py
homeassistant/components/plugwise/*
homeassistant/components/plum_lightpad/*
homeassistant/components/pocketcasts/sensor.py

View file

@ -50,6 +50,7 @@ CONFIG_ENTRY_HANDLERS = {
SERVICE_DAIKIN: "daikin",
SERVICE_TELLDUSLIVE: "tellduslive",
SERVICE_IGD: "upnp",
SERVICE_PLEX: "plex",
}
SERVICE_HANDLERS = {
@ -69,7 +70,6 @@ SERVICE_HANDLERS = {
SERVICE_FREEBOX: ("freebox", None),
SERVICE_YEELIGHT: ("yeelight", None),
"panasonic_viera": ("media_player", "panasonic_viera"),
SERVICE_PLEX: ("plex", None),
"yamaha": ("media_player", "yamaha"),
"logitech_mediaserver": ("media_player", "squeezebox"),
"directv": ("media_player", "directv"),

View file

@ -5,7 +5,7 @@ import plexapi.exceptions
import requests.exceptions
import voluptuous as vol
from homeassistant.components.discovery import SERVICE_PLEX
from homeassistant import config_entries
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.const import (
CONF_HOST,
@ -16,20 +16,18 @@ from homeassistant.const import (
CONF_VERIFY_SSL,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers import discovery
from homeassistant.util.json import load_json, save_json
from .const import (
CONF_SERVER,
CONF_USE_EPISODE_ART,
CONF_SHOW_ALL_CONTROLS,
CONF_SERVER,
DEFAULT_PORT,
DEFAULT_SSL,
DEFAULT_VERIFY_SSL,
DOMAIN as PLEX_DOMAIN,
PLATFORMS,
PLEX_CONFIG_FILE,
PLEX_MEDIA_PLAYER_OPTIONS,
PLEX_SERVER_CONFIG,
SERVERS,
)
from .server import PlexServer
@ -58,60 +56,50 @@ SERVER_CONFIG_SCHEMA = vol.Schema(
CONFIG_SCHEMA = vol.Schema({PLEX_DOMAIN: SERVER_CONFIG_SCHEMA}, extra=vol.ALLOW_EXTRA)
CONFIGURING = "configuring"
_LOGGER = logging.getLogger(__package__)
def setup(hass, config):
"""Set up the Plex component."""
hass.data.setdefault(PLEX_DOMAIN, {SERVERS: {}})
def server_discovered(service, info):
"""Pass back discovered Plex server details."""
if hass.data[PLEX_DOMAIN][SERVERS]:
_LOGGER.debug("Plex server already configured, ignoring discovery.")
return
_LOGGER.debug("Discovered Plex server: %s:%s", info["host"], info["port"])
setup_plex(discovery_info=info)
plex_config = config.get(PLEX_DOMAIN, {})
if plex_config:
_setup_plex(hass, plex_config)
def setup_plex(config=None, discovery_info=None, configurator_info=None):
"""Return assembled server_config dict."""
json_file = hass.config.path(PLEX_CONFIG_FILE)
file_config = load_json(json_file)
host_and_port = None
if config:
server_config = config
if CONF_HOST in server_config:
host_and_port = (
f"{server_config.pop(CONF_HOST)}:{server_config.pop(CONF_PORT)}"
)
if MP_DOMAIN in server_config:
hass.data[PLEX_MEDIA_PLAYER_OPTIONS] = server_config.pop(MP_DOMAIN)
elif file_config:
_LOGGER.debug("Loading config from %s", json_file)
host_and_port, server_config = file_config.popitem()
server_config[CONF_VERIFY_SSL] = server_config.pop("verify")
elif discovery_info:
server_config = {}
host_and_port = f"{discovery_info[CONF_HOST]}:{discovery_info[CONF_PORT]}"
elif configurator_info:
server_config = configurator_info
host_and_port = server_config["host_and_port"]
else:
discovery.listen(hass, SERVICE_PLEX, server_discovered)
return True
if host_and_port:
use_ssl = server_config.get(CONF_SSL, DEFAULT_SSL)
http_prefix = "https" if use_ssl else "http"
server_config[CONF_URL] = f"{http_prefix}://{host_and_port}"
def _setup_plex(hass, config):
"""Pass configuration to a config flow."""
server_config = dict(config)
if MP_DOMAIN in server_config:
hass.data[PLEX_MEDIA_PLAYER_OPTIONS] = server_config.pop(MP_DOMAIN)
if CONF_HOST in server_config:
prefix = "https" if server_config.pop(CONF_SSL) else "http"
server_config[
CONF_URL
] = f"{prefix}://{server_config.pop(CONF_HOST)}:{server_config.pop(CONF_PORT)}"
hass.async_create_task(
hass.config_entries.flow.async_init(
PLEX_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=server_config,
)
)
async def async_setup_entry(hass, entry):
"""Set up Plex from a config entry."""
server_config = entry.data[PLEX_SERVER_CONFIG]
plex_server = PlexServer(server_config)
try:
plex_server.connect()
await hass.async_add_executor_job(plex_server.connect)
except requests.exceptions.ConnectionError as error:
_LOGGER.error(
"Plex server could not be reached, please verify host and port: [%s]",
"Plex server (%s) could not be reached: [%s]",
server_config[CONF_URL],
error,
)
return False
@ -121,88 +109,23 @@ def setup(hass, config):
plexapi.exceptions.NotFound,
) as error:
_LOGGER.error(
"Connection to Plex server failed, please verify token and SSL settings: [%s]",
"Login to %s failed, verify token and SSL settings: [%s]",
server_config[CONF_SERVER],
error,
)
request_configuration(host_and_port)
return False
else:
hass.data[PLEX_DOMAIN][SERVERS][
plex_server.machine_identifier
] = plex_server
if host_and_port in hass.data[PLEX_DOMAIN][CONFIGURING]:
request_id = hass.data[PLEX_DOMAIN][CONFIGURING].pop(host_and_port)
configurator = hass.components.configurator
configurator.request_done(request_id)
_LOGGER.debug("Discovery configuration done")
if configurator_info:
# Write plex.conf if created via discovery/configurator
save_json(
hass.config.path(PLEX_CONFIG_FILE),
{
host_and_port: {
CONF_TOKEN: server_config[CONF_TOKEN],
CONF_SSL: use_ssl,
"verify": server_config[CONF_VERIFY_SSL],
}
},
_LOGGER.debug(
"Connected to: %s (%s)", plex_server.friendly_name, plex_server.url_in_use
)
hass.data[PLEX_DOMAIN][SERVERS][plex_server.machine_identifier] = plex_server
if not hass.data.get(PLEX_MEDIA_PLAYER_OPTIONS):
hass.data[PLEX_MEDIA_PLAYER_OPTIONS] = MEDIA_PLAYER_SCHEMA({})
for platform in PLATFORMS:
hass.helpers.discovery.load_platform(
platform, PLEX_DOMAIN, {}, original_config
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, platform)
)
return True
def request_configuration(host_and_port):
"""Request configuration steps from the user."""
configurator = hass.components.configurator
if host_and_port in hass.data[PLEX_DOMAIN][CONFIGURING]:
configurator.notify_errors(
hass.data[PLEX_DOMAIN][CONFIGURING][host_and_port],
"Failed to register, please try again.",
)
return
def plex_configuration_callback(data):
"""Handle configuration changes."""
config = {
"host_and_port": host_and_port,
CONF_TOKEN: data.get("token"),
CONF_SSL: cv.boolean(data.get("ssl")),
CONF_VERIFY_SSL: cv.boolean(data.get("verify_ssl")),
}
setup_plex(configurator_info=config)
hass.data[PLEX_DOMAIN][CONFIGURING][
host_and_port
] = configurator.request_config(
"Plex Media Server",
plex_configuration_callback,
description="Enter the X-Plex-Token",
entity_picture="/static/images/logo_plex_mediaserver.png",
submit_caption="Confirm",
fields=[
{"id": "token", "name": "X-Plex-Token", "type": ""},
{"id": "ssl", "name": "Use SSL", "type": ""},
{"id": "verify_ssl", "name": "Verify SSL", "type": ""},
],
)
# End of inner functions.
original_config = config
hass.data.setdefault(PLEX_DOMAIN, {SERVERS: {}, CONFIGURING: {}})
if hass.data[PLEX_DOMAIN][SERVERS]:
_LOGGER.debug("Plex server already configured")
return False
plex_config = config.get(PLEX_DOMAIN, {})
return setup_plex(config=plex_config)

View file

@ -0,0 +1,171 @@
"""Config flow for Plex."""
import logging
import plexapi.exceptions
import requests.exceptions
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_URL, CONF_TOKEN, CONF_SSL, CONF_VERIFY_SSL
from homeassistant.core import callback
from homeassistant.util.json import load_json
from .const import ( # pylint: disable=unused-import
CONF_SERVER,
CONF_SERVER_IDENTIFIER,
DEFAULT_VERIFY_SSL,
DOMAIN,
PLEX_CONFIG_FILE,
PLEX_SERVER_CONFIG,
)
from .errors import NoServersFound, ServerNotSpecified
from .server import PlexServer
USER_SCHEMA = vol.Schema({vol.Required(CONF_TOKEN): str})
_LOGGER = logging.getLogger(__package__)
@callback
def configured_servers(hass):
"""Return a set of the configured Plex servers."""
return set(
entry.data[CONF_SERVER_IDENTIFIER]
for entry in hass.config_entries.async_entries(DOMAIN)
)
class PlexFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a Plex config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
def __init__(self):
"""Initialize the Plex flow."""
self.current_login = {}
self.available_servers = None
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
if user_input is not None:
return await self.async_step_server_validate(user_input)
return self.async_show_form(step_id="user", data_schema=USER_SCHEMA, errors={})
async def async_step_server_validate(self, server_config):
"""Validate a provided configuration."""
errors = {}
self.current_login = server_config
plex_server = PlexServer(server_config)
try:
await self.hass.async_add_executor_job(plex_server.connect)
except NoServersFound:
errors["base"] = "no_servers"
except (plexapi.exceptions.BadRequest, plexapi.exceptions.Unauthorized):
_LOGGER.error("Invalid credentials provided, config not created")
errors["base"] = "faulty_credentials"
except (plexapi.exceptions.NotFound, requests.exceptions.ConnectionError):
_LOGGER.error(
"Plex server could not be reached: %s", server_config[CONF_URL]
)
errors["base"] = "not_found"
except ServerNotSpecified as available_servers:
self.available_servers = available_servers.args[0]
return await self.async_step_select_server()
except Exception as error: # pylint: disable=broad-except
_LOGGER.error("Unknown error connecting to Plex server: %s", error)
return self.async_abort(reason="unknown")
if errors:
return self.async_show_form(
step_id="user", data_schema=USER_SCHEMA, errors=errors
)
server_id = plex_server.machine_identifier
for entry in self._async_current_entries():
if entry.data[CONF_SERVER_IDENTIFIER] == server_id:
return self.async_abort(reason="already_configured")
url = plex_server.url_in_use
token = server_config.get(CONF_TOKEN)
entry_config = {CONF_URL: url}
if token:
entry_config[CONF_TOKEN] = token
if url.startswith("https"):
entry_config[CONF_VERIFY_SSL] = server_config.get(
CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL
)
_LOGGER.debug("Valid config created for %s", plex_server.friendly_name)
return self.async_create_entry(
title=plex_server.friendly_name,
data={
CONF_SERVER: plex_server.friendly_name,
CONF_SERVER_IDENTIFIER: server_id,
PLEX_SERVER_CONFIG: entry_config,
},
)
async def async_step_select_server(self, user_input=None):
"""Use selected Plex server."""
config = dict(self.current_login)
if user_input is not None:
config[CONF_SERVER] = user_input[CONF_SERVER]
return await self.async_step_server_validate(config)
configured = configured_servers(self.hass)
available_servers = [
name
for (name, server_id) in self.available_servers
if server_id not in configured
]
if not available_servers:
return self.async_abort(reason="all_configured")
if len(available_servers) == 1:
config[CONF_SERVER] = available_servers[0]
return await self.async_step_server_validate(config)
return self.async_show_form(
step_id="select_server",
data_schema=vol.Schema(
{vol.Required(CONF_SERVER): vol.In(available_servers)}
),
errors={},
)
async def async_step_discovery(self, discovery_info):
"""Set default host and port from discovery."""
if self._async_current_entries() or self._async_in_progress():
# Skip discovery if a config already exists or is in progress.
return self.async_abort(reason="already_configured")
json_file = self.hass.config.path(PLEX_CONFIG_FILE)
file_config = await self.hass.async_add_executor_job(load_json, json_file)
if file_config:
host_and_port, host_config = file_config.popitem()
prefix = "https" if host_config[CONF_SSL] else "http"
server_config = {
CONF_URL: f"{prefix}://{host_and_port}",
CONF_TOKEN: host_config[CONF_TOKEN],
CONF_VERIFY_SSL: host_config["verify"],
}
_LOGGER.info("Imported legacy config, file can be removed: %s", json_file)
return await self.async_step_server_validate(server_config)
return await self.async_step_user()
async def async_step_import(self, import_config):
"""Import from Plex configuration."""
_LOGGER.debug("Imported Plex configuration")
return await self.async_step_server_validate(import_config)

View file

@ -14,5 +14,6 @@ PLEX_MEDIA_PLAYER_OPTIONS = "plex_mp_options"
PLEX_SERVER_CONFIG = "server_config"
CONF_SERVER = "server"
CONF_SERVER_IDENTIFIER = "server_id"
CONF_USE_EPISODE_ART = "use_episode_art"
CONF_SHOW_ALL_CONTROLS = "show_all_controls"

View file

@ -0,0 +1,14 @@
"""Errors for the Plex component."""
from homeassistant.exceptions import HomeAssistantError
class PlexException(HomeAssistantError):
"""Base class for Plex exceptions."""
class NoServersFound(PlexException):
"""No servers found on Plex account."""
class ServerNotSpecified(PlexException):
"""Multiple servers linked to account without choice provided."""

View file

@ -1,11 +1,12 @@
{
"domain": "plex",
"name": "Plex",
"config_flow": true,
"documentation": "https://www.home-assistant.io/components/plex",
"requirements": [
"plexapi==3.0.6"
],
"dependencies": ["configurator"],
"dependencies": [],
"codeowners": [
"@jjlawren"
]

View file

@ -35,26 +35,40 @@ from homeassistant.util import dt as dt_util
from .const import (
CONF_USE_EPISODE_ART,
CONF_SHOW_ALL_CONTROLS,
CONF_SERVER_IDENTIFIER,
DOMAIN as PLEX_DOMAIN,
NAME_FORMAT,
PLEX_MEDIA_PLAYER_OPTIONS,
SERVERS,
)
SERVER_SETUP = "server_setup"
_CONFIGURING = {}
_LOGGER = logging.getLogger(__name__)
def setup_platform(hass, config, add_entities_callback, discovery_info=None):
"""Set up the Plex platform."""
if discovery_info is None:
return
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Plex media_player platform.
plexserver = list(hass.data[PLEX_DOMAIN][SERVERS].values())[0]
Deprecated.
"""
pass
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up Plex media_player from a config entry."""
def add_entities(entities, update_before_add=False):
"""Sync version of async add entities."""
hass.add_job(async_add_entities, entities, update_before_add)
hass.async_add_executor_job(_setup_platform, hass, config_entry, add_entities)
def _setup_platform(hass, config_entry, add_entities_callback):
"""Set up the Plex media_player platform."""
server_id = config_entry.data[CONF_SERVER_IDENTIFIER]
config = hass.data[PLEX_MEDIA_PLAYER_OPTIONS]
plexserver = hass.data[PLEX_DOMAIN][SERVERS][server_id]
plex_clients = {}
plex_sessions = {}
track_time_interval(hass, lambda now: update_devices(), timedelta(seconds=10))

View file

@ -8,21 +8,26 @@ import requests.exceptions
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
from .const import DOMAIN as PLEX_DOMAIN, SERVERS
from .const import CONF_SERVER_IDENTIFIER, DOMAIN as PLEX_DOMAIN, SERVERS
DEFAULT_NAME = "Plex"
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=1)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Plex sensor."""
if discovery_info is None:
return
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Plex sensor platform.
plexserver = list(hass.data[PLEX_DOMAIN][SERVERS].values())[0]
add_entities([PlexSensor(plexserver)], True)
Deprecated.
"""
pass
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up Plex sensor from a config entry."""
server_id = config_entry.data[CONF_SERVER_IDENTIFIER]
sensor = PlexSensor(hass.data[PLEX_DOMAIN][SERVERS][server_id])
async_add_entities([sensor], True)
class PlexSensor(Entity):
@ -30,10 +35,10 @@ class PlexSensor(Entity):
def __init__(self, plex_server):
"""Initialize the sensor."""
self._name = DEFAULT_NAME
self._state = None
self._now_playing = []
self._server = plex_server
self._name = f"Plex ({plex_server.friendly_name})"
self._unique_id = f"sensor-{plex_server.machine_identifier}"
@property

View file

@ -1,6 +1,4 @@
"""Shared class to maintain Plex server instances."""
import logging
import plexapi.myplex
import plexapi.server
from requests import Session
@ -8,8 +6,7 @@ from requests import Session
from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_VERIFY_SSL
from .const import CONF_SERVER, DEFAULT_VERIFY_SSL
_LOGGER = logging.getLogger(__package__)
from .errors import NoServersFound, ServerNotSpecified
class PlexServer:
@ -29,8 +26,16 @@ class PlexServer:
def _set_missing_url():
account = plexapi.myplex.MyPlexAccount(token=self._token)
available_servers = [
x.name for x in account.resources() if "server" in x.provides
(x.name, x.clientIdentifier)
for x in account.resources()
if "server" in x.provides
]
if not available_servers:
raise NoServersFound
if not self._server_name and len(available_servers) > 1:
raise ServerNotSpecified(available_servers)
server_choice = (
self._server_name if self._server_name else available_servers[0]
)
@ -47,7 +52,6 @@ class PlexServer:
self._plex_server = plexapi.server.PlexServer(
self._url, self._token, session
)
_LOGGER.debug("Connected to: %s (%s)", self.friendly_name, self.url_in_use)
if self._token and not self._url:
_set_missing_url()

View file

@ -0,0 +1,33 @@
{
"config": {
"title": "Plex",
"step": {
"select_server": {
"title": "Select Plex server",
"description": "Multiple servers available, select one:",
"data": {
"server": "Server"
}
},
"user": {
"title": "Connect Plex server",
"description": "Enter a Plex token for automatic setup.",
"data": {
"token": "Plex token"
}
}
},
"error": {
"faulty_credentials": "Authorization failed",
"no_servers": "No servers linked to account",
"not_found": "Plex server not found"
},
"abort": {
"all_configured": "All linked servers already configured",
"already_configured": "This Plex server is already configured",
"already_in_progress": "Plex is being configured",
"invalid_import": "Imported configuration is invalid",
"unknown": "Failed for unknown reason"
}
}
}

View file

@ -45,6 +45,7 @@ FLOWS = [
"openuv",
"owntracks",
"plaato",
"plex",
"point",
"ps4",
"rainmachine",

View file

@ -249,6 +249,9 @@ pexpect==4.6.0
# homeassistant.components.pilight
pilight==0.1.1
# homeassistant.components.plex
plexapi==3.0.6
# homeassistant.components.mhz19
# homeassistant.components.serial_pm
pmsensor==0.4

View file

@ -110,6 +110,7 @@ TEST_REQUIREMENTS = (
"paho-mqtt",
"pexpect",
"pilight",
"plexapi",
"pmsensor",
"prometheus_client",
"ptvsd",

View file

@ -0,0 +1 @@
"""Tests for the Plex component."""

View file

@ -0,0 +1,35 @@
"""Mock classes used in tests."""
MOCK_HOST_1 = "1.2.3.4"
MOCK_PORT_1 = "32400"
MOCK_HOST_2 = "4.3.2.1"
MOCK_PORT_2 = "32400"
class MockAvailableServer: # pylint: disable=too-few-public-methods
"""Mock avilable server objects."""
def __init__(self, name, client_id):
"""Initialize the object."""
self.name = name
self.clientIdentifier = client_id # pylint: disable=invalid-name
self.provides = ["server"]
class MockConnection: # pylint: disable=too-few-public-methods
"""Mock a single account resource connection object."""
def __init__(self, ssl):
"""Initialize the object."""
prefix = "https" if ssl else "http"
self.httpuri = f"{prefix}://{MOCK_HOST_1}:{MOCK_PORT_1}"
self.uri = "{prefix}://{MOCK_HOST_2}:{MOCK_PORT_2}"
self.local = True
class MockConnections: # pylint: disable=too-few-public-methods
"""Mock a list of resource connections."""
def __init__(self, ssl=False):
"""Initialize the object."""
self.connections = [MockConnection(ssl)]

View file

@ -0,0 +1,454 @@
"""Tests for Plex config flow."""
from unittest.mock import MagicMock, Mock, patch, PropertyMock
import plexapi.exceptions
import requests.exceptions
from homeassistant.components.plex import config_flow
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_TOKEN, CONF_URL
from tests.common import MockConfigEntry
from .mock_classes import MOCK_HOST_1, MOCK_PORT_1, MockAvailableServer, MockConnections
MOCK_NAME_1 = "Plex Server 1"
MOCK_ID_1 = "unique_id_123"
MOCK_NAME_2 = "Plex Server 2"
MOCK_ID_2 = "unique_id_456"
MOCK_TOKEN = "secret_token"
MOCK_FILE_CONTENTS = {
f"{MOCK_HOST_1}:{MOCK_PORT_1}": {"ssl": False, "token": MOCK_TOKEN, "verify": True}
}
MOCK_SERVER_1 = MockAvailableServer(MOCK_NAME_1, MOCK_ID_1)
MOCK_SERVER_2 = MockAvailableServer(MOCK_NAME_2, MOCK_ID_2)
def init_config_flow(hass):
"""Init a configuration flow."""
flow = config_flow.PlexFlowHandler()
flow.hass = hass
return flow
async def test_bad_credentials(hass):
"""Test when provided credentials are rejected."""
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
with patch(
"plexapi.myplex.MyPlexAccount", side_effect=plexapi.exceptions.Unauthorized
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_TOKEN: MOCK_TOKEN}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert result["errors"]["base"] == "faulty_credentials"
async def test_import_file_from_discovery(hass):
"""Test importing a legacy file during discovery."""
file_host_and_port, file_config = list(MOCK_FILE_CONTENTS.items())[0]
used_url = f"http://{file_host_and_port}"
with patch("plexapi.server.PlexServer") as mock_plex_server, patch(
"homeassistant.components.plex.config_flow.load_json",
return_value=MOCK_FILE_CONTENTS,
):
type(mock_plex_server.return_value).machineIdentifier = PropertyMock(
return_value=MOCK_ID_1
)
type(mock_plex_server.return_value).friendlyName = PropertyMock(
return_value=MOCK_NAME_1
)
type( # pylint: disable=protected-access
mock_plex_server.return_value
)._baseurl = PropertyMock(return_value=used_url)
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN,
context={"source": "discovery"},
data={CONF_HOST: MOCK_HOST_1, CONF_PORT: MOCK_PORT_1},
)
assert result["type"] == "create_entry"
assert result["title"] == MOCK_NAME_1
assert result["data"][config_flow.CONF_SERVER] == MOCK_NAME_1
assert result["data"][config_flow.CONF_SERVER_IDENTIFIER] == MOCK_ID_1
assert result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_URL] == used_url
assert (
result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_TOKEN]
== file_config[CONF_TOKEN]
)
async def test_discovery(hass):
"""Test starting a flow from discovery."""
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN,
context={"source": "discovery"},
data={CONF_HOST: MOCK_HOST_1, CONF_PORT: MOCK_PORT_1},
)
assert result["type"] == "form"
assert result["step_id"] == "user"
async def test_discovery_while_in_progress(hass):
"""Test starting a flow from discovery."""
await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN,
context={"source": "discovery"},
data={CONF_HOST: MOCK_HOST_1, CONF_PORT: MOCK_PORT_1},
)
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
async def test_import_success(hass):
"""Test a successful configuration import."""
mock_connections = MockConnections(ssl=True)
mm_plex_account = MagicMock()
mm_plex_account.resources = Mock(return_value=[MOCK_SERVER_1])
mm_plex_account.resource = Mock(return_value=mock_connections)
with patch("plexapi.server.PlexServer") as mock_plex_server:
type(mock_plex_server.return_value).machineIdentifier = PropertyMock(
return_value=MOCK_SERVER_1.clientIdentifier
)
type(mock_plex_server.return_value).friendlyName = PropertyMock(
return_value=MOCK_SERVER_1.name
)
type( # pylint: disable=protected-access
mock_plex_server.return_value
)._baseurl = PropertyMock(return_value=mock_connections.connections[0].httpuri)
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN,
context={"source": "import"},
data={
CONF_TOKEN: MOCK_TOKEN,
CONF_URL: f"https://{MOCK_HOST_1}:{MOCK_PORT_1}",
},
)
assert result["type"] == "create_entry"
assert result["title"] == MOCK_SERVER_1.name
assert result["data"][config_flow.CONF_SERVER] == MOCK_SERVER_1.name
assert (
result["data"][config_flow.CONF_SERVER_IDENTIFIER]
== MOCK_SERVER_1.clientIdentifier
)
assert (
result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_URL]
== mock_connections.connections[0].httpuri
)
assert result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_TOKEN] == MOCK_TOKEN
async def test_import_bad_hostname(hass):
"""Test when an invalid address is provided."""
with patch(
"plexapi.server.PlexServer", side_effect=requests.exceptions.ConnectionError
):
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN,
context={"source": "import"},
data={
CONF_TOKEN: MOCK_TOKEN,
CONF_URL: f"http://{MOCK_HOST_1}:{MOCK_PORT_1}",
},
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert result["errors"]["base"] == "not_found"
async def test_unknown_exception(hass):
"""Test when an unknown exception is encountered."""
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
with patch("plexapi.myplex.MyPlexAccount", side_effect=Exception):
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN,
context={"source": "user"},
data={CONF_TOKEN: MOCK_TOKEN},
)
assert result["type"] == "abort"
assert result["reason"] == "unknown"
async def test_no_servers_found(hass):
"""Test when no servers are on an account."""
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
mm_plex_account = MagicMock()
mm_plex_account.resources = Mock(return_value=[])
with patch("plexapi.myplex.MyPlexAccount", return_value=mm_plex_account):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_TOKEN: MOCK_TOKEN}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert result["errors"]["base"] == "no_servers"
async def test_single_available_server(hass):
"""Test creating an entry with one server available."""
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
mock_connections = MockConnections()
mm_plex_account = MagicMock()
mm_plex_account.resources = Mock(return_value=[MOCK_SERVER_1])
mm_plex_account.resource = Mock(return_value=mock_connections)
with patch("plexapi.myplex.MyPlexAccount", return_value=mm_plex_account), patch(
"plexapi.server.PlexServer"
) as mock_plex_server:
type(mock_plex_server.return_value).machineIdentifier = PropertyMock(
return_value=MOCK_SERVER_1.clientIdentifier
)
type(mock_plex_server.return_value).friendlyName = PropertyMock(
return_value=MOCK_SERVER_1.name
)
type( # pylint: disable=protected-access
mock_plex_server.return_value
)._baseurl = PropertyMock(return_value=mock_connections.connections[0].httpuri)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_TOKEN: MOCK_TOKEN}
)
assert result["type"] == "create_entry"
assert result["title"] == MOCK_SERVER_1.name
assert result["data"][config_flow.CONF_SERVER] == MOCK_SERVER_1.name
assert (
result["data"][config_flow.CONF_SERVER_IDENTIFIER]
== MOCK_SERVER_1.clientIdentifier
)
assert (
result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_URL]
== mock_connections.connections[0].httpuri
)
assert result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_TOKEN] == MOCK_TOKEN
async def test_multiple_servers_with_selection(hass):
"""Test creating an entry with multiple servers available."""
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
mock_connections = MockConnections()
mm_plex_account = MagicMock()
mm_plex_account.resources = Mock(return_value=[MOCK_SERVER_1, MOCK_SERVER_2])
mm_plex_account.resource = Mock(return_value=mock_connections)
with patch("plexapi.myplex.MyPlexAccount", return_value=mm_plex_account), patch(
"plexapi.server.PlexServer"
) as mock_plex_server:
type(mock_plex_server.return_value).machineIdentifier = PropertyMock(
return_value=MOCK_SERVER_1.clientIdentifier
)
type(mock_plex_server.return_value).friendlyName = PropertyMock(
return_value=MOCK_SERVER_1.name
)
type( # pylint: disable=protected-access
mock_plex_server.return_value
)._baseurl = PropertyMock(return_value=mock_connections.connections[0].httpuri)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_TOKEN: MOCK_TOKEN}
)
assert result["type"] == "form"
assert result["step_id"] == "select_server"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={config_flow.CONF_SERVER: MOCK_SERVER_1.name}
)
assert result["type"] == "create_entry"
assert result["title"] == MOCK_SERVER_1.name
assert result["data"][config_flow.CONF_SERVER] == MOCK_SERVER_1.name
assert (
result["data"][config_flow.CONF_SERVER_IDENTIFIER]
== MOCK_SERVER_1.clientIdentifier
)
assert (
result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_URL]
== mock_connections.connections[0].httpuri
)
assert result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_TOKEN] == MOCK_TOKEN
async def test_adding_last_unconfigured_server(hass):
"""Test automatically adding last unconfigured server when multiple servers on account."""
MockConfigEntry(
domain=config_flow.DOMAIN,
data={
config_flow.CONF_SERVER_IDENTIFIER: MOCK_ID_2,
config_flow.CONF_SERVER: MOCK_NAME_2,
},
).add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
mock_connections = MockConnections()
mm_plex_account = MagicMock()
mm_plex_account.resources = Mock(return_value=[MOCK_SERVER_1, MOCK_SERVER_2])
mm_plex_account.resource = Mock(return_value=mock_connections)
with patch("plexapi.myplex.MyPlexAccount", return_value=mm_plex_account), patch(
"plexapi.server.PlexServer"
) as mock_plex_server:
type(mock_plex_server.return_value).machineIdentifier = PropertyMock(
return_value=MOCK_SERVER_1.clientIdentifier
)
type(mock_plex_server.return_value).friendlyName = PropertyMock(
return_value=MOCK_SERVER_1.name
)
type( # pylint: disable=protected-access
mock_plex_server.return_value
)._baseurl = PropertyMock(return_value=mock_connections.connections[0].httpuri)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_TOKEN: MOCK_TOKEN}
)
assert result["type"] == "create_entry"
assert result["title"] == MOCK_SERVER_1.name
assert result["data"][config_flow.CONF_SERVER] == MOCK_SERVER_1.name
assert (
result["data"][config_flow.CONF_SERVER_IDENTIFIER]
== MOCK_SERVER_1.clientIdentifier
)
assert (
result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_URL]
== mock_connections.connections[0].httpuri
)
assert result["data"][config_flow.PLEX_SERVER_CONFIG][CONF_TOKEN] == MOCK_TOKEN
async def test_already_configured(hass):
"""Test a duplicated successful flow."""
flow = init_config_flow(hass)
MockConfigEntry(
domain=config_flow.DOMAIN, data={config_flow.CONF_SERVER_IDENTIFIER: MOCK_ID_1}
).add_to_hass(hass)
mock_connections = MockConnections()
mm_plex_account = MagicMock()
mm_plex_account.resources = Mock(return_value=[MOCK_SERVER_1])
mm_plex_account.resource = Mock(return_value=mock_connections)
with patch("plexapi.server.PlexServer") as mock_plex_server:
type(mock_plex_server.return_value).machineIdentifier = PropertyMock(
return_value=MOCK_SERVER_1.clientIdentifier
)
type(mock_plex_server.return_value).friendlyName = PropertyMock(
return_value=MOCK_SERVER_1.name
)
type( # pylint: disable=protected-access
mock_plex_server.return_value
)._baseurl = PropertyMock(return_value=mock_connections.connections[0].httpuri)
result = await flow.async_step_import(
{CONF_TOKEN: MOCK_TOKEN, CONF_URL: f"http://{MOCK_HOST_1}:{MOCK_PORT_1}"}
)
assert result["type"] == "abort"
assert result["reason"] == "already_configured"
async def test_all_available_servers_configured(hass):
"""Test when all available servers are already configured."""
MockConfigEntry(
domain=config_flow.DOMAIN,
data={
config_flow.CONF_SERVER_IDENTIFIER: MOCK_ID_1,
config_flow.CONF_SERVER: MOCK_NAME_1,
},
).add_to_hass(hass)
MockConfigEntry(
domain=config_flow.DOMAIN,
data={
config_flow.CONF_SERVER_IDENTIFIER: MOCK_ID_2,
config_flow.CONF_SERVER: MOCK_NAME_2,
},
).add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": "user"}
)
assert result["type"] == "form"
assert result["step_id"] == "user"
mock_connections = MockConnections()
mm_plex_account = MagicMock()
mm_plex_account.resources = Mock(return_value=[MOCK_SERVER_1, MOCK_SERVER_2])
mm_plex_account.resource = Mock(return_value=mock_connections)
with patch("plexapi.myplex.MyPlexAccount", return_value=mm_plex_account):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_TOKEN: MOCK_TOKEN}
)
assert result["type"] == "abort"
assert result["reason"] == "all_configured"