Add new Plex movie lookup method for media_player.play_media (#39584)

This commit is contained in:
jjlawren 2020-09-04 04:32:36 -05:00 committed by GitHub
parent fdb737d1d9
commit dd7f282723
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 202 additions and 125 deletions

View file

@ -16,3 +16,7 @@ class ServerNotSpecified(PlexException):
class ShouldUpdateConfigEntry(PlexException):
"""Config entry data is out of date and should be updated."""
class MediaNotFound(PlexException):
"""Media lookup failed for a given search query."""

View file

@ -0,0 +1,116 @@
"""Helper methods to search for Plex media."""
import logging
from plexapi.exceptions import BadRequest, NotFound
from .errors import MediaNotFound
_LOGGER = logging.getLogger(__name__)
def lookup_movie(library_section, **kwargs):
"""Find a specific movie and return a Plex media object."""
try:
title = kwargs["title"]
except KeyError:
_LOGGER.error("Must specify 'title' for this search")
return None
try:
movies = library_section.search(**kwargs, libtype="movie", maxresults=3)
except BadRequest as err:
_LOGGER.error("Invalid search payload provided: %s", err)
return None
if not movies:
raise MediaNotFound(f"Movie {title}") from None
if len(movies) > 1:
exact_matches = [x for x in movies if x.title.lower() == title.lower()]
if len(exact_matches) == 1:
return exact_matches[0]
match_list = [f"{x.title} ({x.year})" for x in movies]
_LOGGER.warning("Multiple matches found during search: %s", match_list)
return None
return movies[0]
def lookup_tv(library_section, **kwargs):
"""Find TV media and return a Plex media object."""
season_number = kwargs.get("season_number")
episode_number = kwargs.get("episode_number")
try:
show_name = kwargs["show_name"]
show = library_section.get(show_name)
except KeyError:
_LOGGER.error("Must specify 'show_name' for this search")
return None
except NotFound as err:
raise MediaNotFound(f"Show {show_name}") from err
if not season_number:
return show
try:
season = show.season(int(season_number))
except NotFound as err:
raise MediaNotFound(f"Season {season_number} of {show_name}") from err
if not episode_number:
return season
try:
return season.episode(episode=int(episode_number))
except NotFound as err:
episode = f"S{str(season_number).zfill(2)}E{str(episode_number).zfill(2)}"
raise MediaNotFound(f"Episode {episode} of {show_name}") from err
def lookup_music(library_section, **kwargs):
"""Search for music and return a Plex media object."""
album_name = kwargs.get("album_name")
track_name = kwargs.get("track_name")
track_number = kwargs.get("track_number")
try:
artist_name = kwargs["artist_name"]
artist = library_section.get(artist_name)
except KeyError:
_LOGGER.error("Must specify 'artist_name' for this search")
return None
except NotFound as err:
raise MediaNotFound(f"Artist {artist_name}") from err
if album_name:
try:
album = artist.album(album_name)
except NotFound as err:
raise MediaNotFound(f"Album {album_name} by {artist_name}") from err
if track_name:
try:
return album.track(track_name)
except NotFound as err:
raise MediaNotFound(
f"Track {track_name} on {album_name} by {artist_name}"
) from err
if track_number:
for track in album.tracks():
if int(track.index) == int(track_number):
return track
raise MediaNotFound(
f"Track {track_number} on {album_name} by {artist_name}"
) from None
return album
if track_name:
try:
return artist.get(track_name)
except NotFound as err:
raise MediaNotFound(f"Track {track_name} by {artist_name}") from err
return artist

View file

@ -14,6 +14,7 @@ import requests.exceptions
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.media_player.const import (
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_VIDEO,
@ -41,7 +42,13 @@ from .const import (
X_PLEX_PRODUCT,
X_PLEX_VERSION,
)
from .errors import NoServersFound, ServerNotSpecified, ShouldUpdateConfigEntry
from .errors import (
MediaNotFound,
NoServersFound,
ServerNotSpecified,
ShouldUpdateConfigEntry,
)
from .media_search import lookup_movie, lookup_music, lookup_tv
_LOGGER = logging.getLogger(__name__)
@ -487,7 +494,7 @@ class PlexServer:
return None
try:
library_name = kwargs["library_name"]
library_name = kwargs.pop("library_name")
library_section = self.library.section(library_name)
except KeyError:
_LOGGER.error("Must specify 'library_name' for this search")
@ -496,125 +503,23 @@ class PlexServer:
_LOGGER.error("Library '%s' not found", library_name)
return None
def lookup_music():
"""Search for music and return a Plex media object."""
album_name = kwargs.get("album_name")
track_name = kwargs.get("track_name")
track_number = kwargs.get("track_number")
try:
artist_name = kwargs["artist_name"]
artist = library_section.get(artist_name)
except KeyError:
_LOGGER.error("Must specify 'artist_name' for this search")
return None
except NotFound:
_LOGGER.error(
"Artist '%s' not found in '%s'", artist_name, library_name
)
return None
if album_name:
try:
if media_type == MEDIA_TYPE_EPISODE:
return lookup_tv(library_section, **kwargs)
if media_type == MEDIA_TYPE_MOVIE:
return lookup_movie(library_section, **kwargs)
if media_type == MEDIA_TYPE_MUSIC:
return lookup_music(library_section, **kwargs)
if media_type == MEDIA_TYPE_VIDEO:
# Legacy method for compatibility
try:
album = artist.album(album_name)
except NotFound:
_LOGGER.error(
"Album '%s' by '%s' not found", album_name, artist_name
)
video_name = kwargs["video_name"]
return library_section.get(video_name)
except KeyError:
_LOGGER.error("Must specify 'video_name' for this search")
return None
if track_name:
try:
return album.track(track_name)
except NotFound:
_LOGGER.error(
"Track '%s' on '%s' by '%s' not found",
track_name,
album_name,
artist_name,
)
return None
if track_number:
for track in album.tracks():
if int(track.index) == int(track_number):
return track
_LOGGER.error(
"Track %d on '%s' by '%s' not found",
track_number,
album_name,
artist_name,
)
return None
return album
if track_name:
try:
return artist.get(track_name)
except NotFound:
_LOGGER.error(
"Track '%s' by '%s' not found", track_name, artist_name
)
return None
return artist
def lookup_tv():
"""Find TV media and return a Plex media object."""
season_number = kwargs.get("season_number")
episode_number = kwargs.get("episode_number")
try:
show_name = kwargs["show_name"]
show = library_section.get(show_name)
except KeyError:
_LOGGER.error("Must specify 'show_name' for this search")
return None
except NotFound:
_LOGGER.error("Show '%s' not found in '%s'", show_name, library_name)
return None
if not season_number:
return show
try:
season = show.season(int(season_number))
except NotFound:
_LOGGER.error(
"Season %d of '%s' not found",
season_number,
show_name,
)
return None
if not episode_number:
return season
try:
return season.episode(episode=int(episode_number))
except NotFound:
_LOGGER.error(
"Episode not found: %s - S%sE%s",
show_name,
str(season_number).zfill(2),
str(episode_number).zfill(2),
)
return None
if media_type == MEDIA_TYPE_MUSIC:
return lookup_music()
if media_type == MEDIA_TYPE_EPISODE:
return lookup_tv()
if media_type == MEDIA_TYPE_VIDEO:
try:
video_name = kwargs["video_name"]
return library_section.get(video_name)
except KeyError:
_LOGGER.error("Must specify 'video_name' for this search")
except NotFound:
_LOGGER.error(
"Movie '%s' not found in '%s'",
video_name,
library_name,
)
except NotFound as err:
raise MediaNotFound(f"Video {video_name}") from err
except MediaNotFound as failed_item:
_LOGGER.error("%s not found in %s", failed_item, library_name)
return None

View file

@ -414,6 +414,11 @@ class MockPlexLibrarySection:
"""Mock the key identifier property."""
return str(id(self.title))
def search(self, **kwargs):
"""Mock the LibrarySection search method."""
if kwargs.get("libtype") == "movie":
return self.all()
def update(self):
"""Mock the update call."""
pass
@ -422,11 +427,12 @@ class MockPlexLibrarySection:
class MockPlexMediaItem:
"""Mock a Plex Media instance."""
def __init__(self, title, mediatype="video"):
def __init__(self, title, mediatype="video", year=2020):
"""Initialize the object."""
self.title = str(title)
self.type = mediatype
self.thumbUrl = "http://1.2.3.4/thumb.png"
self.year = year
self._children = []
def __iter__(self):

View file

@ -1,7 +1,7 @@
"""Tests for Plex server."""
import copy
from plexapi.exceptions import NotFound
from plexapi.exceptions import BadRequest, NotFound
from requests.exceptions import RequestException
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
@ -9,6 +9,7 @@ from homeassistant.components.media_player.const import (
ATTR_MEDIA_CONTENT_ID,
ATTR_MEDIA_CONTENT_TYPE,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_VIDEO,
@ -32,6 +33,7 @@ from .mock_classes import (
MockPlexArtist,
MockPlexLibrary,
MockPlexLibrarySection,
MockPlexMediaItem,
MockPlexSeason,
MockPlexServer,
MockPlexShow,
@ -454,7 +456,7 @@ async def test_media_lookups(hass):
is None
)
# Movie searches
# Legacy Movie searches
assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, video_name="Movie") is None
assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, library_name="Movies") is None
assert loaded_server.lookup_media(
@ -467,3 +469,47 @@ async def test_media_lookups(hass):
)
is None
)
# Movie searches
assert loaded_server.lookup_media(MEDIA_TYPE_MOVIE, title="Movie") is None
assert loaded_server.lookup_media(MEDIA_TYPE_MOVIE, library_name="Movies") is None
assert loaded_server.lookup_media(
MEDIA_TYPE_MOVIE, library_name="Movies", title="Movie"
)
with patch.object(MockPlexLibrarySection, "search", side_effect=BadRequest):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MOVIE, library_name="Movies", title="Not a Movie"
)
is None
)
with patch.object(MockPlexLibrarySection, "search", return_value=[]):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MOVIE, library_name="Movies", title="Not a Movie"
)
is None
)
similar_movies = []
for title in "Duplicate Movie", "Duplicate Movie 2":
similar_movies.append(MockPlexMediaItem(title))
with patch.object(
loaded_server.library.section("Movies"), "search", return_value=similar_movies
):
found_media = loaded_server.lookup_media(
MEDIA_TYPE_MOVIE, library_name="Movies", title="Duplicate Movie"
)
assert found_media.title == "Duplicate Movie"
duplicate_movies = []
for title in "Duplicate Movie - Original", "Duplicate Movie - Remake":
duplicate_movies.append(MockPlexMediaItem(title))
with patch.object(
loaded_server.library.section("Movies"), "search", return_value=duplicate_movies
):
assert (
loaded_server.lookup_media(
MEDIA_TYPE_MOVIE, library_name="Movies", title="Duplicate Movie"
)
) is None