Media Source implementation for Chromecast (#39305)
* Implement local media finder and integrate into cast * update to media source as a platform * Tweak media source design * fix websocket and local source * fix websocket schema * fix playing media * Apply suggestions from code review Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * Add resolve_media websocket * Register that shit * Square brackets * Sign path * add support for multiple media sources and address PR review * fix lint * fix tests from auto whitelisting config/media * allow specifying a name on the media source * add tests * fix for python 3.7 * Apply suggestions from code review Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io> * add http back to cast and remove guess_type from executor as there is no i/o Co-authored-by: Paulus Schoutsen <balloob@gmail.com> Co-authored-by: Martin Hjelmare <marhje52@gmail.com> Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
parent
f01a0f9151
commit
f2b3e63ff6
18 changed files with 738 additions and 6 deletions
|
@ -244,6 +244,7 @@ homeassistant/components/lutron_caseta/* @swails
|
|||
homeassistant/components/mastodon/* @fabaff
|
||||
homeassistant/components/matrix/* @tinloaf
|
||||
homeassistant/components/mcp23017/* @jardiamj
|
||||
homeassistant/components/media_source/* @hunterjm
|
||||
homeassistant/components/mediaroom/* @dgomes
|
||||
homeassistant/components/melcloud/* @vilppuvuorinen
|
||||
homeassistant/components/melissa/* @kennedyshead
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
"config_flow": true,
|
||||
"documentation": "https://www.home-assistant.io/integrations/cast",
|
||||
"requirements": ["pychromecast==7.2.1"],
|
||||
"after_dependencies": ["cloud","tts","zeroconf"],
|
||||
"after_dependencies": ["cloud", "http", "media_source", "tts", "zeroconf"],
|
||||
"zeroconf": ["_googlecast._tcp.local."],
|
||||
"codeowners": ["@emontnemery"]
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
"""Provide functionality to interact with Cast devices on the network."""
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
import functools as ft
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
@ -14,12 +16,15 @@ from pychromecast.socket_client import (
|
|||
)
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import zeroconf
|
||||
from homeassistant.auth.models import RefreshToken
|
||||
from homeassistant.components import media_source, zeroconf
|
||||
from homeassistant.components.http.auth import async_sign_path
|
||||
from homeassistant.components.media_player import PLATFORM_SCHEMA, MediaPlayerEntity
|
||||
from homeassistant.components.media_player.const import (
|
||||
MEDIA_TYPE_MOVIE,
|
||||
MEDIA_TYPE_MUSIC,
|
||||
MEDIA_TYPE_TVSHOW,
|
||||
SUPPORT_BROWSE_MEDIA,
|
||||
SUPPORT_NEXT_TRACK,
|
||||
SUPPORT_PAUSE,
|
||||
SUPPORT_PLAY,
|
||||
|
@ -502,6 +507,44 @@ class CastDevice(MediaPlayerEntity):
|
|||
media_controller = self._media_controller()
|
||||
media_controller.seek(position)
|
||||
|
||||
async def async_browse_media(self, media_content_type=None, media_content_id=None):
|
||||
"""Implement the websocket media browsing helper."""
|
||||
result = await media_source.async_browse_media(self.hass, media_content_id)
|
||||
return result.to_media_player_item()
|
||||
|
||||
async def async_play_media(self, media_type, media_id, **kwargs):
|
||||
"""Play a piece of media."""
|
||||
# Handle media_source
|
||||
if media_source.is_media_source_id(media_id):
|
||||
sourced_media = await media_source.async_resolve_media(self.hass, media_id)
|
||||
media_type = sourced_media.mime_type
|
||||
media_id = sourced_media.url
|
||||
|
||||
# If media ID is a relative URL, we serve it from HA.
|
||||
# Create a signed path.
|
||||
if media_id[0] == "/":
|
||||
# Sign URL with Home Assistant Cast User
|
||||
config_entries = self.hass.config_entries.async_entries(CAST_DOMAIN)
|
||||
user_id = config_entries[0].data["user_id"]
|
||||
user = await self.hass.auth.async_get_user(user_id)
|
||||
if user.refresh_tokens:
|
||||
refresh_token: RefreshToken = list(user.refresh_tokens.values())[0]
|
||||
|
||||
media_id = async_sign_path(
|
||||
self.hass,
|
||||
refresh_token.id,
|
||||
media_id,
|
||||
timedelta(minutes=5),
|
||||
)
|
||||
|
||||
# prepend external URL
|
||||
hass_url = get_url(self.hass, prefer_external=True)
|
||||
media_id = f"{hass_url}{media_id}"
|
||||
|
||||
await self.hass.async_add_job(
|
||||
ft.partial(self.play_media, media_type, media_id, **kwargs)
|
||||
)
|
||||
|
||||
def play_media(self, media_type, media_id, **kwargs):
|
||||
"""Play media from a URL."""
|
||||
# We do not want this to be forwarded to a group
|
||||
|
@ -726,6 +769,9 @@ class CastDevice(MediaPlayerEntity):
|
|||
if media_status.supports_seek:
|
||||
support |= SUPPORT_SEEK
|
||||
|
||||
if "media_source" in self.hass.config.components:
|
||||
support |= SUPPORT_BROWSE_MEDIA
|
||||
|
||||
return support
|
||||
|
||||
@property
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
"history",
|
||||
"logbook",
|
||||
"map",
|
||||
"media_source",
|
||||
"mobile_app",
|
||||
"person",
|
||||
"scene",
|
||||
|
|
124
homeassistant/components/media_source/__init__.py
Normal file
124
homeassistant/components/media_source/__init__.py
Normal file
|
@ -0,0 +1,124 @@
|
|||
"""The media_source integration."""
|
||||
from datetime import timedelta
|
||||
from typing import Optional
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import websocket_api
|
||||
from homeassistant.components.http.auth import async_sign_path
|
||||
from homeassistant.components.media_player.const import ATTR_MEDIA_CONTENT_ID
|
||||
from homeassistant.components.media_player.errors import BrowseError
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.integration_platform import (
|
||||
async_process_integration_platforms,
|
||||
)
|
||||
from homeassistant.loader import bind_hass
|
||||
|
||||
from . import local_source, models
|
||||
from .const import DOMAIN, URI_SCHEME, URI_SCHEME_REGEX
|
||||
from .error import Unresolvable
|
||||
|
||||
|
||||
def is_media_source_id(media_content_id: str):
|
||||
"""Test if identifier is a media source."""
|
||||
return URI_SCHEME_REGEX.match(media_content_id) is not None
|
||||
|
||||
|
||||
def generate_media_source_id(domain: str, identifier: str) -> str:
|
||||
"""Generate a media source ID."""
|
||||
uri = f"{URI_SCHEME}{domain or ''}"
|
||||
if identifier:
|
||||
uri += f"/{identifier}"
|
||||
return uri
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: dict):
|
||||
"""Set up the media_source component."""
|
||||
hass.data[DOMAIN] = {}
|
||||
hass.components.websocket_api.async_register_command(websocket_browse_media)
|
||||
hass.components.websocket_api.async_register_command(websocket_resolve_media)
|
||||
local_source.async_setup(hass)
|
||||
await async_process_integration_platforms(
|
||||
hass, DOMAIN, _process_media_source_platform
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
async def _process_media_source_platform(hass, domain, platform):
|
||||
"""Process a media source platform."""
|
||||
hass.data[DOMAIN][domain] = await platform.async_get_media_source(hass)
|
||||
|
||||
|
||||
@callback
|
||||
def _get_media_item(
|
||||
hass: HomeAssistant, media_content_id: Optional[str]
|
||||
) -> models.MediaSourceItem:
|
||||
"""Return media item."""
|
||||
if media_content_id:
|
||||
return models.MediaSourceItem.from_uri(hass, media_content_id)
|
||||
|
||||
# We default to our own domain if its only one registered
|
||||
domain = None if len(hass.data[DOMAIN]) > 1 else DOMAIN
|
||||
return models.MediaSourceItem(hass, domain, "")
|
||||
|
||||
|
||||
@bind_hass
|
||||
async def async_browse_media(
|
||||
hass: HomeAssistant, media_content_id: str
|
||||
) -> models.BrowseMedia:
|
||||
"""Return media player browse media results."""
|
||||
return await _get_media_item(hass, media_content_id).async_browse()
|
||||
|
||||
|
||||
@bind_hass
|
||||
async def async_resolve_media(
|
||||
hass: HomeAssistant, media_content_id: str
|
||||
) -> models.PlayMedia:
|
||||
"""Get info to play media."""
|
||||
return await _get_media_item(hass, media_content_id).async_resolve()
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "media_source/browse_media",
|
||||
vol.Optional(ATTR_MEDIA_CONTENT_ID, default=""): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_browse_media(hass, connection, msg):
|
||||
"""Browse available media."""
|
||||
try:
|
||||
media = await async_browse_media(hass, msg.get("media_content_id"))
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
media.to_media_player_item(),
|
||||
)
|
||||
except BrowseError as err:
|
||||
connection.send_error(msg["id"], "browse_media_failed", str(err))
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "media_source/resolve_media",
|
||||
vol.Required(ATTR_MEDIA_CONTENT_ID): str,
|
||||
vol.Optional("expires", default=30): int,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_resolve_media(hass, connection, msg):
|
||||
"""Resolve media."""
|
||||
try:
|
||||
media = await async_resolve_media(hass, msg["media_content_id"])
|
||||
url = media.url
|
||||
except Unresolvable as err:
|
||||
connection.send_error(msg["id"], "resolve_media_failed", str(err))
|
||||
else:
|
||||
if url[0] == "/":
|
||||
url = async_sign_path(
|
||||
hass,
|
||||
connection.refresh_token_id,
|
||||
url,
|
||||
timedelta(seconds=msg["expires"]),
|
||||
)
|
||||
|
||||
connection.send_result(msg["id"], {"url": url, "mime_type": media.mime_type})
|
7
homeassistant/components/media_source/const.py
Normal file
7
homeassistant/components/media_source/const.py
Normal file
|
@ -0,0 +1,7 @@
|
|||
"""Constants for the media_source integration."""
|
||||
import re
|
||||
|
||||
DOMAIN = "media_source"
|
||||
MEDIA_MIME_TYPES = ("audio", "video", "image")
|
||||
URI_SCHEME = "media-source://"
|
||||
URI_SCHEME_REGEX = re.compile(r"^media-source://(?P<domain>[^/]+)?(?P<identifier>.+)?")
|
10
homeassistant/components/media_source/error.py
Normal file
10
homeassistant/components/media_source/error.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
"""Errors for media source."""
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
|
||||
class MediaSourceError(HomeAssistantError):
|
||||
"""Base class for media source errors."""
|
||||
|
||||
|
||||
class Unresolvable(MediaSourceError):
|
||||
"""When media ID is not resolvable."""
|
160
homeassistant/components/media_source/local_source.py
Normal file
160
homeassistant/components/media_source/local_source.py
Normal file
|
@ -0,0 +1,160 @@
|
|||
"""Local Media Source Implementation."""
|
||||
import mimetypes
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from homeassistant.components.http import HomeAssistantView
|
||||
from homeassistant.components.media_player.errors import BrowseError
|
||||
from homeassistant.components.media_source.error import Unresolvable
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.util import sanitize_path
|
||||
|
||||
from .const import DOMAIN, MEDIA_MIME_TYPES
|
||||
from .models import BrowseMedia, MediaSource, MediaSourceItem, PlayMedia
|
||||
|
||||
|
||||
@callback
|
||||
def async_setup(hass: HomeAssistant):
|
||||
"""Set up local media source."""
|
||||
source = LocalSource(hass)
|
||||
hass.data[DOMAIN][DOMAIN] = source
|
||||
hass.http.register_view(LocalMediaView(hass))
|
||||
|
||||
|
||||
@callback
|
||||
def async_parse_identifier(item: MediaSourceItem) -> Tuple[str, str]:
|
||||
"""Parse identifier."""
|
||||
if not item.identifier:
|
||||
source_dir_id = "media"
|
||||
location = ""
|
||||
|
||||
else:
|
||||
source_dir_id, location = item.identifier.lstrip("/").split("/", 1)
|
||||
|
||||
if source_dir_id != "media":
|
||||
raise Unresolvable("Unknown source directory.")
|
||||
|
||||
if location != sanitize_path(location):
|
||||
raise Unresolvable("Invalid path.")
|
||||
|
||||
return source_dir_id, location
|
||||
|
||||
|
||||
class LocalSource(MediaSource):
|
||||
"""Provide local directories as media sources."""
|
||||
|
||||
name: str = "Local Media"
|
||||
|
||||
def __init__(self, hass: HomeAssistant):
|
||||
"""Initialize local source."""
|
||||
super().__init__(DOMAIN)
|
||||
self.hass = hass
|
||||
|
||||
@callback
|
||||
def async_full_path(self, source_dir_id, location) -> Path:
|
||||
"""Return full path."""
|
||||
return self.hass.config.path("media", location)
|
||||
|
||||
async def async_resolve_media(self, item: MediaSourceItem) -> str:
|
||||
"""Resolve media to a url."""
|
||||
source_dir_id, location = async_parse_identifier(item)
|
||||
mime_type, _ = mimetypes.guess_type(
|
||||
self.async_full_path(source_dir_id, location)
|
||||
)
|
||||
return PlayMedia(item.identifier, mime_type)
|
||||
|
||||
async def async_browse_media(
|
||||
self, item: MediaSourceItem, media_types: Tuple[str] = MEDIA_MIME_TYPES
|
||||
) -> BrowseMedia:
|
||||
"""Return media."""
|
||||
try:
|
||||
source_dir_id, location = async_parse_identifier(item)
|
||||
except Unresolvable as err:
|
||||
raise BrowseError(str(err)) from err
|
||||
|
||||
return await self.hass.async_add_executor_job(
|
||||
self._browse_media, source_dir_id, location
|
||||
)
|
||||
|
||||
def _browse_media(self, source_dir_id, location):
|
||||
"""Browse media."""
|
||||
full_path = Path(self.hass.config.path("media", location))
|
||||
|
||||
if not full_path.exists():
|
||||
raise BrowseError("Path does not exist.")
|
||||
|
||||
if not full_path.is_dir():
|
||||
raise BrowseError("Path is not a directory.")
|
||||
|
||||
return self._build_item_response(source_dir_id, full_path)
|
||||
|
||||
def _build_item_response(self, source_dir_id: str, path: Path, is_child=False):
|
||||
mime_type, _ = mimetypes.guess_type(str(path))
|
||||
media = BrowseMedia(
|
||||
DOMAIN,
|
||||
f"{source_dir_id}/{path.relative_to(self.hass.config.path('media'))}",
|
||||
path.name,
|
||||
path.is_file(),
|
||||
path.is_dir(),
|
||||
mime_type,
|
||||
)
|
||||
|
||||
# Make sure it's a file or directory
|
||||
if not media.can_play and not media.can_expand:
|
||||
return None
|
||||
|
||||
# Check that it's a media file
|
||||
if media.can_play and (
|
||||
not mime_type or mime_type.split("/")[0] not in MEDIA_MIME_TYPES
|
||||
):
|
||||
return None
|
||||
|
||||
if not media.can_expand:
|
||||
return media
|
||||
|
||||
media.name += "/"
|
||||
|
||||
# Append first level children
|
||||
if not is_child:
|
||||
media.children = []
|
||||
for child_path in path.iterdir():
|
||||
child = self._build_item_response(source_dir_id, child_path, True)
|
||||
if child:
|
||||
media.children.append(child)
|
||||
|
||||
return media
|
||||
|
||||
|
||||
class LocalMediaView(HomeAssistantView):
|
||||
"""
|
||||
Local Media Finder View.
|
||||
|
||||
Returns media files in config/media.
|
||||
"""
|
||||
|
||||
url = "/media/{location:.*}"
|
||||
name = "media"
|
||||
|
||||
def __init__(self, hass: HomeAssistant):
|
||||
"""Initialize the media view."""
|
||||
self.hass = hass
|
||||
|
||||
async def get(self, request: web.Request, location: str) -> web.FileResponse:
|
||||
"""Start a GET request."""
|
||||
if location != sanitize_path(location):
|
||||
return web.HTTPNotFound()
|
||||
|
||||
media_path = Path(self.hass.config.path("media", location))
|
||||
|
||||
# Check that the file exists
|
||||
if not media_path.is_file():
|
||||
raise web.HTTPNotFound()
|
||||
|
||||
# Check that it's a media file
|
||||
mime_type, _ = mimetypes.guess_type(str(media_path))
|
||||
if not mime_type or mime_type.split("/")[0] not in MEDIA_MIME_TYPES:
|
||||
raise web.HTTPNotFound()
|
||||
|
||||
return web.FileResponse(media_path)
|
7
homeassistant/components/media_source/manifest.json
Normal file
7
homeassistant/components/media_source/manifest.json
Normal file
|
@ -0,0 +1,7 @@
|
|||
{
|
||||
"domain": "media_source",
|
||||
"name": "Media Source",
|
||||
"documentation": "https://www.home-assistant.io/integrations/media_source",
|
||||
"dependencies": ["http"],
|
||||
"codeowners": ["@hunterjm"]
|
||||
}
|
124
homeassistant/components/media_source/models.py
Normal file
124
homeassistant/components/media_source/models.py
Normal file
|
@ -0,0 +1,124 @@
|
|||
"""Media Source models."""
|
||||
from abc import ABC
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
|
||||
from .const import DOMAIN, URI_SCHEME, URI_SCHEME_REGEX
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlayMedia:
|
||||
"""Represents a playable media."""
|
||||
|
||||
url: str
|
||||
mime_type: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class BrowseMedia:
|
||||
"""Represent a browsable media file."""
|
||||
|
||||
domain: str
|
||||
identifier: str
|
||||
|
||||
name: str
|
||||
can_play: bool = False
|
||||
can_expand: bool = False
|
||||
media_content_type: str = None
|
||||
children: List = None
|
||||
|
||||
def to_uri(self):
|
||||
"""Return URI of media."""
|
||||
uri = f"{URI_SCHEME}{self.domain or ''}"
|
||||
if self.identifier:
|
||||
uri += f"/{self.identifier}"
|
||||
return uri
|
||||
|
||||
def to_media_player_item(self):
|
||||
"""Convert Media class to browse media dictionary."""
|
||||
content_type = self.media_content_type
|
||||
|
||||
if content_type is None:
|
||||
content_type = "folder" if self.can_expand else "file"
|
||||
|
||||
response = {
|
||||
"title": self.name,
|
||||
"media_content_type": content_type,
|
||||
"media_content_id": self.to_uri(),
|
||||
"can_play": self.can_play,
|
||||
"can_expand": self.can_expand,
|
||||
}
|
||||
|
||||
if self.children:
|
||||
response["children"] = [
|
||||
child.to_media_player_item() for child in self.children
|
||||
]
|
||||
|
||||
return response
|
||||
|
||||
|
||||
@dataclass
|
||||
class MediaSourceItem:
|
||||
"""A parsed media item."""
|
||||
|
||||
hass: HomeAssistant
|
||||
domain: Optional[str]
|
||||
identifier: str
|
||||
|
||||
async def async_browse(self) -> BrowseMedia:
|
||||
"""Browse this item."""
|
||||
if self.domain is None:
|
||||
base = BrowseMedia(None, None, "Media Sources", False, True)
|
||||
base.children = [
|
||||
BrowseMedia(source.domain, None, source.name, False, True)
|
||||
for source in self.hass.data[DOMAIN].values()
|
||||
]
|
||||
return base
|
||||
|
||||
return await self.async_media_source().async_browse_media(self)
|
||||
|
||||
async def async_resolve(self) -> PlayMedia:
|
||||
"""Resolve to playable item."""
|
||||
return await self.async_media_source().async_resolve_media(self)
|
||||
|
||||
@callback
|
||||
def async_media_source(self) -> "MediaSource":
|
||||
"""Return media source that owns this item."""
|
||||
return self.hass.data[DOMAIN][self.domain]
|
||||
|
||||
@classmethod
|
||||
def from_uri(cls, hass: HomeAssistant, uri: str) -> "MediaSourceItem":
|
||||
"""Create an item from a uri."""
|
||||
match = URI_SCHEME_REGEX.match(uri)
|
||||
|
||||
if not match:
|
||||
raise ValueError("Invalid media source URI")
|
||||
|
||||
domain = match.group("domain")
|
||||
identifier = match.group("identifier")
|
||||
|
||||
return cls(hass, domain, identifier)
|
||||
|
||||
|
||||
class MediaSource(ABC):
|
||||
"""Represents a source of media files."""
|
||||
|
||||
name: str = None
|
||||
|
||||
def __init__(self, domain: str):
|
||||
"""Initialize a media source."""
|
||||
self.domain = domain
|
||||
if not self.name:
|
||||
self.name = domain
|
||||
|
||||
async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
|
||||
"""Resolve a media item to a playable item."""
|
||||
raise NotImplementedError
|
||||
|
||||
async def async_browse_media(
|
||||
self, item: MediaSourceItem, media_types: Tuple[str]
|
||||
) -> BrowseMedia:
|
||||
"""Browse media."""
|
||||
raise NotImplementedError
|
|
@ -504,7 +504,7 @@ async def async_process_ha_core_config(hass: HomeAssistant, config: Dict) -> Non
|
|||
hac.set_time_zone(config[CONF_TIME_ZONE])
|
||||
|
||||
# Init whitelist external dir
|
||||
hac.allowlist_external_dirs = {hass.config.path("www")}
|
||||
hac.allowlist_external_dirs = {hass.config.path("www"), hass.config.path("media")}
|
||||
if CONF_ALLOWLIST_EXTERNAL_DIRS in config:
|
||||
hac.allowlist_external_dirs.update(set(config[CONF_ALLOWLIST_EXTERNAL_DIRS]))
|
||||
|
||||
|
|
1
tests/components/media_source/__init__.py
Normal file
1
tests/components/media_source/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
"""The tests for Media Source integration."""
|
158
tests/components/media_source/test_init.py
Normal file
158
tests/components/media_source/test_init.py
Normal file
|
@ -0,0 +1,158 @@
|
|||
"""Test Media Source initialization."""
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import media_source
|
||||
from homeassistant.components.media_player.errors import BrowseError
|
||||
from homeassistant.components.media_source import const
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.async_mock import patch
|
||||
|
||||
|
||||
async def test_is_media_source_id():
|
||||
"""Test media source validation."""
|
||||
assert media_source.is_media_source_id(const.URI_SCHEME)
|
||||
assert media_source.is_media_source_id(f"{const.URI_SCHEME}domain")
|
||||
assert media_source.is_media_source_id(f"{const.URI_SCHEME}domain/identifier")
|
||||
assert not media_source.is_media_source_id("test")
|
||||
|
||||
|
||||
async def test_generate_media_source_id():
|
||||
"""Test identifier generation."""
|
||||
tests = [
|
||||
(None, None),
|
||||
(None, ""),
|
||||
("", ""),
|
||||
("domain", None),
|
||||
("domain", ""),
|
||||
("domain", "identifier"),
|
||||
]
|
||||
|
||||
for domain, identifier in tests:
|
||||
assert media_source.is_media_source_id(
|
||||
media_source.generate_media_source_id(domain, identifier)
|
||||
)
|
||||
|
||||
|
||||
async def test_async_browse_media(hass):
|
||||
"""Test browse media."""
|
||||
assert await async_setup_component(hass, const.DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Test non-media ignored (/media has test.mp3 and not_media.txt)
|
||||
media = await media_source.async_browse_media(hass, "")
|
||||
assert isinstance(media, media_source.models.BrowseMedia)
|
||||
assert media.name == "media/"
|
||||
assert len(media.children) == 1
|
||||
|
||||
# Test invalid media content
|
||||
with pytest.raises(ValueError):
|
||||
await media_source.async_browse_media(hass, "invalid")
|
||||
|
||||
# Test base URI returns all domains
|
||||
media = await media_source.async_browse_media(hass, const.URI_SCHEME)
|
||||
assert isinstance(media, media_source.models.BrowseMedia)
|
||||
assert len(media.children) == 1
|
||||
assert media.children[0].name == "Local Media"
|
||||
|
||||
|
||||
async def test_async_resolve_media(hass):
|
||||
"""Test browse media."""
|
||||
assert await async_setup_component(hass, const.DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Test no media content
|
||||
media = await media_source.async_resolve_media(hass, "")
|
||||
assert isinstance(media, media_source.models.PlayMedia)
|
||||
|
||||
|
||||
async def test_websocket_browse_media(hass, hass_ws_client):
|
||||
"""Test browse media websocket."""
|
||||
assert await async_setup_component(hass, const.DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
media = media_source.models.BrowseMedia(const.DOMAIN, "/media", False, True)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.media_source.async_browse_media",
|
||||
return_value=media,
|
||||
):
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"type": "media_source/browse_media",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
assert msg["id"] == 1
|
||||
assert media.to_media_player_item() == msg["result"]
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.media_source.async_browse_media",
|
||||
side_effect=BrowseError("test"),
|
||||
):
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 2,
|
||||
"type": "media_source/browse_media",
|
||||
"media_content_id": "invalid",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert msg["error"]["code"] == "browse_media_failed"
|
||||
assert msg["error"]["message"] == "test"
|
||||
|
||||
|
||||
async def test_websocket_resolve_media(hass, hass_ws_client):
|
||||
"""Test browse media websocket."""
|
||||
assert await async_setup_component(hass, const.DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
media = media_source.models.PlayMedia("/media/test.mp3", "audio/mpeg")
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.media_source.async_resolve_media",
|
||||
return_value=media,
|
||||
):
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"type": "media_source/resolve_media",
|
||||
"media_content_id": f"{const.URI_SCHEME}{const.DOMAIN}/media/test.mp3",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg["success"]
|
||||
assert msg["id"] == 1
|
||||
assert msg["result"]["url"].startswith(media.url)
|
||||
assert msg["result"]["mime_type"] == media.mime_type
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.media_source.async_resolve_media",
|
||||
side_effect=media_source.Unresolvable("test"),
|
||||
):
|
||||
await client.send_json(
|
||||
{
|
||||
"id": 2,
|
||||
"type": "media_source/resolve_media",
|
||||
"media_content_id": "invalid",
|
||||
}
|
||||
)
|
||||
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert not msg["success"]
|
||||
assert msg["error"]["code"] == "resolve_media_failed"
|
||||
assert msg["error"]["message"] == "test"
|
66
tests/components/media_source/test_local_source.py
Normal file
66
tests/components/media_source/test_local_source.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
"""Test Local Media Source."""
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import media_source
|
||||
from homeassistant.components.media_source import const
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
||||
async def test_async_browse_media(hass):
|
||||
"""Test browse media."""
|
||||
assert await async_setup_component(hass, const.DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Test path not exists
|
||||
with pytest.raises(media_source.BrowseError) as excinfo:
|
||||
await media_source.async_browse_media(
|
||||
hass, f"{const.URI_SCHEME}{const.DOMAIN}/media/test/not/exist"
|
||||
)
|
||||
assert str(excinfo.value) == "Path does not exist."
|
||||
|
||||
# Test browse file
|
||||
with pytest.raises(media_source.BrowseError) as excinfo:
|
||||
await media_source.async_browse_media(
|
||||
hass, f"{const.URI_SCHEME}{const.DOMAIN}/media/test.mp3"
|
||||
)
|
||||
assert str(excinfo.value) == "Path is not a directory."
|
||||
|
||||
# Test invalid base
|
||||
with pytest.raises(media_source.BrowseError) as excinfo:
|
||||
await media_source.async_browse_media(
|
||||
hass, f"{const.URI_SCHEME}{const.DOMAIN}/invalid/base"
|
||||
)
|
||||
assert str(excinfo.value) == "Unknown source directory."
|
||||
|
||||
# Test directory traversal
|
||||
with pytest.raises(media_source.BrowseError) as excinfo:
|
||||
await media_source.async_browse_media(
|
||||
hass, f"{const.URI_SCHEME}{const.DOMAIN}/media/../configuration.yaml"
|
||||
)
|
||||
assert str(excinfo.value) == "Invalid path."
|
||||
|
||||
# Test successful listing
|
||||
media = await media_source.async_browse_media(
|
||||
hass, f"{const.URI_SCHEME}{const.DOMAIN}/media/."
|
||||
)
|
||||
assert media
|
||||
|
||||
|
||||
async def test_media_view(hass, hass_client):
|
||||
"""Test media view."""
|
||||
assert await async_setup_component(hass, const.DOMAIN, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
client = await hass_client()
|
||||
|
||||
# Protects against non-existent files
|
||||
resp = await client.get("/media/invalid.txt")
|
||||
assert resp.status == 404
|
||||
|
||||
# Protects against non-media files
|
||||
resp = await client.get("/media/not_media.txt")
|
||||
assert resp.status == 404
|
||||
|
||||
# Fetch available media
|
||||
resp = await client.get("/media/test.mp3")
|
||||
assert resp.status == 200
|
27
tests/components/media_source/test_models.py
Normal file
27
tests/components/media_source/test_models.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
"""Test Media Source model methods."""
|
||||
from homeassistant.components.media_source import const, models
|
||||
|
||||
|
||||
async def test_browse_media_to_media_player_item():
|
||||
"""Test BrowseMedia conversion to media player item dict."""
|
||||
base = models.BrowseMedia(const.DOMAIN, "media", "media/", False, True)
|
||||
base.children = [
|
||||
models.BrowseMedia(
|
||||
const.DOMAIN, "media/test.mp3", "test.mp3", True, False, "audio/mp3"
|
||||
)
|
||||
]
|
||||
|
||||
item = base.to_media_player_item()
|
||||
assert item["title"] == "media/"
|
||||
assert item["media_content_type"] == "folder"
|
||||
assert item["media_content_id"] == f"{const.URI_SCHEME}{const.DOMAIN}/media"
|
||||
assert not item["can_play"]
|
||||
assert item["can_expand"]
|
||||
assert len(item["children"]) == 1
|
||||
assert item["children"][0]["title"] == "test.mp3"
|
||||
|
||||
|
||||
async def test_media_source_default_name():
|
||||
"""Test MediaSource uses domain as default name."""
|
||||
source = models.MediaSource(const.DOMAIN)
|
||||
assert source.name == const.DOMAIN
|
|
@ -364,7 +364,7 @@ async def test_loading_configuration_from_storage(hass, hass_storage):
|
|||
assert hass.config.time_zone.zone == "Europe/Copenhagen"
|
||||
assert hass.config.external_url == "https://www.example.com"
|
||||
assert hass.config.internal_url == "http://example.local"
|
||||
assert len(hass.config.allowlist_external_dirs) == 2
|
||||
assert len(hass.config.allowlist_external_dirs) == 3
|
||||
assert "/etc" in hass.config.allowlist_external_dirs
|
||||
assert hass.config.config_source == SOURCE_STORAGE
|
||||
|
||||
|
@ -421,7 +421,7 @@ async def test_override_stored_configuration(hass, hass_storage):
|
|||
assert hass.config.location_name == "Home"
|
||||
assert hass.config.units.name == CONF_UNIT_SYSTEM_METRIC
|
||||
assert hass.config.time_zone.zone == "Europe/Copenhagen"
|
||||
assert len(hass.config.allowlist_external_dirs) == 2
|
||||
assert len(hass.config.allowlist_external_dirs) == 3
|
||||
assert "/etc" in hass.config.allowlist_external_dirs
|
||||
assert hass.config.config_source == config_util.SOURCE_YAML
|
||||
|
||||
|
@ -451,7 +451,7 @@ async def test_loading_configuration(hass):
|
|||
assert hass.config.time_zone.zone == "America/New_York"
|
||||
assert hass.config.external_url == "https://www.example.com"
|
||||
assert hass.config.internal_url == "http://example.local"
|
||||
assert len(hass.config.allowlist_external_dirs) == 2
|
||||
assert len(hass.config.allowlist_external_dirs) == 3
|
||||
assert "/etc" in hass.config.allowlist_external_dirs
|
||||
assert hass.config.config_source == config_util.SOURCE_YAML
|
||||
|
||||
|
|
0
tests/testing_config/media/not_media.txt
Normal file
0
tests/testing_config/media/not_media.txt
Normal file
0
tests/testing_config/media/test.mp3
Normal file
0
tests/testing_config/media/test.mp3
Normal file
Loading…
Add table
Add a link
Reference in a new issue