Add bang_olufsen integration (#93462)

* Add bangolufsen integration

* add untested files to .coveragerc

* Simplify integration to media_player platform

* Remove missing files from .coveragerc

* Add beolink_set_relative_volume custom service
Tweaks

* Remove custom services
Remove grouping as it was dependent on custom services

* Update API to 3.2.1.150.0
Reduce and optimize code with feedback from joostlek
Tweaks

* Updated testing

* Remove unused options schema

* Fix bugfix setting wrong state

* Fix wrong initial state

* Bump API

* Fix Beosound Level not reconnecting properly

* Remove unused constant

* Fix wrong variable checked to determine source

* Update integration with feedback from emontnemery

* Update integration with feedback from emontnemery

* Remove unused code

* Move API client into dataclass
Fix not all config_flow exceptions caught
Tweaks

* Add Bang & Olufsen brand

* Revert "Add Bang & Olufsen brand"

This reverts commit 57b2722078.

* Remove volume options from setup
Simplify device checks
rename integration to bang_olufsen
update tests to pass
Update API

* Remove _device from base
Add _device to websocket

* Move SW version device update to websocket
Sort websocket variables

* Add WebSocket connection test

* Remove unused constants

* Remove confirmation form
Make discovered devices get added to Home Assistant immediately
Fix device not being available on mdns discovery
Change config flow aborts to forms with error

* Update tests for new config_flow
Add missing api_exception test

* Restrict manual and discovered IP addresses to IPv4

* Re-add confirmation step for zeroconf discovery
Improve error messages
Move exception mapping dict to module level

* Enable remote control WebSocket listener

* Update tests
This commit is contained in:
Markus Jacobsen 2024-01-24 12:00:51 +01:00 committed by GitHub
parent 393dee1524
commit 1d7e0e7fe4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1790 additions and 0 deletions

View file

@ -112,6 +112,12 @@ omit =
homeassistant/components/baf/sensor.py
homeassistant/components/baf/switch.py
homeassistant/components/baidu/tts.py
homeassistant/components/bang_olufsen/__init__.py
homeassistant/components/bang_olufsen/const.py
homeassistant/components/bang_olufsen/entity.py
homeassistant/components/bang_olufsen/media_player.py
homeassistant/components/bang_olufsen/util.py
homeassistant/components/bang_olufsen/websocket.py
homeassistant/components/bbox/device_tracker.py
homeassistant/components/bbox/sensor.py
homeassistant/components/beewi_smartclim/sensor.py

View file

@ -100,6 +100,7 @@ homeassistant.components.awair.*
homeassistant.components.axis.*
homeassistant.components.backup.*
homeassistant.components.baf.*
homeassistant.components.bang_olufsen.*
homeassistant.components.bayesian.*
homeassistant.components.binary_sensor.*
homeassistant.components.bitcoin.*

View file

@ -149,6 +149,8 @@ build.json @home-assistant/supervisor
/tests/components/baf/ @bdraco @jfroy
/homeassistant/components/balboa/ @garbled1 @natekspencer
/tests/components/balboa/ @garbled1 @natekspencer
/homeassistant/components/bang_olufsen/ @mj23000
/tests/components/bang_olufsen/ @mj23000
/homeassistant/components/bayesian/ @HarvsG
/tests/components/bayesian/ @HarvsG
/homeassistant/components/beewi_smartclim/ @alemuro

View file

@ -0,0 +1,85 @@
"""The Bang & Olufsen integration."""
from __future__ import annotations
from dataclasses import dataclass
from aiohttp.client_exceptions import ClientConnectorError
from mozart_api.exceptions import ApiException
from mozart_api.mozart_client import MozartClient
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_MODEL, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
import homeassistant.helpers.device_registry as dr
from .const import DOMAIN
from .websocket import BangOlufsenWebsocket
@dataclass
class BangOlufsenData:
"""Dataclass for API client and WebSocket client."""
websocket: BangOlufsenWebsocket
client: MozartClient
PLATFORMS = [Platform.MEDIA_PLAYER]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up from a config entry."""
# Remove casts to str
assert entry.unique_id
# Create device now as BangOlufsenWebsocket needs a device for debug logging, firing events etc.
device_registry = dr.async_get(hass)
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, entry.unique_id)},
name=entry.title,
model=entry.data[CONF_MODEL],
)
client = MozartClient(host=entry.data[CONF_HOST], websocket_reconnect=True)
# Check connection and try to initialize it.
try:
await client.get_battery_state(_request_timeout=3)
except (ApiException, ClientConnectorError, TimeoutError) as error:
await client.close_api_client()
raise ConfigEntryNotReady(f"Unable to connect to {entry.title}") from error
websocket = BangOlufsenWebsocket(hass, entry, client)
# Add the websocket and API client
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = BangOlufsenData(
websocket,
client,
)
# Check and start WebSocket connection
if not await client.connect_notifications(remote_control=True):
raise ConfigEntryNotReady(
f"Unable to connect to {entry.title} WebSocket notification channel"
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
# Close the API client and WebSocket notification listener
hass.data[DOMAIN][entry.entry_id].client.disconnect_notifications()
await hass.data[DOMAIN][entry.entry_id].client.close_api_client()
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok

View file

@ -0,0 +1,184 @@
"""Config flow for the Bang & Olufsen integration."""
from __future__ import annotations
from ipaddress import AddressValueError, IPv4Address
from typing import Any, TypedDict
from aiohttp.client_exceptions import ClientConnectorError
from mozart_api.exceptions import ApiException
from mozart_api.mozart_client import MozartClient
import voluptuous as vol
from homeassistant.components.zeroconf import ZeroconfServiceInfo
from homeassistant.config_entries import ConfigFlow
from homeassistant.const import CONF_HOST, CONF_MODEL
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
from .const import (
ATTR_FRIENDLY_NAME,
ATTR_ITEM_NUMBER,
ATTR_SERIAL_NUMBER,
ATTR_TYPE_NUMBER,
COMPATIBLE_MODELS,
CONF_SERIAL_NUMBER,
DEFAULT_MODEL,
DOMAIN,
)
class EntryData(TypedDict, total=False):
"""TypedDict for config_entry data."""
host: str
jid: str
model: str
name: str
# Map exception types to strings
_exception_map = {
ApiException: "api_exception",
ClientConnectorError: "client_connector_error",
TimeoutError: "timeout_error",
AddressValueError: "invalid_ip",
}
class BangOlufsenConfigFlowHandler(ConfigFlow, domain=DOMAIN):
"""Handle a config flow."""
_beolink_jid = ""
_client: MozartClient
_host = ""
_model = ""
_name = ""
_serial_number = ""
def __init__(self) -> None:
"""Init the config flow."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
data_schema = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_MODEL, default=DEFAULT_MODEL): SelectSelector(
SelectSelectorConfig(options=COMPATIBLE_MODELS)
),
}
)
if user_input is not None:
self._host = user_input[CONF_HOST]
self._model = user_input[CONF_MODEL]
# Check if the IP address is a valid IPv4 address.
try:
IPv4Address(self._host)
except AddressValueError as error:
return self.async_show_form(
step_id="user",
data_schema=data_schema,
errors={"base": _exception_map[type(error)]},
)
self._client = MozartClient(self._host)
# Try to get information from Beolink self method.
async with self._client:
try:
beolink_self = await self._client.get_beolink_self(
_request_timeout=3
)
except (
ApiException,
ClientConnectorError,
TimeoutError,
) as error:
return self.async_show_form(
step_id="user",
data_schema=data_schema,
errors={"base": _exception_map[type(error)]},
)
self._beolink_jid = beolink_self.jid
self._serial_number = beolink_self.jid.split(".")[2].split("@")[0]
await self.async_set_unique_id(self._serial_number)
self._abort_if_unique_id_configured()
return await self._create_entry()
return self.async_show_form(
step_id="user",
data_schema=data_schema,
)
async def async_step_zeroconf(
self, discovery_info: ZeroconfServiceInfo
) -> FlowResult:
"""Handle discovery using Zeroconf."""
# Check if the discovered device is a Mozart device
if ATTR_FRIENDLY_NAME not in discovery_info.properties:
return self.async_abort(reason="not_mozart_device")
# Ensure that an IPv4 address is received
self._host = discovery_info.host
try:
IPv4Address(self._host)
except AddressValueError:
return self.async_abort(reason="ipv6_address")
self._model = discovery_info.hostname[:-16].replace("-", " ")
self._serial_number = discovery_info.properties[ATTR_SERIAL_NUMBER]
self._beolink_jid = f"{discovery_info.properties[ATTR_TYPE_NUMBER]}.{discovery_info.properties[ATTR_ITEM_NUMBER]}.{self._serial_number}@products.bang-olufsen.com"
await self.async_set_unique_id(self._serial_number)
self._abort_if_unique_id_configured(updates={CONF_HOST: self._host})
# Set the discovered device title
self.context["title_placeholders"] = {
"name": discovery_info.properties[ATTR_FRIENDLY_NAME]
}
return await self.async_step_zeroconf_confirm()
async def _create_entry(self) -> FlowResult:
"""Create the config entry for a discovered or manually configured Bang & Olufsen device."""
# Ensure that created entities have a unique and easily identifiable id and not a "friendly name"
self._name = f"{self._model}-{self._serial_number}"
return self.async_create_entry(
title=self._name,
data=EntryData(
host=self._host,
jid=self._beolink_jid,
model=self._model,
name=self._name,
),
)
async def async_step_zeroconf_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Confirm the configuration of the device."""
if user_input is not None:
return await self._create_entry()
self._set_confirm_only()
return self.async_show_form(
step_id="zeroconf_confirm",
description_placeholders={
CONF_HOST: self._host,
CONF_MODEL: self._model,
CONF_SERIAL_NUMBER: self._serial_number,
},
last_step=True,
)

View file

@ -0,0 +1,207 @@
"""Constants for the Bang & Olufsen integration."""
from __future__ import annotations
from enum import StrEnum
from typing import Final
from mozart_api.models import Source, SourceArray, SourceTypeEnum
from homeassistant.components.media_player import MediaPlayerState, MediaType
class SOURCE_ENUM(StrEnum):
"""Enum used for associating device source ids with friendly names. May not include all sources."""
uriStreamer = "Audio Streamer" # noqa: N815
bluetooth = "Bluetooth"
airPlay = "AirPlay" # noqa: N815
chromeCast = "Chromecast built-in" # noqa: N815
spotify = "Spotify Connect"
generator = "Tone Generator"
lineIn = "Line-In" # noqa: N815
spdif = "Optical"
netRadio = "B&O Radio" # noqa: N815
local = "Local"
dlna = "DLNA"
qplay = "QPlay"
wpl = "Wireless Powerlink"
pl = "Powerlink"
tv = "TV"
deezer = "Deezer"
beolink = "Networklink"
tidalConnect = "Tidal Connect" # noqa: N815
BANG_OLUFSEN_STATES: dict[str, MediaPlayerState] = {
# Dict used for translating device states to Home Assistant states.
"started": MediaPlayerState.PLAYING,
"buffering": MediaPlayerState.PLAYING,
"idle": MediaPlayerState.IDLE,
"paused": MediaPlayerState.PAUSED,
"stopped": MediaPlayerState.PAUSED,
"ended": MediaPlayerState.PAUSED,
"error": MediaPlayerState.IDLE,
# A device's initial state is "unknown" and should be treated as "idle"
"unknown": MediaPlayerState.IDLE,
}
# Media types for play_media
class BANG_OLUFSEN_MEDIA_TYPE(StrEnum):
"""Bang & Olufsen specific media types."""
FAVOURITE = "favourite"
DEEZER = "deezer"
RADIO = "radio"
TTS = "provider"
class MODEL_ENUM(StrEnum):
"""Enum for compatible model names."""
BEOLAB_8 = "BeoLab 8"
BEOLAB_28 = "BeoLab 28"
BEOSOUND_2 = "Beosound 2 3rd Gen"
BEOSOUND_A5 = "Beosound A5"
BEOSOUND_A9 = "Beosound A9 5th Gen"
BEOSOUND_BALANCE = "Beosound Balance"
BEOSOUND_EMERGE = "Beosound Emerge"
BEOSOUND_LEVEL = "Beosound Level"
BEOSOUND_THEATRE = "Beosound Theatre"
# Dispatcher events
class WEBSOCKET_NOTIFICATION(StrEnum):
"""Enum for WebSocket notification types."""
PLAYBACK_ERROR: Final[str] = "playback_error"
PLAYBACK_METADATA: Final[str] = "playback_metadata"
PLAYBACK_PROGRESS: Final[str] = "playback_progress"
PLAYBACK_SOURCE: Final[str] = "playback_source"
PLAYBACK_STATE: Final[str] = "playback_state"
SOFTWARE_UPDATE_STATE: Final[str] = "software_update_state"
SOURCE_CHANGE: Final[str] = "source_change"
VOLUME: Final[str] = "volume"
# Sub-notifications
NOTIFICATION: Final[str] = "notification"
REMOTE_MENU_CHANGED: Final[str] = "remoteMenuChanged"
ALL: Final[str] = "all"
DOMAIN: Final[str] = "bang_olufsen"
# Default values for configuration.
DEFAULT_MODEL: Final[str] = MODEL_ENUM.BEOSOUND_BALANCE
# Configuration.
CONF_SERIAL_NUMBER: Final = "serial_number"
CONF_BEOLINK_JID: Final = "jid"
# Models to choose from in manual configuration.
COMPATIBLE_MODELS: list[str] = [x.value for x in MODEL_ENUM]
# Attribute names for zeroconf discovery.
ATTR_TYPE_NUMBER: Final[str] = "tn"
ATTR_SERIAL_NUMBER: Final[str] = "sn"
ATTR_ITEM_NUMBER: Final[str] = "in"
ATTR_FRIENDLY_NAME: Final[str] = "fn"
# Power states.
BANG_OLUFSEN_ON: Final[str] = "on"
VALID_MEDIA_TYPES: Final[tuple] = (
BANG_OLUFSEN_MEDIA_TYPE.FAVOURITE,
BANG_OLUFSEN_MEDIA_TYPE.DEEZER,
BANG_OLUFSEN_MEDIA_TYPE.RADIO,
BANG_OLUFSEN_MEDIA_TYPE.TTS,
MediaType.MUSIC,
MediaType.URL,
MediaType.CHANNEL,
)
# Sources on the device that should not be selectable by the user
HIDDEN_SOURCE_IDS: Final[tuple] = (
"airPlay",
"bluetooth",
"chromeCast",
"generator",
"local",
"dlna",
"qplay",
"wpl",
"pl",
"beolink",
"usbIn",
)
# Fallback sources to use in case of API failure.
FALLBACK_SOURCES: Final[SourceArray] = SourceArray(
items=[
Source(
id="uriStreamer",
is_enabled=True,
is_playable=False,
name="Audio Streamer",
type=SourceTypeEnum(value="uriStreamer"),
),
Source(
id="bluetooth",
is_enabled=True,
is_playable=False,
name="Bluetooth",
type=SourceTypeEnum(value="bluetooth"),
),
Source(
id="spotify",
is_enabled=True,
is_playable=False,
name="Spotify Connect",
type=SourceTypeEnum(value="spotify"),
),
Source(
id="lineIn",
is_enabled=True,
is_playable=True,
name="Line-In",
type=SourceTypeEnum(value="lineIn"),
),
Source(
id="spdif",
is_enabled=True,
is_playable=True,
name="Optical",
type=SourceTypeEnum(value="spdif"),
),
Source(
id="netRadio",
is_enabled=True,
is_playable=True,
name="B&O Radio",
type=SourceTypeEnum(value="netRadio"),
),
Source(
id="deezer",
is_enabled=True,
is_playable=True,
name="Deezer",
type=SourceTypeEnum(value="deezer"),
),
Source(
id="tidalConnect",
is_enabled=True,
is_playable=True,
name="Tidal Connect",
type=SourceTypeEnum(value="tidalConnect"),
),
]
)
# Device events
BANG_OLUFSEN_WEBSOCKET_EVENT: Final[str] = f"{DOMAIN}_websocket_event"
CONNECTION_STATUS: Final[str] = "CONNECTION_STATUS"

View file

@ -0,0 +1,71 @@
"""Entity representing a Bang & Olufsen device."""
from __future__ import annotations
from typing import cast
from mozart_api.models import (
PlaybackContentMetadata,
PlaybackProgress,
RenderingState,
Source,
VolumeLevel,
VolumeMute,
VolumeState,
)
from mozart_api.mozart_client import MozartClient
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity
from .const import DOMAIN
class BangOlufsenBase:
"""Base class for BangOlufsen Home Assistant objects."""
def __init__(self, entry: ConfigEntry, client: MozartClient) -> None:
"""Initialize the object."""
# Set the MozartClient
self._client = client
# get the input from the config entry.
self.entry: ConfigEntry = entry
# Set the configuration variables.
self._host: str = self.entry.data[CONF_HOST]
self._name: str = self.entry.title
self._unique_id: str = cast(str, self.entry.unique_id)
# Objects that get directly updated by notifications.
self._playback_metadata: PlaybackContentMetadata = PlaybackContentMetadata()
self._playback_progress: PlaybackProgress = PlaybackProgress(total_duration=0)
self._playback_source: Source = Source()
self._playback_state: RenderingState = RenderingState()
self._source_change: Source = Source()
self._volume: VolumeState = VolumeState(
level=VolumeLevel(level=0), muted=VolumeMute(muted=False)
)
class BangOlufsenEntity(Entity, BangOlufsenBase):
"""Base Entity for BangOlufsen entities."""
_attr_has_entity_name = True
def __init__(self, entry: ConfigEntry, client: MozartClient) -> None:
"""Initialize the object."""
super().__init__(entry, client)
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, self._unique_id)})
self._attr_device_class = None
self._attr_entity_category = None
self._attr_should_poll = False
async def _update_connection_state(self, connection_state: bool) -> None:
"""Update entity connection state."""
self._attr_available = connection_state
self.async_write_ha_state()

View file

@ -0,0 +1,11 @@
{
"domain": "bang_olufsen",
"name": "Bang & Olufsen",
"codeowners": ["@mj23000"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/bang_olufsen",
"integration_type": "device",
"iot_class": "local_push",
"requirements": ["mozart-api==3.2.1.150.6"],
"zeroconf": ["_bangolufsen._tcp.local."]
}

View file

@ -0,0 +1,647 @@
"""Media player entity for the Bang & Olufsen integration."""
from __future__ import annotations
import json
import logging
from typing import Any, cast
from mozart_api import __version__ as MOZART_API_VERSION
from mozart_api.exceptions import ApiException
from mozart_api.models import (
Action,
Art,
OverlayPlayRequest,
PlaybackContentMetadata,
PlaybackError,
PlaybackProgress,
PlayQueueItem,
PlayQueueItemType,
RenderingState,
SceneProperties,
SoftwareUpdateState,
SoftwareUpdateStatus,
Source,
Uri,
UserFlow,
VolumeLevel,
VolumeMute,
VolumeState,
)
from mozart_api.mozart_client import MozartClient, get_highest_resolution_artwork
from homeassistant.components import media_source
from homeassistant.components.media_player import (
ATTR_MEDIA_EXTRA,
BrowseMedia,
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
async_process_play_media_url,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_MODEL
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.dt import utcnow
from . import BangOlufsenData
from .const import (
BANG_OLUFSEN_MEDIA_TYPE,
BANG_OLUFSEN_STATES,
CONF_BEOLINK_JID,
CONNECTION_STATUS,
DOMAIN,
FALLBACK_SOURCES,
HIDDEN_SOURCE_IDS,
SOURCE_ENUM,
VALID_MEDIA_TYPES,
WEBSOCKET_NOTIFICATION,
)
from .entity import BangOlufsenEntity
_LOGGER = logging.getLogger(__name__)
BANG_OLUFSEN_FEATURES = (
MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.SEEK
| MediaPlayerEntityFeature.VOLUME_SET
| MediaPlayerEntityFeature.VOLUME_MUTE
| MediaPlayerEntityFeature.PREVIOUS_TRACK
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.SELECT_SOURCE
| MediaPlayerEntityFeature.STOP
| MediaPlayerEntityFeature.CLEAR_PLAYLIST
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.TURN_OFF
)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up a Media Player entity from config entry."""
data: BangOlufsenData = hass.data[DOMAIN][config_entry.entry_id]
# Add MediaPlayer entity
async_add_entities(new_entities=[BangOlufsenMediaPlayer(config_entry, data.client)])
class BangOlufsenMediaPlayer(MediaPlayerEntity, BangOlufsenEntity):
"""Representation of a media player."""
_attr_has_entity_name = False
_attr_icon = "mdi:speaker-wireless"
_attr_supported_features = BANG_OLUFSEN_FEATURES
def __init__(self, entry: ConfigEntry, client: MozartClient) -> None:
"""Initialize the media player."""
super().__init__(entry, client)
self._beolink_jid: str = self.entry.data[CONF_BEOLINK_JID]
self._model: str = self.entry.data[CONF_MODEL]
self._attr_device_info = DeviceInfo(
configuration_url=f"http://{self._host}/#/",
identifiers={(DOMAIN, self._unique_id)},
manufacturer="Bang & Olufsen",
model=self._model,
name=cast(str, self.name),
serial_number=self._unique_id,
)
self._attr_name = self._name
self._attr_unique_id = self._unique_id
self._attr_device_class = MediaPlayerDeviceClass.SPEAKER
# Misc. variables.
self._audio_sources: dict[str, str] = {}
self._media_image: Art = Art()
self._software_status: SoftwareUpdateStatus = SoftwareUpdateStatus(
software_version="",
state=SoftwareUpdateState(seconds_remaining=0, value="idle"),
)
self._sources: dict[str, str] = {}
self._state: str = MediaPlayerState.IDLE
self._video_sources: dict[str, str] = {}
async def async_added_to_hass(self) -> None:
"""Turn on the dispatchers."""
await self._initialize()
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{CONNECTION_STATUS}",
self._update_connection_state,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_ERROR}",
self._update_playback_error,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_METADATA}",
self._update_playback_metadata,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_PROGRESS}",
self._update_playback_progress,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_STATE}",
self._update_playback_state,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.REMOTE_MENU_CHANGED}",
self._update_sources,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.SOURCE_CHANGE}",
self._update_source_change,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.VOLUME}",
self._update_volume,
)
)
async def _initialize(self) -> None:
"""Initialize connection dependent variables."""
# Get software version.
self._software_status = await self._client.get_softwareupdate_status()
_LOGGER.debug(
"Connected to: %s %s running SW %s",
self._model,
self._unique_id,
self._software_status.software_version,
)
# Get overall device state once. This is handled by WebSocket events the rest of the time.
product_state = await self._client.get_product_state()
# Get volume information.
if product_state.volume:
self._volume = product_state.volume
# Get all playback information.
# Ensure that the metadata is not None upon startup
if product_state.playback:
if product_state.playback.metadata:
self._playback_metadata = product_state.playback.metadata
if product_state.playback.progress:
self._playback_progress = product_state.playback.progress
if product_state.playback.source:
self._source_change = product_state.playback.source
if product_state.playback.state:
self._playback_state = product_state.playback.state
# Set initial state
if self._playback_state.value:
self._state = self._playback_state.value
self._attr_media_position_updated_at = utcnow()
# Get the highest resolution available of the given images.
self._media_image = get_highest_resolution_artwork(self._playback_metadata)
# If the device has been updated with new sources, then the API will fail here.
await self._update_sources()
# Set the static entity attributes that needed more information.
self._attr_source_list = list(self._sources.values())
async def _update_sources(self) -> None:
"""Get sources for the specific product."""
# Audio sources
try:
# Get all available sources.
sources = await self._client.get_available_sources(target_remote=False)
# Use a fallback list of sources
except ValueError:
# Try to get software version from device
if self.device_info:
sw_version = self.device_info.get("sw_version")
if not sw_version:
sw_version = self._software_status.software_version
_LOGGER.warning(
"The API is outdated compared to the device software version %s and %s. Using fallback sources",
MOZART_API_VERSION,
sw_version,
)
sources = FALLBACK_SOURCES
# Save all of the relevant enabled sources, both the ID and the friendly name for displaying in a dict.
self._audio_sources = {
source.id: source.name
for source in cast(list[Source], sources.items)
if source.is_enabled
and source.id
and source.name
and source.id not in HIDDEN_SOURCE_IDS
}
# Video sources from remote menu
menu_items = await self._client.get_remote_menu()
for key in menu_items:
menu_item = menu_items[key]
if not menu_item.available:
continue
# TV SOURCES
if (
menu_item.content is not None
and menu_item.content.categories
and len(menu_item.content.categories) > 0
and "music" not in menu_item.content.categories
and menu_item.label
and menu_item.label != "TV"
):
self._video_sources[key] = menu_item.label
# Combine the source dicts
self._sources = self._audio_sources | self._video_sources
# HASS won't necessarily be running the first time this method is run
if self.hass.is_running:
self.async_write_ha_state()
async def _update_playback_metadata(self, data: PlaybackContentMetadata) -> None:
"""Update _playback_metadata and related."""
self._playback_metadata = data
# Update current artwork.
self._media_image = get_highest_resolution_artwork(self._playback_metadata)
self.async_write_ha_state()
async def _update_playback_error(self, data: PlaybackError) -> None:
"""Show playback error."""
_LOGGER.error(data.error)
async def _update_playback_progress(self, data: PlaybackProgress) -> None:
"""Update _playback_progress and last update."""
self._playback_progress = data
self._attr_media_position_updated_at = utcnow()
self.async_write_ha_state()
async def _update_playback_state(self, data: RenderingState) -> None:
"""Update _playback_state and related."""
self._playback_state = data
# Update entity state based on the playback state.
if self._playback_state.value:
self._state = self._playback_state.value
self.async_write_ha_state()
async def _update_source_change(self, data: Source) -> None:
"""Update _source_change and related."""
self._source_change = data
# Check if source is line-in or optical and progress should be updated
if self._source_change.id in (SOURCE_ENUM.lineIn, SOURCE_ENUM.spdif):
self._playback_progress = PlaybackProgress(progress=0)
async def _update_volume(self, data: VolumeState) -> None:
"""Update _volume."""
self._volume = data
self.async_write_ha_state()
@property
def state(self) -> MediaPlayerState:
"""Return the current state of the media player."""
return BANG_OLUFSEN_STATES[self._state]
@property
def volume_level(self) -> float | None:
"""Volume level of the media player (0..1)."""
if self._volume.level and self._volume.level.level:
return float(self._volume.level.level / 100)
return None
@property
def is_volume_muted(self) -> bool | None:
"""Boolean if volume is currently muted."""
if self._volume.muted and self._volume.muted.muted:
return self._volume.muted.muted
return None
@property
def media_content_type(self) -> str:
"""Return the current media type."""
# Hard to determine content type
if self.source == SOURCE_ENUM.uriStreamer:
return MediaType.URL
return MediaType.MUSIC
@property
def media_duration(self) -> int | None:
"""Return the total duration of the current track in seconds."""
return self._playback_metadata.total_duration_seconds
@property
def media_position(self) -> int | None:
"""Return the current playback progress."""
return self._playback_progress.progress
@property
def media_image_url(self) -> str | None:
"""Return URL of the currently playing music."""
if self._media_image:
return self._media_image.url
return None
@property
def media_image_remotely_accessible(self) -> bool:
"""Return whether or not the image of the current media is available outside the local network."""
return not self._media_image.has_local_image
@property
def media_title(self) -> str | None:
"""Return the currently playing title."""
return self._playback_metadata.title
@property
def media_album_name(self) -> str | None:
"""Return the currently playing album name."""
return self._playback_metadata.album_name
@property
def media_album_artist(self) -> str | None:
"""Return the currently playing artist name."""
return self._playback_metadata.artist_name
@property
def media_track(self) -> int | None:
"""Return the currently playing track."""
return self._playback_metadata.track
@property
def media_channel(self) -> str | None:
"""Return the currently playing channel."""
return self._playback_metadata.organization
@property
def source(self) -> str | None:
"""Return the current audio source."""
# Try to fix some of the source_change chromecast weirdness.
if hasattr(self._playback_metadata, "title"):
# source_change is chromecast but line in is selected.
if self._playback_metadata.title == SOURCE_ENUM.lineIn:
return SOURCE_ENUM.lineIn
# source_change is chromecast but bluetooth is selected.
if self._playback_metadata.title == SOURCE_ENUM.bluetooth:
return SOURCE_ENUM.bluetooth
# source_change is line in, bluetooth or optical but stale metadata is sent through the WebSocket,
# And the source has not changed.
if self._source_change.id in (
SOURCE_ENUM.bluetooth,
SOURCE_ENUM.lineIn,
SOURCE_ENUM.spdif,
):
return SOURCE_ENUM.chromeCast
# source_change is chromecast and there is metadata but no artwork. Bluetooth does support metadata but not artwork
# So i assume that it is bluetooth and not chromecast
if (
hasattr(self._playback_metadata, "art")
and self._playback_metadata.art is not None
):
if (
len(self._playback_metadata.art) == 0
and self._source_change.name == SOURCE_ENUM.bluetooth
):
return SOURCE_ENUM.bluetooth
return self._source_change.name
async def async_turn_off(self) -> None:
"""Set the device to "networkStandby"."""
await self._client.post_standby()
async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
await self._client.set_current_volume_level(
volume_level=VolumeLevel(level=int(volume * 100))
)
async def async_mute_volume(self, mute: bool) -> None:
"""Mute or unmute media player."""
await self._client.set_volume_mute(volume_mute=VolumeMute(muted=mute))
async def async_media_play_pause(self) -> None:
"""Toggle play/pause media player."""
if self.state == MediaPlayerState.PLAYING:
await self.async_media_pause()
elif self.state in (MediaPlayerState.PAUSED, MediaPlayerState.IDLE):
await self.async_media_play()
async def async_media_pause(self) -> None:
"""Pause media player."""
await self._client.post_playback_command(command="pause")
async def async_media_play(self) -> None:
"""Play media player."""
await self._client.post_playback_command(command="play")
async def async_media_stop(self) -> None:
"""Pause media player."""
await self._client.post_playback_command(command="stop")
async def async_media_next_track(self) -> None:
"""Send the next track command."""
await self._client.post_playback_command(command="skip")
async def async_media_seek(self, position: float) -> None:
"""Seek to position in ms."""
if self.source == SOURCE_ENUM.deezer:
await self._client.seek_to_position(position_ms=int(position * 1000))
# Try to prevent the playback progress from bouncing in the UI.
self._attr_media_position_updated_at = utcnow()
self._playback_progress = PlaybackProgress(progress=int(position))
self.async_write_ha_state()
else:
_LOGGER.error("Seeking is currently only supported when using Deezer")
async def async_media_previous_track(self) -> None:
"""Send the previous track command."""
await self._client.post_playback_command(command="prev")
async def async_clear_playlist(self) -> None:
"""Clear the current playback queue."""
await self._client.post_clear_queue()
async def async_select_source(self, source: str) -> None:
"""Select an input source."""
if source not in self._sources.values():
_LOGGER.error(
"Invalid source: %s. Valid sources are: %s",
source,
list(self._sources.values()),
)
return
# pylint: disable=consider-using-dict-items
key = [x for x in self._sources if self._sources[x] == source][0]
# Check for source type
if source in self._audio_sources.values():
# Audio
await self._client.set_active_source(source_id=key)
else:
# Video
await self._client.post_remote_trigger(id=key)
async def async_play_media(
self,
media_type: MediaType | str,
media_id: str,
**kwargs: Any,
) -> None:
"""Play from: netradio station id, URI, favourite or Deezer."""
# Convert audio/mpeg, audio/aac etc. to MediaType.MUSIC
if media_type.startswith("audio/"):
media_type = MediaType.MUSIC
if media_type not in VALID_MEDIA_TYPES:
_LOGGER.error(
"%s is an invalid type. Valid values are: %s",
media_type,
VALID_MEDIA_TYPES,
)
return
if media_source.is_media_source_id(media_id):
sourced_media = await media_source.async_resolve_media(
self.hass, media_id, self.entity_id
)
media_id = async_process_play_media_url(self.hass, sourced_media.url)
# Remove playlist extension as it is unsupported.
if media_id.endswith(".m3u"):
media_id = media_id.replace(".m3u", "")
if media_type in (MediaType.URL, MediaType.MUSIC):
await self._client.post_uri_source(uri=Uri(location=media_id))
# The "provider" media_type may not be suitable for overlay all the time.
# Use it for now.
elif media_type == BANG_OLUFSEN_MEDIA_TYPE.TTS:
await self._client.post_overlay_play(
overlay_play_request=OverlayPlayRequest(
uri=Uri(location=media_id),
)
)
elif media_type == BANG_OLUFSEN_MEDIA_TYPE.RADIO:
await self._client.run_provided_scene(
scene_properties=SceneProperties(
action_list=[
Action(
type="radio",
radio_station_id=media_id,
)
]
)
)
elif media_type == BANG_OLUFSEN_MEDIA_TYPE.FAVOURITE:
await self._client.activate_preset(id=int(media_id))
elif media_type == BANG_OLUFSEN_MEDIA_TYPE.DEEZER:
try:
if media_id == "flow":
deezer_id = None
if "id" in kwargs[ATTR_MEDIA_EXTRA]:
deezer_id = kwargs[ATTR_MEDIA_EXTRA]["id"]
# Play Deezer flow.
await self._client.start_deezer_flow(
user_flow=UserFlow(user_id=deezer_id)
)
# Play a Deezer playlist or album.
elif any(match in media_id for match in ("playlist", "album")):
start_from = 0
if "start_from" in kwargs[ATTR_MEDIA_EXTRA]:
start_from = kwargs[ATTR_MEDIA_EXTRA]["start_from"]
await self._client.add_to_queue(
play_queue_item=PlayQueueItem(
provider=PlayQueueItemType(value="deezer"),
start_now_from_position=start_from,
type="playlist",
uri=media_id,
)
)
# Play a Deezer track.
else:
await self._client.add_to_queue(
play_queue_item=PlayQueueItem(
provider=PlayQueueItemType(value="deezer"),
start_now_from_position=0,
type="track",
uri=media_id,
)
)
except ApiException as error:
_LOGGER.error(json.loads(error.body)["message"])
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Implement the WebSocket media browsing helper."""
return await media_source.async_browse_media(
self.hass,
media_content_id,
content_filter=lambda item: item.media_content_type.startswith("audio/"),
)

View file

@ -0,0 +1,28 @@
{
"config": {
"error": {
"api_exception": "[%key:common::config_flow::error::cannot_connect%]",
"client_connector_error": "[%key:common::config_flow::error::cannot_connect%]",
"timeout_error": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_ip": "Invalid IPv4 address"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::single_instance_allowed%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]"
},
"flow_title": "{name}",
"step": {
"user": {
"data": {
"host": "[%key:common::config_flow::data::ip%]",
"model": "[%key:common::generic::model%]"
},
"description": "Manually configure your Bang & Olufsen device."
},
"zeroconf_confirm": {
"title": "Setup Bang & Olufsen device",
"description": "Confirm the configuration of the {model}-{serial_number} @ {host}."
}
}
}
}

View file

@ -0,0 +1,21 @@
"""Various utilities for the Bang & Olufsen integration."""
from __future__ import annotations
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import DeviceEntry
from .const import DOMAIN
def get_device(hass: HomeAssistant | None, unique_id: str) -> DeviceEntry | None:
"""Get the device."""
if not isinstance(hass, HomeAssistant):
return None
device_registry = dr.async_get(hass)
device = device_registry.async_get_device({(DOMAIN, unique_id)})
assert device
return device

View file

@ -0,0 +1,182 @@
"""Update coordinator and WebSocket listener(s) for the Bang & Olufsen integration."""
from __future__ import annotations
import logging
from mozart_api.models import (
PlaybackContentMetadata,
PlaybackError,
PlaybackProgress,
RenderingState,
SoftwareUpdateState,
Source,
VolumeState,
WebsocketNotificationTag,
)
from mozart_api.mozart_client import MozartClient
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.dispatcher import async_dispatcher_send
from .const import (
BANG_OLUFSEN_WEBSOCKET_EVENT,
CONNECTION_STATUS,
WEBSOCKET_NOTIFICATION,
)
from .entity import BangOlufsenBase
from .util import get_device
_LOGGER = logging.getLogger(__name__)
class BangOlufsenWebsocket(BangOlufsenBase):
"""The WebSocket listeners."""
def __init__(
self, hass: HomeAssistant, entry: ConfigEntry, client: MozartClient
) -> None:
"""Initialize the WebSocket listeners."""
BangOlufsenBase.__init__(self, entry, client)
self.hass = hass
self._device = get_device(hass, self._unique_id)
# WebSocket callbacks
self._client.get_notification_notifications(self.on_notification_notification)
self._client.get_on_connection_lost(self.on_connection_lost)
self._client.get_on_connection(self.on_connection)
self._client.get_playback_error_notifications(
self.on_playback_error_notification
)
self._client.get_playback_metadata_notifications(
self.on_playback_metadata_notification
)
self._client.get_playback_progress_notifications(
self.on_playback_progress_notification
)
self._client.get_playback_state_notifications(
self.on_playback_state_notification
)
self._client.get_software_update_state_notifications(
self.on_software_update_state
)
self._client.get_source_change_notifications(self.on_source_change_notification)
self._client.get_volume_notifications(self.on_volume_notification)
# Used for firing events and debugging
self._client.get_all_notifications_raw(self.on_all_notifications_raw)
def _update_connection_status(self) -> None:
"""Update all entities of the connection status."""
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{CONNECTION_STATUS}",
self._client.websocket_connected,
)
def on_connection(self) -> None:
"""Handle WebSocket connection made."""
_LOGGER.debug("Connected to the %s notification channel", self._name)
self._update_connection_status()
def on_connection_lost(self) -> None:
"""Handle WebSocket connection lost."""
_LOGGER.error("Lost connection to the %s", self._name)
self._update_connection_status()
def on_notification_notification(
self, notification: WebsocketNotificationTag
) -> None:
"""Send notification dispatch."""
if notification.value:
if WEBSOCKET_NOTIFICATION.REMOTE_MENU_CHANGED in notification.value:
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.REMOTE_MENU_CHANGED}",
)
def on_playback_error_notification(self, notification: PlaybackError) -> None:
"""Send playback_error dispatch."""
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_ERROR}",
notification,
)
def on_playback_metadata_notification(
self, notification: PlaybackContentMetadata
) -> None:
"""Send playback_metadata dispatch."""
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_METADATA}",
notification,
)
def on_playback_progress_notification(self, notification: PlaybackProgress) -> None:
"""Send playback_progress dispatch."""
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_PROGRESS}",
notification,
)
def on_playback_state_notification(self, notification: RenderingState) -> None:
"""Send playback_state dispatch."""
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.PLAYBACK_STATE}",
notification,
)
def on_source_change_notification(self, notification: Source) -> None:
"""Send source_change dispatch."""
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.SOURCE_CHANGE}",
notification,
)
def on_volume_notification(self, notification: VolumeState) -> None:
"""Send volume dispatch."""
async_dispatcher_send(
self.hass,
f"{self._unique_id}_{WEBSOCKET_NOTIFICATION.VOLUME}",
notification,
)
async def on_software_update_state(self, notification: SoftwareUpdateState) -> None:
"""Check device sw version."""
software_status = await self._client.get_softwareupdate_status()
# Update the HA device if the sw version does not match
if not self._device:
self._device = get_device(self.hass, self._unique_id)
assert self._device
if software_status.software_version != self._device.sw_version:
device_registry = dr.async_get(self.hass)
device_registry.async_update_device(
device_id=self._device.id,
sw_version=software_status.software_version,
)
def on_all_notifications_raw(self, notification: dict) -> None:
"""Receive all notifications."""
if not self._device:
self._device = get_device(self.hass, self._unique_id)
assert self._device
# Add the device_id and serial_number to the notification
notification["device_id"] = self._device.id
notification["serial_number"] = int(self._unique_id)
_LOGGER.debug("%s", notification)
self.hass.bus.async_fire(BANG_OLUFSEN_WEBSOCKET_EVENT, notification)

View file

@ -67,6 +67,7 @@ FLOWS = {
"azure_event_hub",
"baf",
"balboa",
"bang_olufsen",
"blebox",
"blink",
"blue_current",

View file

@ -581,6 +581,12 @@
"config_flow": true,
"iot_class": "local_push"
},
"bang_olufsen": {
"name": "Bang & Olufsen",
"integration_type": "device",
"config_flow": true,
"iot_class": "local_push"
},
"bayesian": {
"name": "Bayesian",
"integration_type": "hub",

View file

@ -344,6 +344,11 @@ ZEROCONF = {
},
},
],
"_bangolufsen._tcp.local.": [
{
"domain": "bang_olufsen",
},
],
"_bbxsrv._tcp.local.": [
{
"domain": "blebox",

View file

@ -760,6 +760,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.bang_olufsen.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.bayesian.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View file

@ -1306,6 +1306,9 @@ motionblinds==0.6.19
# homeassistant.components.motioneye
motioneye-client==0.3.14
# homeassistant.components.bang_olufsen
mozart-api==3.2.1.150.6
# homeassistant.components.mullvad
mullvad-api==1.0.0

View file

@ -1036,6 +1036,9 @@ motionblinds==0.6.19
# homeassistant.components.motioneye
motioneye-client==0.3.14
# homeassistant.components.bang_olufsen
mozart-api==3.2.1.150.6
# homeassistant.components.mullvad
mullvad-api==1.0.0

View file

@ -0,0 +1 @@
"""Tests for the bang_olufsen integration."""

View file

@ -0,0 +1,70 @@
"""Test fixtures for bang_olufsen."""
from unittest.mock import AsyncMock, patch
from mozart_api.models import BeolinkPeer
import pytest
from homeassistant.components.bang_olufsen.const import DOMAIN
from .const import (
TEST_DATA_CREATE_ENTRY,
TEST_FRIENDLY_NAME,
TEST_JID_1,
TEST_NAME,
TEST_SERIAL_NUMBER,
)
from tests.common import MockConfigEntry
class MockMozartClient:
"""Class for mocking MozartClient objects and methods."""
async def __aenter__(self):
"""Mock async context entry."""
async def __aexit__(self, exc_type, exc, tb):
"""Mock async context exit."""
# API call results
get_beolink_self_result = BeolinkPeer(
friendly_name=TEST_FRIENDLY_NAME, jid=TEST_JID_1
)
# API endpoints
get_beolink_self = AsyncMock()
get_beolink_self.return_value = get_beolink_self_result
@pytest.fixture
def mock_config_entry():
"""Mock config entry."""
return MockConfigEntry(
domain=DOMAIN,
unique_id=TEST_SERIAL_NUMBER,
data=TEST_DATA_CREATE_ENTRY,
title=TEST_NAME,
)
@pytest.fixture
def mock_client():
"""Mock MozartClient."""
client = MockMozartClient()
with patch("mozart_api.mozart_client.MozartClient", return_value=client):
yield client
# Reset mocked API call counts and side effects
client.get_beolink_self.reset_mock(side_effect=True)
@pytest.fixture
def mock_setup_entry():
"""Mock successful setup entry."""
with patch(
"homeassistant.components.bang_olufsen.async_setup_entry", return_value=True
) as mock_setup_entry:
yield mock_setup_entry

View file

@ -0,0 +1,83 @@
"""Constants used for testing the bang_olufsen integration."""
from ipaddress import IPv4Address, IPv6Address
from homeassistant.components.bang_olufsen.const import (
ATTR_FRIENDLY_NAME,
ATTR_ITEM_NUMBER,
ATTR_SERIAL_NUMBER,
ATTR_TYPE_NUMBER,
CONF_BEOLINK_JID,
)
from homeassistant.components.zeroconf import ZeroconfServiceInfo
from homeassistant.const import CONF_HOST, CONF_MODEL, CONF_NAME
TEST_HOST = "192.168.0.1"
TEST_HOST_INVALID = "192.168.0"
TEST_HOST_IPV6 = "1111:2222:3333:4444:5555:6666:7777:8888"
TEST_MODEL_BALANCE = "Beosound Balance"
TEST_MODEL_THEATRE = "Beosound Theatre"
TEST_MODEL_LEVEL = "Beosound Level"
TEST_SERIAL_NUMBER = "11111111"
TEST_NAME = f"{TEST_MODEL_BALANCE}-{TEST_SERIAL_NUMBER}"
TEST_FRIENDLY_NAME = "Living room Balance"
TEST_TYPE_NUMBER = "1111"
TEST_ITEM_NUMBER = "1111111"
TEST_JID_1 = f"{TEST_TYPE_NUMBER}.{TEST_ITEM_NUMBER}.{TEST_SERIAL_NUMBER}@products.bang-olufsen.com"
TEST_HOSTNAME_ZEROCONF = TEST_NAME.replace(" ", "-") + ".local."
TEST_TYPE_ZEROCONF = "_bangolufsen._tcp.local."
TEST_NAME_ZEROCONF = TEST_NAME.replace(" ", "-") + "." + TEST_TYPE_ZEROCONF
TEST_DATA_USER = {CONF_HOST: TEST_HOST, CONF_MODEL: TEST_MODEL_BALANCE}
TEST_DATA_USER_INVALID = {CONF_HOST: TEST_HOST_INVALID, CONF_MODEL: TEST_MODEL_BALANCE}
TEST_DATA_CREATE_ENTRY = {
CONF_HOST: TEST_HOST,
CONF_MODEL: TEST_MODEL_BALANCE,
CONF_BEOLINK_JID: TEST_JID_1,
CONF_NAME: TEST_NAME,
}
TEST_DATA_ZEROCONF = ZeroconfServiceInfo(
ip_address=IPv4Address(TEST_HOST),
ip_addresses=[IPv4Address(TEST_HOST)],
port=80,
hostname=TEST_HOSTNAME_ZEROCONF,
type=TEST_TYPE_ZEROCONF,
name=TEST_NAME_ZEROCONF,
properties={
ATTR_FRIENDLY_NAME: TEST_FRIENDLY_NAME,
ATTR_SERIAL_NUMBER: TEST_SERIAL_NUMBER,
ATTR_TYPE_NUMBER: TEST_TYPE_NUMBER,
ATTR_ITEM_NUMBER: TEST_ITEM_NUMBER,
},
)
TEST_DATA_ZEROCONF_NOT_MOZART = ZeroconfServiceInfo(
ip_address=IPv4Address(TEST_HOST),
ip_addresses=[IPv4Address(TEST_HOST)],
port=80,
hostname=TEST_HOSTNAME_ZEROCONF,
type=TEST_TYPE_ZEROCONF,
name=TEST_NAME_ZEROCONF,
properties={ATTR_SERIAL_NUMBER: TEST_SERIAL_NUMBER},
)
TEST_DATA_ZEROCONF_IPV6 = ZeroconfServiceInfo(
ip_address=IPv6Address(TEST_HOST_IPV6),
ip_addresses=[IPv6Address(TEST_HOST_IPV6)],
port=80,
hostname=TEST_HOSTNAME_ZEROCONF,
type=TEST_TYPE_ZEROCONF,
name=TEST_NAME_ZEROCONF,
properties={
ATTR_FRIENDLY_NAME: TEST_FRIENDLY_NAME,
ATTR_SERIAL_NUMBER: TEST_SERIAL_NUMBER,
ATTR_TYPE_NUMBER: TEST_TYPE_NUMBER,
ATTR_ITEM_NUMBER: TEST_ITEM_NUMBER,
},
)

View file

@ -0,0 +1,163 @@
"""Test the bang_olufsen config_flow."""
from unittest.mock import Mock
from aiohttp.client_exceptions import ClientConnectorError
from mozart_api.exceptions import ApiException
import pytest
from homeassistant.components.bang_olufsen.const import DOMAIN
from homeassistant.config_entries import SOURCE_USER, SOURCE_ZEROCONF
from homeassistant.const import CONF_SOURCE
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .conftest import MockMozartClient
from .const import (
TEST_DATA_CREATE_ENTRY,
TEST_DATA_USER,
TEST_DATA_USER_INVALID,
TEST_DATA_ZEROCONF,
TEST_DATA_ZEROCONF_IPV6,
TEST_DATA_ZEROCONF_NOT_MOZART,
)
pytestmark = pytest.mark.usefixtures("mock_setup_entry")
async def test_config_flow_timeout_error(
hass: HomeAssistant, mock_client: MockMozartClient
) -> None:
"""Test we handle timeout_error."""
mock_client.get_beolink_self.side_effect = TimeoutError()
result_user = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_USER},
data=TEST_DATA_USER,
)
assert result_user["type"] == FlowResultType.FORM
assert result_user["errors"] == {"base": "timeout_error"}
assert mock_client.get_beolink_self.call_count == 1
async def test_config_flow_client_connector_error(
hass: HomeAssistant, mock_client: MockMozartClient
) -> None:
"""Test we handle client_connector_error."""
mock_client.get_beolink_self.side_effect = ClientConnectorError(Mock(), Mock())
result_user = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_USER},
data=TEST_DATA_USER,
)
assert result_user["type"] == FlowResultType.FORM
assert result_user["errors"] == {"base": "client_connector_error"}
assert mock_client.get_beolink_self.call_count == 1
async def test_config_flow_invalid_ip(hass: HomeAssistant) -> None:
"""Test we handle invalid_ip."""
result_user = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_USER},
data=TEST_DATA_USER_INVALID,
)
assert result_user["type"] == FlowResultType.FORM
assert result_user["errors"] == {"base": "invalid_ip"}
async def test_config_flow_api_exception(
hass: HomeAssistant, mock_client: MockMozartClient
) -> None:
"""Test we handle api_exception."""
mock_client.get_beolink_self.side_effect = ApiException()
result_user = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_USER},
data=TEST_DATA_USER,
)
assert result_user["type"] == FlowResultType.FORM
assert result_user["errors"] == {"base": "api_exception"}
assert mock_client.get_beolink_self.call_count == 1
async def test_config_flow(hass: HomeAssistant, mock_client: MockMozartClient) -> None:
"""Test config flow."""
result_init = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_USER},
data=None,
)
assert result_init["type"] == FlowResultType.FORM
assert result_init["step_id"] == "user"
result_user = await hass.config_entries.flow.async_configure(
flow_id=result_init["flow_id"],
user_input=TEST_DATA_USER,
)
assert result_user["type"] == FlowResultType.CREATE_ENTRY
assert result_user["data"] == TEST_DATA_CREATE_ENTRY
assert mock_client.get_beolink_self.call_count == 1
async def test_config_flow_zeroconf(
hass: HomeAssistant, mock_client: MockMozartClient
) -> None:
"""Test zeroconf discovery."""
result_zeroconf = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_ZEROCONF},
data=TEST_DATA_ZEROCONF,
)
assert result_zeroconf["type"] == FlowResultType.FORM
assert result_zeroconf["step_id"] == "zeroconf_confirm"
result_confirm = await hass.config_entries.flow.async_configure(
flow_id=result_zeroconf["flow_id"],
user_input=TEST_DATA_USER,
)
assert result_confirm["type"] == FlowResultType.CREATE_ENTRY
assert result_confirm["data"] == TEST_DATA_CREATE_ENTRY
assert mock_client.get_beolink_self.call_count == 0
async def test_config_flow_zeroconf_not_mozart_device(hass: HomeAssistant) -> None:
"""Test zeroconf discovery of invalid device."""
result_user = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_ZEROCONF},
data=TEST_DATA_ZEROCONF_NOT_MOZART,
)
assert result_user["type"] == FlowResultType.ABORT
assert result_user["reason"] == "not_mozart_device"
async def test_config_flow_zeroconf_ipv6(hass: HomeAssistant) -> None:
"""Test zeroconf discovery with IPv6 IP address."""
result_user = await hass.config_entries.flow.async_init(
handler=DOMAIN,
context={CONF_SOURCE: SOURCE_ZEROCONF},
data=TEST_DATA_ZEROCONF_IPV6,
)
assert result_user["type"] == FlowResultType.ABORT
assert result_user["reason"] == "ipv6_address"