Move core config class to core_config.py (#129163)
This commit is contained in:
parent
59227116f3
commit
4b56701152
8 changed files with 789 additions and 742 deletions
|
@ -1226,9 +1226,9 @@ class UnitOfConductivity(
|
||||||
StrEnum,
|
StrEnum,
|
||||||
metaclass=EnumWithDeprecatedMembers,
|
metaclass=EnumWithDeprecatedMembers,
|
||||||
deprecated={
|
deprecated={
|
||||||
"SIEMENS": ("SIEMENS_PER_CM", "2025.11.0"),
|
"SIEMENS": ("UnitOfConductivity.SIEMENS_PER_CM", "2025.11.0"),
|
||||||
"MICROSIEMENS": ("MICROSIEMENS_PER_CM", "2025.11.0"),
|
"MICROSIEMENS": ("UnitOfConductivity.MICROSIEMENS_PER_CM", "2025.11.0"),
|
||||||
"MILLISIEMENS": ("MILLISIEMENS_PER_CM", "2025.11.0"),
|
"MILLISIEMENS": ("UnitOfConductivity.MILLISIEMENS_PER_CM", "2025.11.0"),
|
||||||
},
|
},
|
||||||
):
|
):
|
||||||
"""Conductivity units."""
|
"""Conductivity units."""
|
||||||
|
|
|
@ -18,15 +18,12 @@ from collections.abc import (
|
||||||
ValuesView,
|
ValuesView,
|
||||||
)
|
)
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
from contextlib import suppress
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
import datetime
|
import datetime
|
||||||
import enum
|
import enum
|
||||||
import functools
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import pathlib
|
|
||||||
import re
|
import re
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
@ -42,13 +39,10 @@ from typing import (
|
||||||
cast,
|
cast,
|
||||||
overload,
|
overload,
|
||||||
)
|
)
|
||||||
from urllib.parse import urlparse
|
|
||||||
|
|
||||||
from propcache import cached_property, under_cached_property
|
from propcache import cached_property, under_cached_property
|
||||||
from typing_extensions import TypeVar
|
from typing_extensions import TypeVar
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
from webrtc_models import RTCConfiguration
|
|
||||||
import yarl
|
|
||||||
|
|
||||||
from . import util
|
from . import util
|
||||||
from .const import (
|
from .const import (
|
||||||
|
@ -56,7 +50,6 @@ from .const import (
|
||||||
ATTR_FRIENDLY_NAME,
|
ATTR_FRIENDLY_NAME,
|
||||||
ATTR_SERVICE,
|
ATTR_SERVICE,
|
||||||
ATTR_SERVICE_DATA,
|
ATTR_SERVICE_DATA,
|
||||||
BASE_PLATFORMS,
|
|
||||||
COMPRESSED_STATE_ATTRIBUTES,
|
COMPRESSED_STATE_ATTRIBUTES,
|
||||||
COMPRESSED_STATE_CONTEXT,
|
COMPRESSED_STATE_CONTEXT,
|
||||||
COMPRESSED_STATE_LAST_CHANGED,
|
COMPRESSED_STATE_LAST_CHANGED,
|
||||||
|
@ -78,7 +71,6 @@ from .const import (
|
||||||
MAX_EXPECTED_ENTITY_IDS,
|
MAX_EXPECTED_ENTITY_IDS,
|
||||||
MAX_LENGTH_EVENT_EVENT_TYPE,
|
MAX_LENGTH_EVENT_EVENT_TYPE,
|
||||||
MAX_LENGTH_STATE_STATE,
|
MAX_LENGTH_STATE_STATE,
|
||||||
UnitOfLength,
|
|
||||||
__version__,
|
__version__,
|
||||||
)
|
)
|
||||||
from .exceptions import (
|
from .exceptions import (
|
||||||
|
@ -92,13 +84,14 @@ from .exceptions import (
|
||||||
)
|
)
|
||||||
from .helpers.deprecation import (
|
from .helpers.deprecation import (
|
||||||
DeprecatedConstantEnum,
|
DeprecatedConstantEnum,
|
||||||
|
EnumWithDeprecatedMembers,
|
||||||
all_with_deprecated_constants,
|
all_with_deprecated_constants,
|
||||||
check_if_deprecated_constant,
|
check_if_deprecated_constant,
|
||||||
dir_with_deprecated_constants,
|
dir_with_deprecated_constants,
|
||||||
)
|
)
|
||||||
from .helpers.json import json_bytes, json_fragment
|
from .helpers.json import json_bytes, json_fragment
|
||||||
from .helpers.typing import UNDEFINED, UndefinedType, VolSchemaType
|
from .helpers.typing import VolSchemaType
|
||||||
from .util import dt as dt_util, location
|
from .util import dt as dt_util
|
||||||
from .util.async_ import (
|
from .util.async_ import (
|
||||||
cancelling,
|
cancelling,
|
||||||
create_eager_task,
|
create_eager_task,
|
||||||
|
@ -113,18 +106,11 @@ from .util.json import JsonObjectType
|
||||||
from .util.read_only_dict import ReadOnlyDict
|
from .util.read_only_dict import ReadOnlyDict
|
||||||
from .util.timeout import TimeoutManager
|
from .util.timeout import TimeoutManager
|
||||||
from .util.ulid import ulid_at_time, ulid_now
|
from .util.ulid import ulid_at_time, ulid_now
|
||||||
from .util.unit_system import (
|
|
||||||
_CONF_UNIT_SYSTEM_IMPERIAL,
|
|
||||||
_CONF_UNIT_SYSTEM_US_CUSTOMARY,
|
|
||||||
METRIC_SYSTEM,
|
|
||||||
UnitSystem,
|
|
||||||
get_unit_system,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Typing imports that create a circular dependency
|
# Typing imports that create a circular dependency
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .auth import AuthManager
|
from .auth import AuthManager
|
||||||
from .components.http import ApiConfig, HomeAssistantHTTP
|
from .components.http import HomeAssistantHTTP
|
||||||
from .config_entries import ConfigEntries
|
from .config_entries import ConfigEntries
|
||||||
from .helpers.entity import StateInfo
|
from .helpers.entity import StateInfo
|
||||||
|
|
||||||
|
@ -138,10 +124,6 @@ _SENTINEL = object()
|
||||||
_DataT = TypeVar("_DataT", bound=Mapping[str, Any], default=Mapping[str, Any])
|
_DataT = TypeVar("_DataT", bound=Mapping[str, Any], default=Mapping[str, Any])
|
||||||
type CALLBACK_TYPE = Callable[[], None]
|
type CALLBACK_TYPE = Callable[[], None]
|
||||||
|
|
||||||
CORE_STORAGE_KEY = "core.config"
|
|
||||||
CORE_STORAGE_VERSION = 1
|
|
||||||
CORE_STORAGE_MINOR_VERSION = 4
|
|
||||||
|
|
||||||
DOMAIN = "homeassistant"
|
DOMAIN = "homeassistant"
|
||||||
|
|
||||||
# How long to wait to log tasks that are blocking
|
# How long to wait to log tasks that are blocking
|
||||||
|
@ -151,7 +133,16 @@ type ServiceResponse = JsonObjectType | None
|
||||||
type EntityServiceResponse = dict[str, ServiceResponse]
|
type EntityServiceResponse = dict[str, ServiceResponse]
|
||||||
|
|
||||||
|
|
||||||
class ConfigSource(enum.StrEnum):
|
class ConfigSource(
|
||||||
|
enum.StrEnum,
|
||||||
|
metaclass=EnumWithDeprecatedMembers,
|
||||||
|
deprecated={
|
||||||
|
"DEFAULT": ("core_config.ConfigSource.DEFAULT", "2025.11.0"),
|
||||||
|
"DISCOVERED": ("core_config.ConfigSource.DISCOVERED", "2025.11.0"),
|
||||||
|
"STORAGE": ("core_config.ConfigSource.STORAGE", "2025.11.0"),
|
||||||
|
"YAML": ("core_config.ConfigSource.YAML", "2025.11.0"),
|
||||||
|
},
|
||||||
|
):
|
||||||
"""Source of core configuration."""
|
"""Source of core configuration."""
|
||||||
|
|
||||||
DEFAULT = "default"
|
DEFAULT = "default"
|
||||||
|
@ -432,6 +423,9 @@ class HomeAssistant:
|
||||||
# pylint: disable-next=import-outside-toplevel
|
# pylint: disable-next=import-outside-toplevel
|
||||||
from . import loader
|
from . import loader
|
||||||
|
|
||||||
|
# pylint: disable-next=import-outside-toplevel
|
||||||
|
from .core_config import Config
|
||||||
|
|
||||||
# This is a dictionary that any component can store any data on.
|
# This is a dictionary that any component can store any data on.
|
||||||
self.data = HassDict()
|
self.data = HassDict()
|
||||||
self.loop = asyncio.get_running_loop()
|
self.loop = asyncio.get_running_loop()
|
||||||
|
@ -2844,454 +2838,6 @@ class ServiceRegistry:
|
||||||
return await self._hass.async_add_executor_job(target, service_call)
|
return await self._hass.async_add_executor_job(target, service_call)
|
||||||
|
|
||||||
|
|
||||||
class _ComponentSet(set[str]):
|
|
||||||
"""Set of loaded components.
|
|
||||||
|
|
||||||
This set contains both top level components and platforms.
|
|
||||||
|
|
||||||
Examples:
|
|
||||||
`light`, `switch`, `hue`, `mjpeg.camera`, `universal.media_player`,
|
|
||||||
`homeassistant.scene`
|
|
||||||
|
|
||||||
The top level components set only contains the top level components.
|
|
||||||
|
|
||||||
The all components set contains all components, including platform
|
|
||||||
based components.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self, top_level_components: set[str], all_components: set[str]
|
|
||||||
) -> None:
|
|
||||||
"""Initialize the component set."""
|
|
||||||
self._top_level_components = top_level_components
|
|
||||||
self._all_components = all_components
|
|
||||||
|
|
||||||
def add(self, component: str) -> None:
|
|
||||||
"""Add a component to the store."""
|
|
||||||
if "." not in component:
|
|
||||||
self._top_level_components.add(component)
|
|
||||||
self._all_components.add(component)
|
|
||||||
else:
|
|
||||||
platform, _, domain = component.partition(".")
|
|
||||||
if domain in BASE_PLATFORMS:
|
|
||||||
self._all_components.add(platform)
|
|
||||||
return super().add(component)
|
|
||||||
|
|
||||||
def remove(self, component: str) -> None:
|
|
||||||
"""Remove a component from the store."""
|
|
||||||
if "." in component:
|
|
||||||
raise ValueError("_ComponentSet does not support removing sub-components")
|
|
||||||
self._top_level_components.remove(component)
|
|
||||||
return super().remove(component)
|
|
||||||
|
|
||||||
def discard(self, component: str) -> None:
|
|
||||||
"""Remove a component from the store."""
|
|
||||||
raise NotImplementedError("_ComponentSet does not support discard, use remove")
|
|
||||||
|
|
||||||
|
|
||||||
class Config:
|
|
||||||
"""Configuration settings for Home Assistant."""
|
|
||||||
|
|
||||||
_store: Config._ConfigStore
|
|
||||||
|
|
||||||
def __init__(self, hass: HomeAssistant, config_dir: str) -> None:
|
|
||||||
"""Initialize a new config object."""
|
|
||||||
# pylint: disable-next=import-outside-toplevel
|
|
||||||
from .components.zone import DEFAULT_RADIUS
|
|
||||||
|
|
||||||
self.hass = hass
|
|
||||||
|
|
||||||
self.latitude: float = 0
|
|
||||||
self.longitude: float = 0
|
|
||||||
|
|
||||||
self.elevation: int = 0
|
|
||||||
"""Elevation (always in meters regardless of the unit system)."""
|
|
||||||
|
|
||||||
self.radius: int = DEFAULT_RADIUS
|
|
||||||
"""Radius of the Home Zone (always in meters regardless of the unit system)."""
|
|
||||||
|
|
||||||
self.debug: bool = False
|
|
||||||
self.location_name: str = "Home"
|
|
||||||
self.time_zone: str = "UTC"
|
|
||||||
self.units: UnitSystem = METRIC_SYSTEM
|
|
||||||
self.internal_url: str | None = None
|
|
||||||
self.external_url: str | None = None
|
|
||||||
self.currency: str = "EUR"
|
|
||||||
self.country: str | None = None
|
|
||||||
self.language: str = "en"
|
|
||||||
|
|
||||||
self.config_source: ConfigSource = ConfigSource.DEFAULT
|
|
||||||
|
|
||||||
# If True, pip install is skipped for requirements on startup
|
|
||||||
self.skip_pip: bool = False
|
|
||||||
|
|
||||||
# List of packages to skip when installing requirements on startup
|
|
||||||
self.skip_pip_packages: list[str] = []
|
|
||||||
|
|
||||||
# Set of loaded top level components
|
|
||||||
# This set is updated by _ComponentSet
|
|
||||||
# and should not be modified directly
|
|
||||||
self.top_level_components: set[str] = set()
|
|
||||||
|
|
||||||
# Set of all loaded components including platform
|
|
||||||
# based components
|
|
||||||
self.all_components: set[str] = set()
|
|
||||||
|
|
||||||
# Set of loaded components
|
|
||||||
self.components: _ComponentSet = _ComponentSet(
|
|
||||||
self.top_level_components, self.all_components
|
|
||||||
)
|
|
||||||
|
|
||||||
# API (HTTP) server configuration
|
|
||||||
self.api: ApiConfig | None = None
|
|
||||||
|
|
||||||
# Directory that holds the configuration
|
|
||||||
self.config_dir: str = config_dir
|
|
||||||
|
|
||||||
# List of allowed external dirs to access
|
|
||||||
self.allowlist_external_dirs: set[str] = set()
|
|
||||||
|
|
||||||
# List of allowed external URLs that integrations may use
|
|
||||||
self.allowlist_external_urls: set[str] = set()
|
|
||||||
|
|
||||||
# Dictionary of Media folders that integrations may use
|
|
||||||
self.media_dirs: dict[str, str] = {}
|
|
||||||
|
|
||||||
# If Home Assistant is running in recovery mode
|
|
||||||
self.recovery_mode: bool = False
|
|
||||||
|
|
||||||
# Use legacy template behavior
|
|
||||||
self.legacy_templates: bool = False
|
|
||||||
|
|
||||||
# If Home Assistant is running in safe mode
|
|
||||||
self.safe_mode: bool = False
|
|
||||||
|
|
||||||
self.webrtc = RTCConfiguration()
|
|
||||||
|
|
||||||
def async_initialize(self) -> None:
|
|
||||||
"""Finish initializing a config object.
|
|
||||||
|
|
||||||
This must be called before the config object is used.
|
|
||||||
"""
|
|
||||||
self._store = self._ConfigStore(self.hass)
|
|
||||||
|
|
||||||
def distance(self, lat: float, lon: float) -> float | None:
|
|
||||||
"""Calculate distance from Home Assistant.
|
|
||||||
|
|
||||||
Async friendly.
|
|
||||||
"""
|
|
||||||
return self.units.length(
|
|
||||||
location.distance(self.latitude, self.longitude, lat, lon),
|
|
||||||
UnitOfLength.METERS,
|
|
||||||
)
|
|
||||||
|
|
||||||
def path(self, *path: str) -> str:
|
|
||||||
"""Generate path to the file within the configuration directory.
|
|
||||||
|
|
||||||
Async friendly.
|
|
||||||
"""
|
|
||||||
return os.path.join(self.config_dir, *path)
|
|
||||||
|
|
||||||
def is_allowed_external_url(self, url: str) -> bool:
|
|
||||||
"""Check if an external URL is allowed."""
|
|
||||||
parsed_url = f"{yarl.URL(url)!s}/"
|
|
||||||
|
|
||||||
return any(
|
|
||||||
allowed
|
|
||||||
for allowed in self.allowlist_external_urls
|
|
||||||
if parsed_url.startswith(allowed)
|
|
||||||
)
|
|
||||||
|
|
||||||
def is_allowed_path(self, path: str) -> bool:
|
|
||||||
"""Check if the path is valid for access from outside.
|
|
||||||
|
|
||||||
This function does blocking I/O and should not be called from the event loop.
|
|
||||||
Use hass.async_add_executor_job to schedule it on the executor.
|
|
||||||
"""
|
|
||||||
assert path is not None
|
|
||||||
|
|
||||||
thepath = pathlib.Path(path)
|
|
||||||
try:
|
|
||||||
# The file path does not have to exist (it's parent should)
|
|
||||||
if thepath.exists():
|
|
||||||
thepath = thepath.resolve()
|
|
||||||
else:
|
|
||||||
thepath = thepath.parent.resolve()
|
|
||||||
except (FileNotFoundError, RuntimeError, PermissionError):
|
|
||||||
return False
|
|
||||||
|
|
||||||
for allowed_path in self.allowlist_external_dirs:
|
|
||||||
try:
|
|
||||||
thepath.relative_to(allowed_path)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def as_dict(self) -> dict[str, Any]:
|
|
||||||
"""Create a dictionary representation of the configuration.
|
|
||||||
|
|
||||||
Async friendly.
|
|
||||||
"""
|
|
||||||
allowlist_external_dirs = list(self.allowlist_external_dirs)
|
|
||||||
return {
|
|
||||||
"latitude": self.latitude,
|
|
||||||
"longitude": self.longitude,
|
|
||||||
"elevation": self.elevation,
|
|
||||||
"unit_system": self.units.as_dict(),
|
|
||||||
"location_name": self.location_name,
|
|
||||||
"time_zone": self.time_zone,
|
|
||||||
"components": list(self.components),
|
|
||||||
"config_dir": self.config_dir,
|
|
||||||
# legacy, backwards compat
|
|
||||||
"whitelist_external_dirs": allowlist_external_dirs,
|
|
||||||
"allowlist_external_dirs": allowlist_external_dirs,
|
|
||||||
"allowlist_external_urls": list(self.allowlist_external_urls),
|
|
||||||
"version": __version__,
|
|
||||||
"config_source": self.config_source,
|
|
||||||
"recovery_mode": self.recovery_mode,
|
|
||||||
"state": self.hass.state.value,
|
|
||||||
"external_url": self.external_url,
|
|
||||||
"internal_url": self.internal_url,
|
|
||||||
"currency": self.currency,
|
|
||||||
"country": self.country,
|
|
||||||
"language": self.language,
|
|
||||||
"safe_mode": self.safe_mode,
|
|
||||||
"debug": self.debug,
|
|
||||||
"radius": self.radius,
|
|
||||||
}
|
|
||||||
|
|
||||||
async def async_set_time_zone(self, time_zone_str: str) -> None:
|
|
||||||
"""Help to set the time zone."""
|
|
||||||
if time_zone := await dt_util.async_get_time_zone(time_zone_str):
|
|
||||||
self.time_zone = time_zone_str
|
|
||||||
dt_util.set_default_time_zone(time_zone)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Received invalid time zone {time_zone_str}")
|
|
||||||
|
|
||||||
def set_time_zone(self, time_zone_str: str) -> None:
|
|
||||||
"""Set the time zone.
|
|
||||||
|
|
||||||
This is a legacy method that should not be used in new code.
|
|
||||||
Use async_set_time_zone instead.
|
|
||||||
|
|
||||||
It will be removed in Home Assistant 2025.6.
|
|
||||||
"""
|
|
||||||
# report is imported here to avoid a circular import
|
|
||||||
from .helpers.frame import report # pylint: disable=import-outside-toplevel
|
|
||||||
|
|
||||||
report(
|
|
||||||
"set the time zone using set_time_zone instead of async_set_time_zone"
|
|
||||||
" which will stop working in Home Assistant 2025.6",
|
|
||||||
error_if_core=True,
|
|
||||||
error_if_integration=True,
|
|
||||||
)
|
|
||||||
if time_zone := dt_util.get_time_zone(time_zone_str):
|
|
||||||
self.time_zone = time_zone_str
|
|
||||||
dt_util.set_default_time_zone(time_zone)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Received invalid time zone {time_zone_str}")
|
|
||||||
|
|
||||||
async def _async_update(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
source: ConfigSource,
|
|
||||||
latitude: float | None = None,
|
|
||||||
longitude: float | None = None,
|
|
||||||
elevation: int | None = None,
|
|
||||||
unit_system: str | None = None,
|
|
||||||
location_name: str | None = None,
|
|
||||||
time_zone: str | None = None,
|
|
||||||
external_url: str | UndefinedType | None = UNDEFINED,
|
|
||||||
internal_url: str | UndefinedType | None = UNDEFINED,
|
|
||||||
currency: str | None = None,
|
|
||||||
country: str | UndefinedType | None = UNDEFINED,
|
|
||||||
language: str | None = None,
|
|
||||||
radius: int | None = None,
|
|
||||||
) -> None:
|
|
||||||
"""Update the configuration from a dictionary."""
|
|
||||||
self.config_source = source
|
|
||||||
if latitude is not None:
|
|
||||||
self.latitude = latitude
|
|
||||||
if longitude is not None:
|
|
||||||
self.longitude = longitude
|
|
||||||
if elevation is not None:
|
|
||||||
self.elevation = elevation
|
|
||||||
if unit_system is not None:
|
|
||||||
try:
|
|
||||||
self.units = get_unit_system(unit_system)
|
|
||||||
except ValueError:
|
|
||||||
self.units = METRIC_SYSTEM
|
|
||||||
if location_name is not None:
|
|
||||||
self.location_name = location_name
|
|
||||||
if time_zone is not None:
|
|
||||||
await self.async_set_time_zone(time_zone)
|
|
||||||
if external_url is not UNDEFINED:
|
|
||||||
self.external_url = external_url
|
|
||||||
if internal_url is not UNDEFINED:
|
|
||||||
self.internal_url = internal_url
|
|
||||||
if currency is not None:
|
|
||||||
self.currency = currency
|
|
||||||
if country is not UNDEFINED:
|
|
||||||
self.country = country
|
|
||||||
if language is not None:
|
|
||||||
self.language = language
|
|
||||||
if radius is not None:
|
|
||||||
self.radius = radius
|
|
||||||
|
|
||||||
async def async_update(self, **kwargs: Any) -> None:
|
|
||||||
"""Update the configuration from a dictionary."""
|
|
||||||
# pylint: disable-next=import-outside-toplevel
|
|
||||||
from .core_config import (
|
|
||||||
_raise_issue_if_historic_currency,
|
|
||||||
_raise_issue_if_no_country,
|
|
||||||
)
|
|
||||||
|
|
||||||
await self._async_update(source=ConfigSource.STORAGE, **kwargs)
|
|
||||||
await self._async_store()
|
|
||||||
self.hass.bus.async_fire_internal(EVENT_CORE_CONFIG_UPDATE, kwargs)
|
|
||||||
|
|
||||||
_raise_issue_if_historic_currency(self.hass, self.currency)
|
|
||||||
_raise_issue_if_no_country(self.hass, self.country)
|
|
||||||
|
|
||||||
async def async_load(self) -> None:
|
|
||||||
"""Load [homeassistant] core config."""
|
|
||||||
if not (data := await self._store.async_load()):
|
|
||||||
return
|
|
||||||
|
|
||||||
# In 2021.9 we fixed validation to disallow a path (because that's never
|
|
||||||
# correct) but this data still lives in storage, so we print a warning.
|
|
||||||
if data.get("external_url") and urlparse(data["external_url"]).path not in (
|
|
||||||
"",
|
|
||||||
"/",
|
|
||||||
):
|
|
||||||
_LOGGER.warning("Invalid external_url set. It's not allowed to have a path")
|
|
||||||
|
|
||||||
if data.get("internal_url") and urlparse(data["internal_url"]).path not in (
|
|
||||||
"",
|
|
||||||
"/",
|
|
||||||
):
|
|
||||||
_LOGGER.warning("Invalid internal_url set. It's not allowed to have a path")
|
|
||||||
|
|
||||||
await self._async_update(
|
|
||||||
source=ConfigSource.STORAGE,
|
|
||||||
latitude=data.get("latitude"),
|
|
||||||
longitude=data.get("longitude"),
|
|
||||||
elevation=data.get("elevation"),
|
|
||||||
unit_system=data.get("unit_system_v2"),
|
|
||||||
location_name=data.get("location_name"),
|
|
||||||
time_zone=data.get("time_zone"),
|
|
||||||
external_url=data.get("external_url", UNDEFINED),
|
|
||||||
internal_url=data.get("internal_url", UNDEFINED),
|
|
||||||
currency=data.get("currency"),
|
|
||||||
country=data.get("country"),
|
|
||||||
language=data.get("language"),
|
|
||||||
radius=data["radius"],
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _async_store(self) -> None:
|
|
||||||
"""Store [homeassistant] core config."""
|
|
||||||
data = {
|
|
||||||
"latitude": self.latitude,
|
|
||||||
"longitude": self.longitude,
|
|
||||||
"elevation": self.elevation,
|
|
||||||
# We don't want any integrations to use the name of the unit system
|
|
||||||
# so we are using the private attribute here
|
|
||||||
"unit_system_v2": self.units._name, # noqa: SLF001
|
|
||||||
"location_name": self.location_name,
|
|
||||||
"time_zone": self.time_zone,
|
|
||||||
"external_url": self.external_url,
|
|
||||||
"internal_url": self.internal_url,
|
|
||||||
"currency": self.currency,
|
|
||||||
"country": self.country,
|
|
||||||
"language": self.language,
|
|
||||||
"radius": self.radius,
|
|
||||||
}
|
|
||||||
await self._store.async_save(data)
|
|
||||||
|
|
||||||
# Circular dependency prevents us from generating the class at top level
|
|
||||||
# pylint: disable-next=import-outside-toplevel
|
|
||||||
from .helpers.storage import Store
|
|
||||||
|
|
||||||
class _ConfigStore(Store[dict[str, Any]]):
|
|
||||||
"""Class to help storing Config data."""
|
|
||||||
|
|
||||||
def __init__(self, hass: HomeAssistant) -> None:
|
|
||||||
"""Initialize storage class."""
|
|
||||||
super().__init__(
|
|
||||||
hass,
|
|
||||||
CORE_STORAGE_VERSION,
|
|
||||||
CORE_STORAGE_KEY,
|
|
||||||
private=True,
|
|
||||||
atomic_writes=True,
|
|
||||||
minor_version=CORE_STORAGE_MINOR_VERSION,
|
|
||||||
)
|
|
||||||
self._original_unit_system: str | None = None # from old store 1.1
|
|
||||||
|
|
||||||
async def _async_migrate_func(
|
|
||||||
self,
|
|
||||||
old_major_version: int,
|
|
||||||
old_minor_version: int,
|
|
||||||
old_data: dict[str, Any],
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Migrate to the new version."""
|
|
||||||
|
|
||||||
# pylint: disable-next=import-outside-toplevel
|
|
||||||
from .components.zone import DEFAULT_RADIUS
|
|
||||||
|
|
||||||
data = old_data
|
|
||||||
if old_major_version == 1 and old_minor_version < 2:
|
|
||||||
# In 1.2, we remove support for "imperial", replaced by "us_customary"
|
|
||||||
# Using a new key to allow rollback
|
|
||||||
self._original_unit_system = data.get("unit_system")
|
|
||||||
data["unit_system_v2"] = self._original_unit_system
|
|
||||||
if data["unit_system_v2"] == _CONF_UNIT_SYSTEM_IMPERIAL:
|
|
||||||
data["unit_system_v2"] = _CONF_UNIT_SYSTEM_US_CUSTOMARY
|
|
||||||
if old_major_version == 1 and old_minor_version < 3:
|
|
||||||
# In 1.3, we add the key "language", initialize it from the
|
|
||||||
# owner account.
|
|
||||||
data["language"] = "en"
|
|
||||||
try:
|
|
||||||
owner = await self.hass.auth.async_get_owner()
|
|
||||||
if owner is not None:
|
|
||||||
# pylint: disable-next=import-outside-toplevel
|
|
||||||
from .components.frontend import storage as frontend_store
|
|
||||||
|
|
||||||
# pylint: disable-next=import-outside-toplevel
|
|
||||||
from .helpers import config_validation as cv
|
|
||||||
|
|
||||||
_, owner_data = await frontend_store.async_user_store(
|
|
||||||
self.hass, owner.id
|
|
||||||
)
|
|
||||||
|
|
||||||
if (
|
|
||||||
"language" in owner_data
|
|
||||||
and "language" in owner_data["language"]
|
|
||||||
):
|
|
||||||
with suppress(vol.InInvalid):
|
|
||||||
data["language"] = cv.language(
|
|
||||||
owner_data["language"]["language"]
|
|
||||||
)
|
|
||||||
# pylint: disable-next=broad-except
|
|
||||||
except Exception:
|
|
||||||
_LOGGER.exception("Unexpected error during core config migration")
|
|
||||||
if old_major_version == 1 and old_minor_version < 4:
|
|
||||||
# In 1.4, we add the key "radius", initialize it with the default.
|
|
||||||
data.setdefault("radius", DEFAULT_RADIUS)
|
|
||||||
|
|
||||||
if old_major_version > 1:
|
|
||||||
raise NotImplementedError
|
|
||||||
return data
|
|
||||||
|
|
||||||
async def async_save(self, data: dict[str, Any]) -> None:
|
|
||||||
if self._original_unit_system:
|
|
||||||
data["unit_system"] = self._original_unit_system
|
|
||||||
return await super().async_save(data)
|
|
||||||
|
|
||||||
|
|
||||||
# These can be removed if no deprecated constant are in this module anymore
|
# These can be removed if no deprecated constant are in this module anymore
|
||||||
__getattr__ = functools.partial(check_if_deprecated_constant, module_globals=globals())
|
__getattr__ = functools.partial(check_if_deprecated_constant, module_globals=globals())
|
||||||
__dir__ = functools.partial(
|
__dir__ = functools.partial(
|
||||||
|
|
|
@ -5,12 +5,16 @@ from __future__ import annotations
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
from contextlib import suppress
|
from contextlib import suppress
|
||||||
|
import enum
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, Final
|
import os
|
||||||
|
import pathlib
|
||||||
|
from typing import TYPE_CHECKING, Any, Final
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
from webrtc_models import RTCIceServer
|
from webrtc_models import RTCConfiguration, RTCIceServer
|
||||||
|
import yarl
|
||||||
|
|
||||||
from . import auth
|
from . import auth
|
||||||
from .auth import mfa_modules as auth_mfa_modules, providers as auth_providers
|
from .auth import mfa_modules as auth_mfa_modules, providers as auth_providers
|
||||||
|
@ -18,6 +22,7 @@ from .const import (
|
||||||
ATTR_ASSUMED_STATE,
|
ATTR_ASSUMED_STATE,
|
||||||
ATTR_FRIENDLY_NAME,
|
ATTR_FRIENDLY_NAME,
|
||||||
ATTR_HIDDEN,
|
ATTR_HIDDEN,
|
||||||
|
BASE_PLATFORMS,
|
||||||
CONF_ALLOWLIST_EXTERNAL_DIRS,
|
CONF_ALLOWLIST_EXTERNAL_DIRS,
|
||||||
CONF_ALLOWLIST_EXTERNAL_URLS,
|
CONF_ALLOWLIST_EXTERNAL_URLS,
|
||||||
CONF_AUTH_MFA_MODULES,
|
CONF_AUTH_MFA_MODULES,
|
||||||
|
@ -46,15 +51,33 @@ from .const import (
|
||||||
CONF_UNIT_SYSTEM,
|
CONF_UNIT_SYSTEM,
|
||||||
CONF_URL,
|
CONF_URL,
|
||||||
CONF_USERNAME,
|
CONF_USERNAME,
|
||||||
|
EVENT_CORE_CONFIG_UPDATE,
|
||||||
LEGACY_CONF_WHITELIST_EXTERNAL_DIRS,
|
LEGACY_CONF_WHITELIST_EXTERNAL_DIRS,
|
||||||
|
UnitOfLength,
|
||||||
|
__version__,
|
||||||
)
|
)
|
||||||
from .core import DOMAIN as HOMEASSISTANT_DOMAIN, ConfigSource, HomeAssistant
|
from .core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
|
||||||
from .generated.currencies import HISTORIC_CURRENCIES
|
from .generated.currencies import HISTORIC_CURRENCIES
|
||||||
from .helpers import config_validation as cv, issue_registry as ir
|
from .helpers import config_validation as cv, issue_registry as ir
|
||||||
from .helpers.entity_values import EntityValues
|
from .helpers.entity_values import EntityValues
|
||||||
|
from .helpers.frame import report
|
||||||
|
from .helpers.storage import Store
|
||||||
|
from .helpers.typing import UNDEFINED, UndefinedType
|
||||||
|
from .util import dt as dt_util, location
|
||||||
from .util.hass_dict import HassKey
|
from .util.hass_dict import HassKey
|
||||||
from .util.package import is_docker_env
|
from .util.package import is_docker_env
|
||||||
from .util.unit_system import get_unit_system, validate_unit_system
|
from .util.unit_system import (
|
||||||
|
_CONF_UNIT_SYSTEM_IMPERIAL,
|
||||||
|
_CONF_UNIT_SYSTEM_US_CUSTOMARY,
|
||||||
|
METRIC_SYSTEM,
|
||||||
|
UnitSystem,
|
||||||
|
get_unit_system,
|
||||||
|
validate_unit_system,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Typing imports that create a circular dependency
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .components.http import ApiConfig
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -64,6 +87,19 @@ CONF_CREDENTIAL: Final = "credential"
|
||||||
CONF_ICE_SERVERS: Final = "ice_servers"
|
CONF_ICE_SERVERS: Final = "ice_servers"
|
||||||
CONF_WEBRTC: Final = "webrtc"
|
CONF_WEBRTC: Final = "webrtc"
|
||||||
|
|
||||||
|
CORE_STORAGE_KEY = "core.config"
|
||||||
|
CORE_STORAGE_VERSION = 1
|
||||||
|
CORE_STORAGE_MINOR_VERSION = 4
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigSource(enum.StrEnum):
|
||||||
|
"""Source of core configuration."""
|
||||||
|
|
||||||
|
DEFAULT = "default"
|
||||||
|
DISCOVERED = "discovered"
|
||||||
|
STORAGE = "storage"
|
||||||
|
YAML = "yaml"
|
||||||
|
|
||||||
|
|
||||||
def _no_duplicate_auth_provider(
|
def _no_duplicate_auth_provider(
|
||||||
configs: Sequence[dict[str, Any]],
|
configs: Sequence[dict[str, Any]],
|
||||||
|
@ -421,3 +457,435 @@ async def async_process_ha_core_config(hass: HomeAssistant, config: dict) -> Non
|
||||||
|
|
||||||
if CONF_UNIT_SYSTEM in config:
|
if CONF_UNIT_SYSTEM in config:
|
||||||
hac.units = get_unit_system(config[CONF_UNIT_SYSTEM])
|
hac.units = get_unit_system(config[CONF_UNIT_SYSTEM])
|
||||||
|
|
||||||
|
|
||||||
|
class _ComponentSet(set[str]):
|
||||||
|
"""Set of loaded components.
|
||||||
|
|
||||||
|
This set contains both top level components and platforms.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
`light`, `switch`, `hue`, `mjpeg.camera`, `universal.media_player`,
|
||||||
|
`homeassistant.scene`
|
||||||
|
|
||||||
|
The top level components set only contains the top level components.
|
||||||
|
|
||||||
|
The all components set contains all components, including platform
|
||||||
|
based components.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, top_level_components: set[str], all_components: set[str]
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the component set."""
|
||||||
|
self._top_level_components = top_level_components
|
||||||
|
self._all_components = all_components
|
||||||
|
|
||||||
|
def add(self, component: str) -> None:
|
||||||
|
"""Add a component to the store."""
|
||||||
|
if "." not in component:
|
||||||
|
self._top_level_components.add(component)
|
||||||
|
self._all_components.add(component)
|
||||||
|
else:
|
||||||
|
platform, _, domain = component.partition(".")
|
||||||
|
if domain in BASE_PLATFORMS:
|
||||||
|
self._all_components.add(platform)
|
||||||
|
return super().add(component)
|
||||||
|
|
||||||
|
def remove(self, component: str) -> None:
|
||||||
|
"""Remove a component from the store."""
|
||||||
|
if "." in component:
|
||||||
|
raise ValueError("_ComponentSet does not support removing sub-components")
|
||||||
|
self._top_level_components.remove(component)
|
||||||
|
return super().remove(component)
|
||||||
|
|
||||||
|
def discard(self, component: str) -> None:
|
||||||
|
"""Remove a component from the store."""
|
||||||
|
raise NotImplementedError("_ComponentSet does not support discard, use remove")
|
||||||
|
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
"""Configuration settings for Home Assistant."""
|
||||||
|
|
||||||
|
_store: Config._ConfigStore
|
||||||
|
|
||||||
|
def __init__(self, hass: HomeAssistant, config_dir: str) -> None:
|
||||||
|
"""Initialize a new config object."""
|
||||||
|
# pylint: disable-next=import-outside-toplevel
|
||||||
|
from .components.zone import DEFAULT_RADIUS
|
||||||
|
|
||||||
|
self.hass = hass
|
||||||
|
|
||||||
|
self.latitude: float = 0
|
||||||
|
self.longitude: float = 0
|
||||||
|
|
||||||
|
self.elevation: int = 0
|
||||||
|
"""Elevation (always in meters regardless of the unit system)."""
|
||||||
|
|
||||||
|
self.radius: int = DEFAULT_RADIUS
|
||||||
|
"""Radius of the Home Zone (always in meters regardless of the unit system)."""
|
||||||
|
|
||||||
|
self.debug: bool = False
|
||||||
|
self.location_name: str = "Home"
|
||||||
|
self.time_zone: str = "UTC"
|
||||||
|
self.units: UnitSystem = METRIC_SYSTEM
|
||||||
|
self.internal_url: str | None = None
|
||||||
|
self.external_url: str | None = None
|
||||||
|
self.currency: str = "EUR"
|
||||||
|
self.country: str | None = None
|
||||||
|
self.language: str = "en"
|
||||||
|
|
||||||
|
self.config_source: ConfigSource = ConfigSource.DEFAULT
|
||||||
|
|
||||||
|
# If True, pip install is skipped for requirements on startup
|
||||||
|
self.skip_pip: bool = False
|
||||||
|
|
||||||
|
# List of packages to skip when installing requirements on startup
|
||||||
|
self.skip_pip_packages: list[str] = []
|
||||||
|
|
||||||
|
# Set of loaded top level components
|
||||||
|
# This set is updated by _ComponentSet
|
||||||
|
# and should not be modified directly
|
||||||
|
self.top_level_components: set[str] = set()
|
||||||
|
|
||||||
|
# Set of all loaded components including platform
|
||||||
|
# based components
|
||||||
|
self.all_components: set[str] = set()
|
||||||
|
|
||||||
|
# Set of loaded components
|
||||||
|
self.components: _ComponentSet = _ComponentSet(
|
||||||
|
self.top_level_components, self.all_components
|
||||||
|
)
|
||||||
|
|
||||||
|
# API (HTTP) server configuration
|
||||||
|
self.api: ApiConfig | None = None
|
||||||
|
|
||||||
|
# Directory that holds the configuration
|
||||||
|
self.config_dir: str = config_dir
|
||||||
|
|
||||||
|
# List of allowed external dirs to access
|
||||||
|
self.allowlist_external_dirs: set[str] = set()
|
||||||
|
|
||||||
|
# List of allowed external URLs that integrations may use
|
||||||
|
self.allowlist_external_urls: set[str] = set()
|
||||||
|
|
||||||
|
# Dictionary of Media folders that integrations may use
|
||||||
|
self.media_dirs: dict[str, str] = {}
|
||||||
|
|
||||||
|
# If Home Assistant is running in recovery mode
|
||||||
|
self.recovery_mode: bool = False
|
||||||
|
|
||||||
|
# Use legacy template behavior
|
||||||
|
self.legacy_templates: bool = False
|
||||||
|
|
||||||
|
# If Home Assistant is running in safe mode
|
||||||
|
self.safe_mode: bool = False
|
||||||
|
|
||||||
|
self.webrtc = RTCConfiguration()
|
||||||
|
|
||||||
|
def async_initialize(self) -> None:
|
||||||
|
"""Finish initializing a config object.
|
||||||
|
|
||||||
|
This must be called before the config object is used.
|
||||||
|
"""
|
||||||
|
self._store = self._ConfigStore(self.hass)
|
||||||
|
|
||||||
|
def distance(self, lat: float, lon: float) -> float | None:
|
||||||
|
"""Calculate distance from Home Assistant.
|
||||||
|
|
||||||
|
Async friendly.
|
||||||
|
"""
|
||||||
|
return self.units.length(
|
||||||
|
location.distance(self.latitude, self.longitude, lat, lon),
|
||||||
|
UnitOfLength.METERS,
|
||||||
|
)
|
||||||
|
|
||||||
|
def path(self, *path: str) -> str:
|
||||||
|
"""Generate path to the file within the configuration directory.
|
||||||
|
|
||||||
|
Async friendly.
|
||||||
|
"""
|
||||||
|
return os.path.join(self.config_dir, *path)
|
||||||
|
|
||||||
|
def is_allowed_external_url(self, url: str) -> bool:
|
||||||
|
"""Check if an external URL is allowed."""
|
||||||
|
parsed_url = f"{yarl.URL(url)!s}/"
|
||||||
|
|
||||||
|
return any(
|
||||||
|
allowed
|
||||||
|
for allowed in self.allowlist_external_urls
|
||||||
|
if parsed_url.startswith(allowed)
|
||||||
|
)
|
||||||
|
|
||||||
|
def is_allowed_path(self, path: str) -> bool:
|
||||||
|
"""Check if the path is valid for access from outside.
|
||||||
|
|
||||||
|
This function does blocking I/O and should not be called from the event loop.
|
||||||
|
Use hass.async_add_executor_job to schedule it on the executor.
|
||||||
|
"""
|
||||||
|
assert path is not None
|
||||||
|
|
||||||
|
thepath = pathlib.Path(path)
|
||||||
|
try:
|
||||||
|
# The file path does not have to exist (it's parent should)
|
||||||
|
if thepath.exists():
|
||||||
|
thepath = thepath.resolve()
|
||||||
|
else:
|
||||||
|
thepath = thepath.parent.resolve()
|
||||||
|
except (FileNotFoundError, RuntimeError, PermissionError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
for allowed_path in self.allowlist_external_dirs:
|
||||||
|
try:
|
||||||
|
thepath.relative_to(allowed_path)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def as_dict(self) -> dict[str, Any]:
|
||||||
|
"""Create a dictionary representation of the configuration.
|
||||||
|
|
||||||
|
Async friendly.
|
||||||
|
"""
|
||||||
|
allowlist_external_dirs = list(self.allowlist_external_dirs)
|
||||||
|
return {
|
||||||
|
"latitude": self.latitude,
|
||||||
|
"longitude": self.longitude,
|
||||||
|
"elevation": self.elevation,
|
||||||
|
"unit_system": self.units.as_dict(),
|
||||||
|
"location_name": self.location_name,
|
||||||
|
"time_zone": self.time_zone,
|
||||||
|
"components": list(self.components),
|
||||||
|
"config_dir": self.config_dir,
|
||||||
|
# legacy, backwards compat
|
||||||
|
"whitelist_external_dirs": allowlist_external_dirs,
|
||||||
|
"allowlist_external_dirs": allowlist_external_dirs,
|
||||||
|
"allowlist_external_urls": list(self.allowlist_external_urls),
|
||||||
|
"version": __version__,
|
||||||
|
"config_source": self.config_source,
|
||||||
|
"recovery_mode": self.recovery_mode,
|
||||||
|
"state": self.hass.state.value,
|
||||||
|
"external_url": self.external_url,
|
||||||
|
"internal_url": self.internal_url,
|
||||||
|
"currency": self.currency,
|
||||||
|
"country": self.country,
|
||||||
|
"language": self.language,
|
||||||
|
"safe_mode": self.safe_mode,
|
||||||
|
"debug": self.debug,
|
||||||
|
"radius": self.radius,
|
||||||
|
}
|
||||||
|
|
||||||
|
async def async_set_time_zone(self, time_zone_str: str) -> None:
|
||||||
|
"""Help to set the time zone."""
|
||||||
|
if time_zone := await dt_util.async_get_time_zone(time_zone_str):
|
||||||
|
self.time_zone = time_zone_str
|
||||||
|
dt_util.set_default_time_zone(time_zone)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Received invalid time zone {time_zone_str}")
|
||||||
|
|
||||||
|
def set_time_zone(self, time_zone_str: str) -> None:
|
||||||
|
"""Set the time zone.
|
||||||
|
|
||||||
|
This is a legacy method that should not be used in new code.
|
||||||
|
Use async_set_time_zone instead.
|
||||||
|
|
||||||
|
It will be removed in Home Assistant 2025.6.
|
||||||
|
"""
|
||||||
|
report(
|
||||||
|
"set the time zone using set_time_zone instead of async_set_time_zone"
|
||||||
|
" which will stop working in Home Assistant 2025.6",
|
||||||
|
error_if_core=True,
|
||||||
|
error_if_integration=True,
|
||||||
|
)
|
||||||
|
if time_zone := dt_util.get_time_zone(time_zone_str):
|
||||||
|
self.time_zone = time_zone_str
|
||||||
|
dt_util.set_default_time_zone(time_zone)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Received invalid time zone {time_zone_str}")
|
||||||
|
|
||||||
|
async def _async_update(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
source: ConfigSource,
|
||||||
|
latitude: float | None = None,
|
||||||
|
longitude: float | None = None,
|
||||||
|
elevation: int | None = None,
|
||||||
|
unit_system: str | None = None,
|
||||||
|
location_name: str | None = None,
|
||||||
|
time_zone: str | None = None,
|
||||||
|
external_url: str | UndefinedType | None = UNDEFINED,
|
||||||
|
internal_url: str | UndefinedType | None = UNDEFINED,
|
||||||
|
currency: str | None = None,
|
||||||
|
country: str | UndefinedType | None = UNDEFINED,
|
||||||
|
language: str | None = None,
|
||||||
|
radius: int | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Update the configuration from a dictionary."""
|
||||||
|
self.config_source = source
|
||||||
|
if latitude is not None:
|
||||||
|
self.latitude = latitude
|
||||||
|
if longitude is not None:
|
||||||
|
self.longitude = longitude
|
||||||
|
if elevation is not None:
|
||||||
|
self.elevation = elevation
|
||||||
|
if unit_system is not None:
|
||||||
|
try:
|
||||||
|
self.units = get_unit_system(unit_system)
|
||||||
|
except ValueError:
|
||||||
|
self.units = METRIC_SYSTEM
|
||||||
|
if location_name is not None:
|
||||||
|
self.location_name = location_name
|
||||||
|
if time_zone is not None:
|
||||||
|
await self.async_set_time_zone(time_zone)
|
||||||
|
if external_url is not UNDEFINED:
|
||||||
|
self.external_url = external_url
|
||||||
|
if internal_url is not UNDEFINED:
|
||||||
|
self.internal_url = internal_url
|
||||||
|
if currency is not None:
|
||||||
|
self.currency = currency
|
||||||
|
if country is not UNDEFINED:
|
||||||
|
self.country = country
|
||||||
|
if language is not None:
|
||||||
|
self.language = language
|
||||||
|
if radius is not None:
|
||||||
|
self.radius = radius
|
||||||
|
|
||||||
|
async def async_update(self, **kwargs: Any) -> None:
|
||||||
|
"""Update the configuration from a dictionary."""
|
||||||
|
await self._async_update(source=ConfigSource.STORAGE, **kwargs)
|
||||||
|
await self._async_store()
|
||||||
|
self.hass.bus.async_fire_internal(EVENT_CORE_CONFIG_UPDATE, kwargs)
|
||||||
|
|
||||||
|
_raise_issue_if_historic_currency(self.hass, self.currency)
|
||||||
|
_raise_issue_if_no_country(self.hass, self.country)
|
||||||
|
|
||||||
|
async def async_load(self) -> None:
|
||||||
|
"""Load [homeassistant] core config."""
|
||||||
|
if not (data := await self._store.async_load()):
|
||||||
|
return
|
||||||
|
|
||||||
|
# In 2021.9 we fixed validation to disallow a path (because that's never
|
||||||
|
# correct) but this data still lives in storage, so we print a warning.
|
||||||
|
if data.get("external_url") and urlparse(data["external_url"]).path not in (
|
||||||
|
"",
|
||||||
|
"/",
|
||||||
|
):
|
||||||
|
_LOGGER.warning("Invalid external_url set. It's not allowed to have a path")
|
||||||
|
|
||||||
|
if data.get("internal_url") and urlparse(data["internal_url"]).path not in (
|
||||||
|
"",
|
||||||
|
"/",
|
||||||
|
):
|
||||||
|
_LOGGER.warning("Invalid internal_url set. It's not allowed to have a path")
|
||||||
|
|
||||||
|
await self._async_update(
|
||||||
|
source=ConfigSource.STORAGE,
|
||||||
|
latitude=data.get("latitude"),
|
||||||
|
longitude=data.get("longitude"),
|
||||||
|
elevation=data.get("elevation"),
|
||||||
|
unit_system=data.get("unit_system_v2"),
|
||||||
|
location_name=data.get("location_name"),
|
||||||
|
time_zone=data.get("time_zone"),
|
||||||
|
external_url=data.get("external_url", UNDEFINED),
|
||||||
|
internal_url=data.get("internal_url", UNDEFINED),
|
||||||
|
currency=data.get("currency"),
|
||||||
|
country=data.get("country"),
|
||||||
|
language=data.get("language"),
|
||||||
|
radius=data["radius"],
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _async_store(self) -> None:
|
||||||
|
"""Store [homeassistant] core config."""
|
||||||
|
data = {
|
||||||
|
"latitude": self.latitude,
|
||||||
|
"longitude": self.longitude,
|
||||||
|
"elevation": self.elevation,
|
||||||
|
# We don't want any integrations to use the name of the unit system
|
||||||
|
# so we are using the private attribute here
|
||||||
|
"unit_system_v2": self.units._name, # noqa: SLF001
|
||||||
|
"location_name": self.location_name,
|
||||||
|
"time_zone": self.time_zone,
|
||||||
|
"external_url": self.external_url,
|
||||||
|
"internal_url": self.internal_url,
|
||||||
|
"currency": self.currency,
|
||||||
|
"country": self.country,
|
||||||
|
"language": self.language,
|
||||||
|
"radius": self.radius,
|
||||||
|
}
|
||||||
|
await self._store.async_save(data)
|
||||||
|
|
||||||
|
class _ConfigStore(Store[dict[str, Any]]):
|
||||||
|
"""Class to help storing Config data."""
|
||||||
|
|
||||||
|
def __init__(self, hass: HomeAssistant) -> None:
|
||||||
|
"""Initialize storage class."""
|
||||||
|
super().__init__(
|
||||||
|
hass,
|
||||||
|
CORE_STORAGE_VERSION,
|
||||||
|
CORE_STORAGE_KEY,
|
||||||
|
private=True,
|
||||||
|
atomic_writes=True,
|
||||||
|
minor_version=CORE_STORAGE_MINOR_VERSION,
|
||||||
|
)
|
||||||
|
self._original_unit_system: str | None = None # from old store 1.1
|
||||||
|
|
||||||
|
async def _async_migrate_func(
|
||||||
|
self,
|
||||||
|
old_major_version: int,
|
||||||
|
old_minor_version: int,
|
||||||
|
old_data: dict[str, Any],
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Migrate to the new version."""
|
||||||
|
|
||||||
|
# pylint: disable-next=import-outside-toplevel
|
||||||
|
from .components.zone import DEFAULT_RADIUS
|
||||||
|
|
||||||
|
data = old_data
|
||||||
|
if old_major_version == 1 and old_minor_version < 2:
|
||||||
|
# In 1.2, we remove support for "imperial", replaced by "us_customary"
|
||||||
|
# Using a new key to allow rollback
|
||||||
|
self._original_unit_system = data.get("unit_system")
|
||||||
|
data["unit_system_v2"] = self._original_unit_system
|
||||||
|
if data["unit_system_v2"] == _CONF_UNIT_SYSTEM_IMPERIAL:
|
||||||
|
data["unit_system_v2"] = _CONF_UNIT_SYSTEM_US_CUSTOMARY
|
||||||
|
if old_major_version == 1 and old_minor_version < 3:
|
||||||
|
# In 1.3, we add the key "language", initialize it from the
|
||||||
|
# owner account.
|
||||||
|
data["language"] = "en"
|
||||||
|
try:
|
||||||
|
owner = await self.hass.auth.async_get_owner()
|
||||||
|
if owner is not None:
|
||||||
|
# pylint: disable-next=import-outside-toplevel
|
||||||
|
from .components.frontend import storage as frontend_store
|
||||||
|
|
||||||
|
_, owner_data = await frontend_store.async_user_store(
|
||||||
|
self.hass, owner.id
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
"language" in owner_data
|
||||||
|
and "language" in owner_data["language"]
|
||||||
|
):
|
||||||
|
with suppress(vol.InInvalid):
|
||||||
|
data["language"] = cv.language(
|
||||||
|
owner_data["language"]["language"]
|
||||||
|
)
|
||||||
|
# pylint: disable-next=broad-except
|
||||||
|
except Exception:
|
||||||
|
_LOGGER.exception("Unexpected error during core config migration")
|
||||||
|
if old_major_version == 1 and old_minor_version < 4:
|
||||||
|
# In 1.4, we add the key "radius", initialize it with the default.
|
||||||
|
data.setdefault("radius", DEFAULT_RADIUS)
|
||||||
|
|
||||||
|
if old_major_version > 1:
|
||||||
|
raise NotImplementedError
|
||||||
|
return data
|
||||||
|
|
||||||
|
async def async_save(self, data: dict[str, Any]) -> None:
|
||||||
|
if self._original_unit_system:
|
||||||
|
data["unit_system"] = self._original_unit_system
|
||||||
|
return await super().async_save(data)
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
|
from contextlib import suppress
|
||||||
from enum import Enum, EnumType, _EnumDict
|
from enum import Enum, EnumType, _EnumDict
|
||||||
import functools
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
|
@ -164,6 +165,30 @@ def _print_deprecation_warning_internal(
|
||||||
breaks_in_ha_version: str | None,
|
breaks_in_ha_version: str | None,
|
||||||
*,
|
*,
|
||||||
log_when_no_integration_is_found: bool,
|
log_when_no_integration_is_found: bool,
|
||||||
|
) -> None:
|
||||||
|
# Suppress ImportError due to use of deprecated enum in core.py
|
||||||
|
# Can be removed in HA Core 2025.1
|
||||||
|
with suppress(ImportError):
|
||||||
|
_print_deprecation_warning_internal_impl(
|
||||||
|
obj_name,
|
||||||
|
module_name,
|
||||||
|
replacement,
|
||||||
|
description,
|
||||||
|
verb,
|
||||||
|
breaks_in_ha_version,
|
||||||
|
log_when_no_integration_is_found=log_when_no_integration_is_found,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _print_deprecation_warning_internal_impl(
|
||||||
|
obj_name: str,
|
||||||
|
module_name: str,
|
||||||
|
replacement: str,
|
||||||
|
description: str,
|
||||||
|
verb: str,
|
||||||
|
breaks_in_ha_version: str | None,
|
||||||
|
*,
|
||||||
|
log_when_no_integration_is_found: bool,
|
||||||
) -> None:
|
) -> None:
|
||||||
# pylint: disable=import-outside-toplevel
|
# pylint: disable=import-outside-toplevel
|
||||||
from homeassistant.core import async_get_hass_or_none
|
from homeassistant.core import async_get_hass_or_none
|
||||||
|
@ -363,7 +388,7 @@ class EnumWithDeprecatedMembers(EnumType):
|
||||||
_print_deprecation_warning_internal(
|
_print_deprecation_warning_internal(
|
||||||
f"{cls.__name__}.{name}",
|
f"{cls.__name__}.{name}",
|
||||||
cls.__module__,
|
cls.__module__,
|
||||||
f"{cls.__name__}.{deprecated[name][0]}",
|
f"{deprecated[name][0]}",
|
||||||
"enum member",
|
"enum member",
|
||||||
"used",
|
"used",
|
||||||
deprecated[name][1],
|
deprecated[name][1],
|
||||||
|
|
|
@ -267,7 +267,9 @@ def mock_load_json():
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_allowed_path():
|
def mock_allowed_path():
|
||||||
"""Allow using NamedTemporaryFile for mock image."""
|
"""Allow using NamedTemporaryFile for mock image."""
|
||||||
with patch("homeassistant.core.Config.is_allowed_path", return_value=True) as mock:
|
with patch(
|
||||||
|
"homeassistant.core_config.Config.is_allowed_path", return_value=True
|
||||||
|
) as mock:
|
||||||
yield mock
|
yield mock
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -545,8 +545,8 @@ def test_enum_with_deprecated_members(
|
||||||
StrEnum,
|
StrEnum,
|
||||||
metaclass=EnumWithDeprecatedMembers,
|
metaclass=EnumWithDeprecatedMembers,
|
||||||
deprecated={
|
deprecated={
|
||||||
"CATS": ("CATS_PER_CM", "2025.11.0"),
|
"CATS": ("TestEnum.CATS_PER_CM", "2025.11.0"),
|
||||||
"DOGS": ("DOGS_PER_CM", None),
|
"DOGS": ("TestEnum.DOGS_PER_CM", None),
|
||||||
},
|
},
|
||||||
):
|
):
|
||||||
"""Zoo units."""
|
"""Zoo units."""
|
||||||
|
@ -618,8 +618,8 @@ def test_enum_with_deprecated_members_integration_not_found(
|
||||||
StrEnum,
|
StrEnum,
|
||||||
metaclass=EnumWithDeprecatedMembers,
|
metaclass=EnumWithDeprecatedMembers,
|
||||||
deprecated={
|
deprecated={
|
||||||
"CATS": ("CATS_PER_CM", "2025.11.0"),
|
"CATS": ("TestEnum.CATS_PER_CM", "2025.11.0"),
|
||||||
"DOGS": ("DOGS_PER_CM", None),
|
"DOGS": ("TestEnum.DOGS_PER_CM", None),
|
||||||
},
|
},
|
||||||
):
|
):
|
||||||
"""Zoo units."""
|
"""Zoo units."""
|
||||||
|
|
|
@ -9,13 +9,11 @@ import functools
|
||||||
import gc
|
import gc
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
|
||||||
import re
|
import re
|
||||||
from tempfile import TemporaryDirectory
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from unittest.mock import MagicMock, Mock, PropertyMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -24,7 +22,6 @@ import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
ATTR_FRIENDLY_NAME,
|
ATTR_FRIENDLY_NAME,
|
||||||
CONF_UNIT_SYSTEM,
|
|
||||||
EVENT_CALL_SERVICE,
|
EVENT_CALL_SERVICE,
|
||||||
EVENT_CORE_CONFIG_UPDATE,
|
EVENT_CORE_CONFIG_UPDATE,
|
||||||
EVENT_HOMEASSISTANT_CLOSE,
|
EVENT_HOMEASSISTANT_CLOSE,
|
||||||
|
@ -37,7 +34,6 @@ from homeassistant.const import (
|
||||||
EVENT_STATE_CHANGED,
|
EVENT_STATE_CHANGED,
|
||||||
EVENT_STATE_REPORTED,
|
EVENT_STATE_REPORTED,
|
||||||
MATCH_ALL,
|
MATCH_ALL,
|
||||||
__version__,
|
|
||||||
)
|
)
|
||||||
import homeassistant.core as ha
|
import homeassistant.core as ha
|
||||||
from homeassistant.core import (
|
from homeassistant.core import (
|
||||||
|
@ -65,7 +61,6 @@ from homeassistant.setup import async_setup_component
|
||||||
from homeassistant.util.async_ import create_eager_task
|
from homeassistant.util.async_ import create_eager_task
|
||||||
import homeassistant.util.dt as dt_util
|
import homeassistant.util.dt as dt_util
|
||||||
from homeassistant.util.read_only_dict import ReadOnlyDict
|
from homeassistant.util.read_only_dict import ReadOnlyDict
|
||||||
from homeassistant.util.unit_system import METRIC_SYSTEM
|
|
||||||
|
|
||||||
from .common import (
|
from .common import (
|
||||||
async_capture_events,
|
async_capture_events,
|
||||||
|
@ -1918,173 +1913,6 @@ async def test_serviceregistry_return_response_optional(
|
||||||
assert response_data == expected_response_data
|
assert response_data == expected_response_data
|
||||||
|
|
||||||
|
|
||||||
async def test_config_defaults() -> None:
|
|
||||||
"""Test config defaults."""
|
|
||||||
hass = Mock()
|
|
||||||
hass.data = {}
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
assert config.hass is hass
|
|
||||||
assert config.latitude == 0
|
|
||||||
assert config.longitude == 0
|
|
||||||
assert config.elevation == 0
|
|
||||||
assert config.location_name == "Home"
|
|
||||||
assert config.time_zone == "UTC"
|
|
||||||
assert config.internal_url is None
|
|
||||||
assert config.external_url is None
|
|
||||||
assert config.config_source is ha.ConfigSource.DEFAULT
|
|
||||||
assert config.skip_pip is False
|
|
||||||
assert config.skip_pip_packages == []
|
|
||||||
assert config.components == set()
|
|
||||||
assert config.api is None
|
|
||||||
assert config.config_dir == "/test/ha-config"
|
|
||||||
assert config.allowlist_external_dirs == set()
|
|
||||||
assert config.allowlist_external_urls == set()
|
|
||||||
assert config.media_dirs == {}
|
|
||||||
assert config.recovery_mode is False
|
|
||||||
assert config.legacy_templates is False
|
|
||||||
assert config.currency == "EUR"
|
|
||||||
assert config.country is None
|
|
||||||
assert config.language == "en"
|
|
||||||
assert config.radius == 100
|
|
||||||
|
|
||||||
|
|
||||||
async def test_config_path_with_file() -> None:
|
|
||||||
"""Test get_config_path method."""
|
|
||||||
hass = Mock()
|
|
||||||
hass.data = {}
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
assert config.path("test.conf") == "/test/ha-config/test.conf"
|
|
||||||
|
|
||||||
|
|
||||||
async def test_config_path_with_dir_and_file() -> None:
|
|
||||||
"""Test get_config_path method."""
|
|
||||||
hass = Mock()
|
|
||||||
hass.data = {}
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
assert config.path("dir", "test.conf") == "/test/ha-config/dir/test.conf"
|
|
||||||
|
|
||||||
|
|
||||||
async def test_config_as_dict() -> None:
|
|
||||||
"""Test as dict."""
|
|
||||||
hass = Mock()
|
|
||||||
hass.data = {}
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
type(config.hass.state).value = PropertyMock(return_value="RUNNING")
|
|
||||||
expected = {
|
|
||||||
"latitude": 0,
|
|
||||||
"longitude": 0,
|
|
||||||
"elevation": 0,
|
|
||||||
CONF_UNIT_SYSTEM: METRIC_SYSTEM.as_dict(),
|
|
||||||
"location_name": "Home",
|
|
||||||
"time_zone": "UTC",
|
|
||||||
"components": [],
|
|
||||||
"config_dir": "/test/ha-config",
|
|
||||||
"whitelist_external_dirs": [],
|
|
||||||
"allowlist_external_dirs": [],
|
|
||||||
"allowlist_external_urls": [],
|
|
||||||
"version": __version__,
|
|
||||||
"config_source": ha.ConfigSource.DEFAULT,
|
|
||||||
"recovery_mode": False,
|
|
||||||
"state": "RUNNING",
|
|
||||||
"external_url": None,
|
|
||||||
"internal_url": None,
|
|
||||||
"currency": "EUR",
|
|
||||||
"country": None,
|
|
||||||
"language": "en",
|
|
||||||
"safe_mode": False,
|
|
||||||
"debug": False,
|
|
||||||
"radius": 100,
|
|
||||||
}
|
|
||||||
|
|
||||||
assert expected == config.as_dict()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_config_is_allowed_path() -> None:
|
|
||||||
"""Test is_allowed_path method."""
|
|
||||||
hass = Mock()
|
|
||||||
hass.data = {}
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
with TemporaryDirectory() as tmp_dir:
|
|
||||||
# The created dir is in /tmp. This is a symlink on OS X
|
|
||||||
# causing this test to fail unless we resolve path first.
|
|
||||||
config.allowlist_external_dirs = {os.path.realpath(tmp_dir)}
|
|
||||||
|
|
||||||
test_file = os.path.join(tmp_dir, "test.jpg")
|
|
||||||
await asyncio.get_running_loop().run_in_executor(
|
|
||||||
None, Path(test_file).write_text, "test"
|
|
||||||
)
|
|
||||||
|
|
||||||
valid = [test_file, tmp_dir, os.path.join(tmp_dir, "notfound321")]
|
|
||||||
for path in valid:
|
|
||||||
assert config.is_allowed_path(path)
|
|
||||||
|
|
||||||
config.allowlist_external_dirs = {"/home", "/var"}
|
|
||||||
|
|
||||||
invalid = [
|
|
||||||
"/hass/config/secure",
|
|
||||||
"/etc/passwd",
|
|
||||||
"/root/secure_file",
|
|
||||||
"/var/../etc/passwd",
|
|
||||||
test_file,
|
|
||||||
]
|
|
||||||
for path in invalid:
|
|
||||||
assert not config.is_allowed_path(path)
|
|
||||||
|
|
||||||
with pytest.raises(AssertionError):
|
|
||||||
config.is_allowed_path(None)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_config_is_allowed_external_url() -> None:
|
|
||||||
"""Test is_allowed_external_url method."""
|
|
||||||
hass = Mock()
|
|
||||||
hass.data = {}
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
config.allowlist_external_urls = [
|
|
||||||
"http://x.com/",
|
|
||||||
"https://y.com/bla/",
|
|
||||||
"https://z.com/images/1.jpg/",
|
|
||||||
]
|
|
||||||
|
|
||||||
valid = [
|
|
||||||
"http://x.com/1.jpg",
|
|
||||||
"http://x.com",
|
|
||||||
"https://y.com/bla/",
|
|
||||||
"https://y.com/bla/2.png",
|
|
||||||
"https://z.com/images/1.jpg",
|
|
||||||
]
|
|
||||||
for url in valid:
|
|
||||||
assert config.is_allowed_external_url(url)
|
|
||||||
|
|
||||||
invalid = [
|
|
||||||
"https://a.co",
|
|
||||||
"https://y.com/bla_wrong",
|
|
||||||
"https://y.com/bla/../image.jpg",
|
|
||||||
"https://z.com/images",
|
|
||||||
]
|
|
||||||
for url in invalid:
|
|
||||||
assert not config.is_allowed_external_url(url)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_event_on_update(hass: HomeAssistant) -> None:
|
|
||||||
"""Test that event is fired on update."""
|
|
||||||
events = async_capture_events(hass, EVENT_CORE_CONFIG_UPDATE)
|
|
||||||
|
|
||||||
assert hass.config.latitude != 12
|
|
||||||
|
|
||||||
await hass.config.async_update(latitude=12)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
assert hass.config.latitude == 12
|
|
||||||
assert len(events) == 1
|
|
||||||
assert events[0].data == {"latitude": 12}
|
|
||||||
|
|
||||||
|
|
||||||
async def test_bad_timezone_raises_value_error(hass: HomeAssistant) -> None:
|
|
||||||
"""Test bad timezone raises ValueError."""
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
await hass.config.async_update(time_zone="not_a_timezone")
|
|
||||||
|
|
||||||
|
|
||||||
async def test_start_taking_too_long(caplog: pytest.LogCaptureFixture) -> None:
|
async def test_start_taking_too_long(caplog: pytest.LogCaptureFixture) -> None:
|
||||||
"""Test when async_start takes too long."""
|
"""Test when async_start takes too long."""
|
||||||
hass = ha.HomeAssistant("/test/ha-config")
|
hass = ha.HomeAssistant("/test/ha-config")
|
||||||
|
@ -2299,53 +2127,6 @@ def test_valid_domain() -> None:
|
||||||
assert ha.valid_domain(valid), valid
|
assert ha.valid_domain(valid), valid
|
||||||
|
|
||||||
|
|
||||||
async def test_additional_data_in_core_config(
|
|
||||||
hass: HomeAssistant, hass_storage: dict[str, Any]
|
|
||||||
) -> None:
|
|
||||||
"""Test that we can handle additional data in core configuration."""
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
config.async_initialize()
|
|
||||||
hass_storage[ha.CORE_STORAGE_KEY] = {
|
|
||||||
"version": 1,
|
|
||||||
"data": {"location_name": "Test Name", "additional_valid_key": "value"},
|
|
||||||
}
|
|
||||||
await config.async_load()
|
|
||||||
assert config.location_name == "Test Name"
|
|
||||||
|
|
||||||
|
|
||||||
async def test_incorrect_internal_external_url(
|
|
||||||
hass: HomeAssistant, hass_storage: dict[str, Any], caplog: pytest.LogCaptureFixture
|
|
||||||
) -> None:
|
|
||||||
"""Test that we warn when detecting invalid internal/external url."""
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
config.async_initialize()
|
|
||||||
|
|
||||||
hass_storage[ha.CORE_STORAGE_KEY] = {
|
|
||||||
"version": 1,
|
|
||||||
"data": {
|
|
||||||
"internal_url": None,
|
|
||||||
"external_url": None,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
await config.async_load()
|
|
||||||
assert "Invalid external_url set" not in caplog.text
|
|
||||||
assert "Invalid internal_url set" not in caplog.text
|
|
||||||
|
|
||||||
config = ha.Config(hass, "/test/ha-config")
|
|
||||||
config.async_initialize()
|
|
||||||
|
|
||||||
hass_storage[ha.CORE_STORAGE_KEY] = {
|
|
||||||
"version": 1,
|
|
||||||
"data": {
|
|
||||||
"internal_url": "https://community.home-assistant.io/profile",
|
|
||||||
"external_url": "https://www.home-assistant.io/blue",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
await config.async_load()
|
|
||||||
assert "Invalid external_url set" in caplog.text
|
|
||||||
assert "Invalid internal_url set" in caplog.text
|
|
||||||
|
|
||||||
|
|
||||||
async def test_start_events(hass: HomeAssistant) -> None:
|
async def test_start_events(hass: HomeAssistant) -> None:
|
||||||
"""Test events fired when starting Home Assistant."""
|
"""Test events fired when starting Home Assistant."""
|
||||||
hass.state = ha.CoreState.not_running
|
hass.state = ha.CoreState.not_running
|
||||||
|
@ -3462,28 +3243,6 @@ async def test_async_listen_with_run_immediately_deprecated(
|
||||||
) in caplog.text
|
) in caplog.text
|
||||||
|
|
||||||
|
|
||||||
async def test_top_level_components(hass: HomeAssistant) -> None:
|
|
||||||
"""Test top level components are updated when components change."""
|
|
||||||
hass.config.components.add("homeassistant")
|
|
||||||
assert hass.config.components == {"homeassistant"}
|
|
||||||
assert hass.config.top_level_components == {"homeassistant"}
|
|
||||||
hass.config.components.add("homeassistant.scene")
|
|
||||||
assert hass.config.components == {"homeassistant", "homeassistant.scene"}
|
|
||||||
assert hass.config.top_level_components == {"homeassistant"}
|
|
||||||
hass.config.components.remove("homeassistant")
|
|
||||||
assert hass.config.components == {"homeassistant.scene"}
|
|
||||||
assert hass.config.top_level_components == set()
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
hass.config.components.remove("homeassistant.scene")
|
|
||||||
with pytest.raises(NotImplementedError):
|
|
||||||
hass.config.components.discard("homeassistant")
|
|
||||||
|
|
||||||
|
|
||||||
async def test_debug_mode_defaults_to_off(hass: HomeAssistant) -> None:
|
|
||||||
"""Test debug mode defaults to off."""
|
|
||||||
assert not hass.config.debug
|
|
||||||
|
|
||||||
|
|
||||||
async def test_async_fire_thread_safety(hass: HomeAssistant) -> None:
|
async def test_async_fire_thread_safety(hass: HomeAssistant) -> None:
|
||||||
"""Test async_fire thread safety."""
|
"""Test async_fire thread safety."""
|
||||||
events = async_capture_events(hass, "test_event")
|
events = async_capture_events(hass, "test_event")
|
||||||
|
@ -3550,19 +3309,6 @@ async def test_thread_safety_message(hass: HomeAssistant) -> None:
|
||||||
await hass.async_add_executor_job(hass.verify_event_loop_thread, "test")
|
await hass.async_add_executor_job(hass.verify_event_loop_thread, "test")
|
||||||
|
|
||||||
|
|
||||||
async def test_set_time_zone_deprecated(hass: HomeAssistant) -> None:
|
|
||||||
"""Test set_time_zone is deprecated."""
|
|
||||||
with pytest.raises(
|
|
||||||
RuntimeError,
|
|
||||||
match=re.escape(
|
|
||||||
"Detected code that set the time zone using set_time_zone instead of "
|
|
||||||
"async_set_time_zone which will stop working in Home Assistant 2025.6. "
|
|
||||||
"Please report this issue.",
|
|
||||||
),
|
|
||||||
):
|
|
||||||
await hass.config.set_time_zone("America/New_York")
|
|
||||||
|
|
||||||
|
|
||||||
async def test_async_set_updates_last_reported(hass: HomeAssistant) -> None:
|
async def test_async_set_updates_last_reported(hass: HomeAssistant) -> None:
|
||||||
"""Test async_set method updates last_reported AND last_reported_timestamp."""
|
"""Test async_set method updates last_reported AND last_reported_timestamp."""
|
||||||
hass.states.async_set("light.bowl", "on", {})
|
hass.states.async_set("light.bowl", "on", {})
|
||||||
|
|
|
@ -1,9 +1,14 @@
|
||||||
"""Test core_config."""
|
"""Test core_config."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
import copy
|
import copy
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
import re
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from unittest.mock import patch
|
from unittest.mock import Mock, PropertyMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from voluptuous import Invalid, MultipleInvalid
|
from voluptuous import Invalid, MultipleInvalid
|
||||||
|
@ -18,12 +23,18 @@ from homeassistant.const import (
|
||||||
CONF_LATITUDE,
|
CONF_LATITUDE,
|
||||||
CONF_LONGITUDE,
|
CONF_LONGITUDE,
|
||||||
CONF_NAME,
|
CONF_NAME,
|
||||||
|
CONF_UNIT_SYSTEM,
|
||||||
|
EVENT_CORE_CONFIG_UPDATE,
|
||||||
|
__version__,
|
||||||
)
|
)
|
||||||
from homeassistant.core import ConfigSource, HomeAssistant, State
|
from homeassistant.core import HomeAssistant, State
|
||||||
from homeassistant.core_config import (
|
from homeassistant.core_config import (
|
||||||
_CUSTOMIZE_DICT_SCHEMA,
|
_CUSTOMIZE_DICT_SCHEMA,
|
||||||
CORE_CONFIG_SCHEMA,
|
CORE_CONFIG_SCHEMA,
|
||||||
|
CORE_STORAGE_KEY,
|
||||||
DATA_CUSTOMIZE,
|
DATA_CUSTOMIZE,
|
||||||
|
Config,
|
||||||
|
ConfigSource,
|
||||||
_validate_stun_or_turn_url,
|
_validate_stun_or_turn_url,
|
||||||
async_process_ha_core_config,
|
async_process_ha_core_config,
|
||||||
)
|
)
|
||||||
|
@ -35,7 +46,7 @@ from homeassistant.util.unit_system import (
|
||||||
UnitSystem,
|
UnitSystem,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .common import MockUser
|
from .common import MockUser, async_capture_events
|
||||||
|
|
||||||
|
|
||||||
def test_core_config_schema() -> None:
|
def test_core_config_schema() -> None:
|
||||||
|
@ -821,3 +832,252 @@ async def test_configuration_legacy_template_is_removed(hass: HomeAssistant) ->
|
||||||
)
|
)
|
||||||
|
|
||||||
assert not getattr(hass.config, "legacy_templates")
|
assert not getattr(hass.config, "legacy_templates")
|
||||||
|
|
||||||
|
|
||||||
|
async def test_config_defaults() -> None:
|
||||||
|
"""Test config defaults."""
|
||||||
|
hass = Mock()
|
||||||
|
hass.data = {}
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
assert config.hass is hass
|
||||||
|
assert config.latitude == 0
|
||||||
|
assert config.longitude == 0
|
||||||
|
assert config.elevation == 0
|
||||||
|
assert config.location_name == "Home"
|
||||||
|
assert config.time_zone == "UTC"
|
||||||
|
assert config.internal_url is None
|
||||||
|
assert config.external_url is None
|
||||||
|
assert config.config_source is ConfigSource.DEFAULT
|
||||||
|
assert config.skip_pip is False
|
||||||
|
assert config.skip_pip_packages == []
|
||||||
|
assert config.components == set()
|
||||||
|
assert config.api is None
|
||||||
|
assert config.config_dir == "/test/ha-config"
|
||||||
|
assert config.allowlist_external_dirs == set()
|
||||||
|
assert config.allowlist_external_urls == set()
|
||||||
|
assert config.media_dirs == {}
|
||||||
|
assert config.recovery_mode is False
|
||||||
|
assert config.legacy_templates is False
|
||||||
|
assert config.currency == "EUR"
|
||||||
|
assert config.country is None
|
||||||
|
assert config.language == "en"
|
||||||
|
assert config.radius == 100
|
||||||
|
|
||||||
|
|
||||||
|
async def test_config_path_with_file() -> None:
|
||||||
|
"""Test get_config_path method."""
|
||||||
|
hass = Mock()
|
||||||
|
hass.data = {}
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
assert config.path("test.conf") == "/test/ha-config/test.conf"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_config_path_with_dir_and_file() -> None:
|
||||||
|
"""Test get_config_path method."""
|
||||||
|
hass = Mock()
|
||||||
|
hass.data = {}
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
assert config.path("dir", "test.conf") == "/test/ha-config/dir/test.conf"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_config_as_dict() -> None:
|
||||||
|
"""Test as dict."""
|
||||||
|
hass = Mock()
|
||||||
|
hass.data = {}
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
type(config.hass.state).value = PropertyMock(return_value="RUNNING")
|
||||||
|
expected = {
|
||||||
|
"latitude": 0,
|
||||||
|
"longitude": 0,
|
||||||
|
"elevation": 0,
|
||||||
|
CONF_UNIT_SYSTEM: METRIC_SYSTEM.as_dict(),
|
||||||
|
"location_name": "Home",
|
||||||
|
"time_zone": "UTC",
|
||||||
|
"components": [],
|
||||||
|
"config_dir": "/test/ha-config",
|
||||||
|
"whitelist_external_dirs": [],
|
||||||
|
"allowlist_external_dirs": [],
|
||||||
|
"allowlist_external_urls": [],
|
||||||
|
"version": __version__,
|
||||||
|
"config_source": ConfigSource.DEFAULT,
|
||||||
|
"recovery_mode": False,
|
||||||
|
"state": "RUNNING",
|
||||||
|
"external_url": None,
|
||||||
|
"internal_url": None,
|
||||||
|
"currency": "EUR",
|
||||||
|
"country": None,
|
||||||
|
"language": "en",
|
||||||
|
"safe_mode": False,
|
||||||
|
"debug": False,
|
||||||
|
"radius": 100,
|
||||||
|
}
|
||||||
|
|
||||||
|
assert expected == config.as_dict()
|
||||||
|
|
||||||
|
|
||||||
|
async def test_config_is_allowed_path() -> None:
|
||||||
|
"""Test is_allowed_path method."""
|
||||||
|
hass = Mock()
|
||||||
|
hass.data = {}
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
with TemporaryDirectory() as tmp_dir:
|
||||||
|
# The created dir is in /tmp. This is a symlink on OS X
|
||||||
|
# causing this test to fail unless we resolve path first.
|
||||||
|
config.allowlist_external_dirs = {os.path.realpath(tmp_dir)}
|
||||||
|
|
||||||
|
test_file = os.path.join(tmp_dir, "test.jpg")
|
||||||
|
await asyncio.get_running_loop().run_in_executor(
|
||||||
|
None, Path(test_file).write_text, "test"
|
||||||
|
)
|
||||||
|
|
||||||
|
valid = [test_file, tmp_dir, os.path.join(tmp_dir, "notfound321")]
|
||||||
|
for path in valid:
|
||||||
|
assert config.is_allowed_path(path)
|
||||||
|
|
||||||
|
config.allowlist_external_dirs = {"/home", "/var"}
|
||||||
|
|
||||||
|
invalid = [
|
||||||
|
"/hass/config/secure",
|
||||||
|
"/etc/passwd",
|
||||||
|
"/root/secure_file",
|
||||||
|
"/var/../etc/passwd",
|
||||||
|
test_file,
|
||||||
|
]
|
||||||
|
for path in invalid:
|
||||||
|
assert not config.is_allowed_path(path)
|
||||||
|
|
||||||
|
with pytest.raises(AssertionError):
|
||||||
|
config.is_allowed_path(None)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_config_is_allowed_external_url() -> None:
|
||||||
|
"""Test is_allowed_external_url method."""
|
||||||
|
hass = Mock()
|
||||||
|
hass.data = {}
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
config.allowlist_external_urls = [
|
||||||
|
"http://x.com/",
|
||||||
|
"https://y.com/bla/",
|
||||||
|
"https://z.com/images/1.jpg/",
|
||||||
|
]
|
||||||
|
|
||||||
|
valid = [
|
||||||
|
"http://x.com/1.jpg",
|
||||||
|
"http://x.com",
|
||||||
|
"https://y.com/bla/",
|
||||||
|
"https://y.com/bla/2.png",
|
||||||
|
"https://z.com/images/1.jpg",
|
||||||
|
]
|
||||||
|
for url in valid:
|
||||||
|
assert config.is_allowed_external_url(url)
|
||||||
|
|
||||||
|
invalid = [
|
||||||
|
"https://a.co",
|
||||||
|
"https://y.com/bla_wrong",
|
||||||
|
"https://y.com/bla/../image.jpg",
|
||||||
|
"https://z.com/images",
|
||||||
|
]
|
||||||
|
for url in invalid:
|
||||||
|
assert not config.is_allowed_external_url(url)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_event_on_update(hass: HomeAssistant) -> None:
|
||||||
|
"""Test that event is fired on update."""
|
||||||
|
events = async_capture_events(hass, EVENT_CORE_CONFIG_UPDATE)
|
||||||
|
|
||||||
|
assert hass.config.latitude != 12
|
||||||
|
|
||||||
|
await hass.config.async_update(latitude=12)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert hass.config.latitude == 12
|
||||||
|
assert len(events) == 1
|
||||||
|
assert events[0].data == {"latitude": 12}
|
||||||
|
|
||||||
|
|
||||||
|
async def test_bad_timezone_raises_value_error(hass: HomeAssistant) -> None:
|
||||||
|
"""Test bad timezone raises ValueError."""
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
await hass.config.async_update(time_zone="not_a_timezone")
|
||||||
|
|
||||||
|
|
||||||
|
async def test_additional_data_in_core_config(
|
||||||
|
hass: HomeAssistant, hass_storage: dict[str, Any]
|
||||||
|
) -> None:
|
||||||
|
"""Test that we can handle additional data in core configuration."""
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
config.async_initialize()
|
||||||
|
hass_storage[CORE_STORAGE_KEY] = {
|
||||||
|
"version": 1,
|
||||||
|
"data": {"location_name": "Test Name", "additional_valid_key": "value"},
|
||||||
|
}
|
||||||
|
await config.async_load()
|
||||||
|
assert config.location_name == "Test Name"
|
||||||
|
|
||||||
|
|
||||||
|
async def test_incorrect_internal_external_url(
|
||||||
|
hass: HomeAssistant, hass_storage: dict[str, Any], caplog: pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
"""Test that we warn when detecting invalid internal/external url."""
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
config.async_initialize()
|
||||||
|
|
||||||
|
hass_storage[CORE_STORAGE_KEY] = {
|
||||||
|
"version": 1,
|
||||||
|
"data": {
|
||||||
|
"internal_url": None,
|
||||||
|
"external_url": None,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
await config.async_load()
|
||||||
|
assert "Invalid external_url set" not in caplog.text
|
||||||
|
assert "Invalid internal_url set" not in caplog.text
|
||||||
|
|
||||||
|
config = Config(hass, "/test/ha-config")
|
||||||
|
config.async_initialize()
|
||||||
|
|
||||||
|
hass_storage[CORE_STORAGE_KEY] = {
|
||||||
|
"version": 1,
|
||||||
|
"data": {
|
||||||
|
"internal_url": "https://community.home-assistant.io/profile",
|
||||||
|
"external_url": "https://www.home-assistant.io/blue",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
await config.async_load()
|
||||||
|
assert "Invalid external_url set" in caplog.text
|
||||||
|
assert "Invalid internal_url set" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
async def test_top_level_components(hass: HomeAssistant) -> None:
|
||||||
|
"""Test top level components are updated when components change."""
|
||||||
|
hass.config.components.add("homeassistant")
|
||||||
|
assert hass.config.components == {"homeassistant"}
|
||||||
|
assert hass.config.top_level_components == {"homeassistant"}
|
||||||
|
hass.config.components.add("homeassistant.scene")
|
||||||
|
assert hass.config.components == {"homeassistant", "homeassistant.scene"}
|
||||||
|
assert hass.config.top_level_components == {"homeassistant"}
|
||||||
|
hass.config.components.remove("homeassistant")
|
||||||
|
assert hass.config.components == {"homeassistant.scene"}
|
||||||
|
assert hass.config.top_level_components == set()
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
hass.config.components.remove("homeassistant.scene")
|
||||||
|
with pytest.raises(NotImplementedError):
|
||||||
|
hass.config.components.discard("homeassistant")
|
||||||
|
|
||||||
|
|
||||||
|
async def test_debug_mode_defaults_to_off(hass: HomeAssistant) -> None:
|
||||||
|
"""Test debug mode defaults to off."""
|
||||||
|
assert not hass.config.debug
|
||||||
|
|
||||||
|
|
||||||
|
async def test_set_time_zone_deprecated(hass: HomeAssistant) -> None:
|
||||||
|
"""Test set_time_zone is deprecated."""
|
||||||
|
with pytest.raises(
|
||||||
|
RuntimeError,
|
||||||
|
match=re.escape(
|
||||||
|
"Detected code that set the time zone using set_time_zone instead of "
|
||||||
|
"async_set_time_zone which will stop working in Home Assistant 2025.6. "
|
||||||
|
"Please report this issue.",
|
||||||
|
),
|
||||||
|
):
|
||||||
|
await hass.config.set_time_zone("America/New_York")
|
||||||
|
|
Loading…
Add table
Reference in a new issue