Compare commits

...
Sign in to create a new pull request.

9 commits

Author SHA1 Message Date
Erik
400f792bff Make local backup a backup agent 2024-11-14 15:50:26 +01:00
Erik Montnemery
e8179f7a73
Tweak backup agent interface (#130613)
* Tweak backup agent interface

* Adjust kitchen_sink
2024-11-14 15:49:17 +01:00
Martin Hjelmare
e9247fb94b
Export relevant names from backup integration (#130596) 2024-11-14 13:14:36 +01:00
Joakim Sørensen
5a69488630
Allow setting password for backups (#110630)
* Allow setting password for backups

* use is_hassio from helpers

* move it

* Fix getting psw

* Fix restoring with psw

* Address review comments

* Improve docstring

* Adjust kitchen sink

* Adjust

---------

Co-authored-by: Erik <erik@montnemery.com>
2024-11-14 12:53:28 +01:00
Erik Montnemery
d1185f8754
Align parameter names in backup/agents/* WS commands (#130590) 2024-11-14 12:44:59 +01:00
Erik Montnemery
0599983a37
Add additional options to WS command backup/generate (#130530)
* Add additional options to WS command backup/generate

* Improve test

* Improve test
2024-11-14 09:31:08 +01:00
Martin Hjelmare
f99b319048
Rename backup sync agent to backup agent (#130575)
* Rename sync agent module to agent

* Rename BackupSyncAgent to BackupAgent

* Fix test typo

* Rename async_get_backup_sync_agents to async_get_backup_agents

* Rename and clean up remaining sync things

* Update kitchen sink

* Apply suggestions from code review

* Update test_manager.py

---------

Co-authored-by: Erik Montnemery <erik@montnemery.com>
2024-11-14 02:00:49 +01:00
Martin Hjelmare
957ece747d
Make BackupSyncMetadata model a dataclass (#130555)
Make backup BackupSyncMetadata model a dataclass
2024-11-13 21:11:25 +01:00
Joakim Sørensen
325738829d
MVP implementation of Backup sync agents (#126122)
* init sync agent

* add syncing

* root import

* rename list to info and add sync state

* Add base backup class

* Revert unneded change

* adjust tests

* move to kitchen_sink

* split

* move

* Adjustments

* Adjustment

* update

* Tests

* Test unknown agent

* adjust

* Adjust for different test environments

* Change /info WS to contain a dictinary

* reorder

* Add websocket command to trigger sync from the supervisor

* cleanup

* Make mypy happier

---------

Co-authored-by: Erik <erik@montnemery.com>
2024-11-13 20:22:54 +01:00
19 changed files with 1840 additions and 273 deletions

View file

@ -1,6 +1,7 @@
"""Home Assistant module to handle restoring backups."""
from dataclasses import dataclass
import hashlib
import json
import logging
from pathlib import Path
@ -24,6 +25,18 @@ class RestoreBackupFileContent:
"""Definition for restore backup file content."""
backup_file_path: Path
password: str | None = None
def password_to_key(password: str) -> bytes:
"""Generate a AES Key from password.
Matches the implementation in supervisor.backups.utils.password_to_key.
"""
key: bytes = password.encode()
for _ in range(100):
key = hashlib.sha256(key).digest()
return key[:16]
def restore_backup_file_content(config_dir: Path) -> RestoreBackupFileContent | None:
@ -32,7 +45,8 @@ def restore_backup_file_content(config_dir: Path) -> RestoreBackupFileContent |
try:
instruction_content = json.loads(instruction_path.read_text(encoding="utf-8"))
return RestoreBackupFileContent(
backup_file_path=Path(instruction_content["path"])
backup_file_path=Path(instruction_content["path"]),
password=instruction_content.get("password"),
)
except (FileNotFoundError, json.JSONDecodeError):
return None
@ -54,7 +68,11 @@ def _clear_configuration_directory(config_dir: Path) -> None:
shutil.rmtree(entrypath)
def _extract_backup(config_dir: Path, backup_file_path: Path) -> None:
def _extract_backup(
config_dir: Path,
backup_file_path: Path,
password: str | None = None,
) -> None:
"""Extract the backup file to the config directory."""
with (
TemporaryDirectory() as tempdir,
@ -88,22 +106,28 @@ def _extract_backup(config_dir: Path, backup_file_path: Path) -> None:
f"homeassistant.tar{'.gz' if backup_meta["compressed"] else ''}",
),
gzip=backup_meta["compressed"],
key=password_to_key(password) if password is not None else None,
mode="r",
) as istf:
for member in istf.getmembers():
if member.name == "data":
continue
member.name = member.name.replace("data/", "")
_clear_configuration_directory(config_dir)
istf.extractall(
path=config_dir,
members=[
member
for member in securetar.secure_path(istf)
if member.name != "data"
],
path=Path(
tempdir,
"homeassistant",
),
members=securetar.secure_path(istf),
filter="fully_trusted",
)
_clear_configuration_directory(config_dir)
shutil.copytree(
Path(
tempdir,
"homeassistant",
"data",
),
config_dir,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns(*(KEEP_PATHS)),
)
def restore_backup(config_dir_path: str) -> bool:
@ -119,7 +143,11 @@ def restore_backup(config_dir_path: str) -> bool:
backup_file_path = restore_content.backup_file_path
_LOGGER.info("Restoring %s", backup_file_path)
try:
_extract_backup(config_dir, backup_file_path)
_extract_backup(
config_dir=config_dir,
backup_file_path=backup_file_path,
password=restore_content.password,
)
except FileNotFoundError as err:
raise ValueError(f"Backup file {backup_file_path} does not exist") from err
_LOGGER.info("Restore complete, restarting")

View file

@ -1,22 +1,38 @@
"""The Backup integration."""
import voluptuous as vol
from homeassistant.const import CONF_PASSWORD
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.hassio import is_hassio
from homeassistant.helpers.typing import ConfigType
from .const import DATA_MANAGER, DOMAIN, LOGGER
from .agent import BackupAgent, BackupAgentPlatformProtocol, UploadedBackup
from .const import DOMAIN, LOGGER
from .http import async_register_http_views
from .manager import BackupManager
from .manager import BackupManager, BackupPlatformProtocol
from .models import BackupUploadMetadata, BaseBackup
from .websocket import async_register_websocket_handlers
__all__ = [
"BackupAgent",
"BackupAgentPlatformProtocol",
"BackupPlatformProtocol",
"BackupUploadMetadata",
"BaseBackup",
"UploadedBackup",
]
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
SERVICE_CREATE_SCHEMA = vol.Schema({vol.Optional(CONF_PASSWORD): str})
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Backup integration."""
backup_manager = BackupManager(hass)
hass.data[DATA_MANAGER] = backup_manager
hass.data[DOMAIN] = backup_manager = BackupManager(hass)
await backup_manager.async_setup()
with_hassio = is_hassio(hass)
@ -32,11 +48,26 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_handle_create_service(call: ServiceCall) -> None:
"""Service handler for creating backups."""
await backup_manager.async_create_backup(on_progress=None)
await backup_manager.async_create_backup(
addons_included=None,
# pylint: disable=fixme
# TODO: Don't forget to remove this when the implementation is complete
agent_ids=[], # TODO: Should we default to local?
database_included=True,
folders_included=None,
name=None,
on_progress=None,
password=call.data.get(CONF_PASSWORD),
)
if backup_task := backup_manager.backup_task:
await backup_task
hass.services.async_register(DOMAIN, "create", async_handle_create_service)
hass.services.async_register(
DOMAIN,
"create",
async_handle_create_service,
schema=SERVICE_CREATE_SCHEMA,
)
async_register_http_views(hass)

View file

@ -0,0 +1,68 @@
"""Backup agents for the Backup integration."""
from __future__ import annotations
import abc
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol
from homeassistant.core import HomeAssistant
from .models import BackupUploadMetadata, BaseBackup
@dataclass(slots=True)
class UploadedBackup(BaseBackup):
"""Uploaded backup class."""
id: str
class BackupAgent(abc.ABC):
"""Backup agent interface."""
name: str
@abc.abstractmethod
async def async_download_backup(
self,
*,
id: str,
path: Path,
**kwargs: Any,
) -> None:
"""Download a backup file.
:param id: The ID of the backup that was returned in async_list_backups.
:param path: The full file path to download the backup to.
"""
@abc.abstractmethod
async def async_upload_backup(
self,
*,
path: Path,
metadata: BackupUploadMetadata,
**kwargs: Any,
) -> None:
"""Upload a backup.
:param path: The full file path to the backup that should be uploaded.
:param metadata: Metadata about the backup that should be uploaded.
"""
@abc.abstractmethod
async def async_list_backups(self, **kwargs: Any) -> list[UploadedBackup]:
"""List backups."""
class BackupAgentPlatformProtocol(Protocol):
"""Define the format of backup platforms which implement backup agents."""
async def async_get_backup_agents(
self,
hass: HomeAssistant,
**kwargs: Any,
) -> list[BackupAgent]:
"""Return a list of backup agents."""

View file

@ -0,0 +1,146 @@
"""Local backup support for Core and Container installations."""
from __future__ import annotations
from dataclasses import asdict, dataclass
import json
from pathlib import Path
import tarfile
from tarfile import TarError
from typing import Any, cast
from homeassistant.core import HomeAssistant
from homeassistant.util.json import json_loads_object
from .agent import BackupAgent, UploadedBackup
from .const import BUF_SIZE, LOGGER
from .models import BackupUploadMetadata
async def async_get_backup_agents(
hass: HomeAssistant,
**kwargs: Any,
) -> list[BackupAgent]:
"""Register the backup agent."""
return [LocalBackupAgent(hass)]
@dataclass(slots=True)
class LocalBackup(UploadedBackup):
"""Local backup class."""
path: Path
def as_dict(self) -> dict:
"""Return a dict representation of this backup."""
return {**asdict(self), "path": self.path.as_posix()}
class LocalBackupAgent(BackupAgent):
"""Define the format that backup agents can have."""
name = "local"
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the backup agent."""
super().__init__()
self._hass = hass
self.backup_dir = Path(hass.config.path("backups"))
self.backups: dict[str, LocalBackup] = {}
self.loaded_backups = False
async def load_backups(self) -> None:
"""Load data of stored backup files."""
backups = await self._hass.async_add_executor_job(self._read_backups)
LOGGER.debug("Loaded %s local backups", len(backups))
self.backups = backups
self.loaded_backups = True
def _read_backups(self) -> dict[str, LocalBackup]:
"""Read backups from disk."""
backups: dict[str, LocalBackup] = {}
for backup_path in self.backup_dir.glob("*.tar"):
try:
with tarfile.open(backup_path, "r:", bufsize=BUF_SIZE) as backup_file:
if data_file := backup_file.extractfile("./backup.json"):
data = json_loads_object(data_file.read())
backup = LocalBackup(
id=cast(str, data["slug"]), # Do we need another ID?
slug=cast(str, data["slug"]),
name=cast(str, data["name"]),
date=cast(str, data["date"]),
path=backup_path,
size=round(backup_path.stat().st_size / 1_048_576, 2),
protected=cast(bool, data.get("protected", False)),
)
backups[backup.slug] = backup
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
return backups
async def async_download_backup(
self,
*,
id: str,
path: Path,
**kwargs: Any,
) -> None:
"""Download a backup file."""
raise NotImplementedError
async def async_upload_backup(
self,
*,
path: Path,
metadata: BackupUploadMetadata,
**kwargs: Any,
) -> None:
"""Upload a backup."""
self.backups[metadata.slug] = LocalBackup(
id=metadata.slug, # Do we need another ID?
slug=metadata.slug,
name=metadata.name,
date=metadata.date,
path=path,
size=round(path.stat().st_size / 1_048_576, 2),
protected=metadata.protected,
)
async def async_list_backups(self, **kwargs: Any) -> list[UploadedBackup]:
"""List backups."""
if not self.loaded_backups:
await self.load_backups()
return list(self.backups.values())
async def async_get_backup(
self, *, slug: str, **kwargs: Any
) -> UploadedBackup | None:
"""Return a backup."""
if not self.loaded_backups:
await self.load_backups()
if not (backup := self.backups.get(slug)):
return None
if not backup.path.exists():
LOGGER.debug(
(
"Removing tracked backup (%s) that does not exists on the expected"
" path %s"
),
backup.slug,
backup.path,
)
self.backups.pop(slug)
return None
return backup
async def async_remove_backup(self, *, slug: str, **kwargs: Any) -> None:
"""Remove a backup."""
if (backup := await self.async_get_backup(slug=slug)) is None:
return
await self._hass.async_add_executor_job(backup.path.unlink, True) # type: ignore[attr-defined]
LOGGER.debug("Removed backup located at %s", backup.path) # type: ignore[attr-defined]
self.backups.pop(slug)

View file

@ -8,10 +8,12 @@ from typing import TYPE_CHECKING
from homeassistant.util.hass_dict import HassKey
if TYPE_CHECKING:
from .manager import BackupManager
from .manager import BaseBackupManager
from .models import BaseBackup
BUF_SIZE = 2**20 * 4 # 4MB
DOMAIN = "backup"
DATA_MANAGER: HassKey[BackupManager] = HassKey(DOMAIN)
DATA_MANAGER: HassKey[BaseBackupManager[BaseBackup]] = HassKey(DOMAIN)
LOGGER = getLogger(__package__)
EXCLUDE_FROM_BACKUP = [
@ -25,3 +27,8 @@ EXCLUDE_FROM_BACKUP = [
"OZW_Log.txt",
"tts/*",
]
EXCLUDE_DATABASE_FROM_BACKUP = [
"home-assistant_v2.db",
"home-assistant_v2.db-wal",
]

View file

@ -15,6 +15,7 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.util import slugify
from .const import DATA_MANAGER
from .manager import BackupManager
@callback
@ -39,7 +40,7 @@ class DownloadBackupView(HomeAssistantView):
if not request["hass_user"].is_admin:
return Response(status=HTTPStatus.UNAUTHORIZED)
manager = request.app[KEY_HASS].data[DATA_MANAGER]
manager = cast(BackupManager, request.app[KEY_HASS].data[DATA_MANAGER])
backup = await manager.async_get_backup(slug=slug)
if backup is None or not backup.path.exists():

View file

@ -13,26 +13,39 @@ from pathlib import Path
from queue import SimpleQueue
import shutil
import tarfile
from tarfile import TarError
from tempfile import TemporaryDirectory
import time
from typing import Any, Protocol, cast
from typing import Any, Generic, Protocol
import aiohttp
from securetar import SecureTarFile, atomic_contents_add
from typing_extensions import TypeVar
from homeassistant.backup_restore import RESTORE_BACKUP_FILE
from homeassistant.backup_restore import RESTORE_BACKUP_FILE, password_to_key
from homeassistant.const import __version__ as HAVERSION
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import integration_platform
from homeassistant.helpers.json import json_bytes
from homeassistant.util import dt as dt_util
from homeassistant.util.json import json_loads_object
from .const import DOMAIN, EXCLUDE_FROM_BACKUP, LOGGER
from .agent import BackupAgent, BackupAgentPlatformProtocol
from .const import (
BUF_SIZE,
DOMAIN,
EXCLUDE_DATABASE_FROM_BACKUP,
EXCLUDE_FROM_BACKUP,
LOGGER,
)
from .models import BackupUploadMetadata, BaseBackup
BUF_SIZE = 2**20 * 4 # 4MB
# pylint: disable=fixme
# TODO: Don't forget to remove this when the implementation is complete
LOCAL_AGENT_ID = f"{DOMAIN}.local"
_BackupT = TypeVar("_BackupT", bound=BaseBackup, default=BaseBackup)
@dataclass(slots=True)
@ -43,14 +56,11 @@ class NewBackup:
@dataclass(slots=True)
class Backup:
class Backup(BaseBackup):
"""Backup class."""
slug: str
name: str
date: str
agent_ids: list[str]
path: Path
size: float
def as_dict(self) -> dict:
"""Return a dict representation of this backup."""
@ -76,40 +86,61 @@ class BackupPlatformProtocol(Protocol):
"""Perform operations after a backup finishes."""
class BaseBackupManager(abc.ABC):
class BaseBackupManager(abc.ABC, Generic[_BackupT]):
"""Define the format that backup managers can have."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the backup manager."""
self.hass = hass
self.backup_task: asyncio.Task | None = None
self.backups: dict[str, Backup] = {}
self.loaded_platforms = False
self.platforms: dict[str, BackupPlatformProtocol] = {}
self.backup_agents: dict[str, BackupAgent] = {}
self.syncing = False
async def async_setup(self) -> None:
"""Set up the backup manager."""
await self.load_platforms()
@callback
def _add_platform(
def _add_platform_pre_post_handler(
self,
hass: HomeAssistant,
integration_domain: str,
platform: BackupPlatformProtocol,
) -> None:
"""Add a platform to the backup manager."""
"""Add a backup platform."""
if not hasattr(platform, "async_pre_backup") or not hasattr(
platform, "async_post_backup"
):
LOGGER.warning(
"%s does not implement required functions for the backup platform",
integration_domain,
)
return
self.platforms[integration_domain] = platform
async def _async_add_platform_agents(
self,
integration_domain: str,
platform: BackupAgentPlatformProtocol,
) -> None:
"""Add a platform to the backup manager."""
if not hasattr(platform, "async_get_backup_agents"):
return
agents = await platform.async_get_backup_agents(self.hass)
self.backup_agents.update(
{f"{integration_domain}.{agent.name}": agent for agent in agents}
)
async def _add_platform(
self,
hass: HomeAssistant,
integration_domain: str,
platform: Any,
) -> None:
"""Add a backup platform manager."""
self._add_platform_pre_post_handler(integration_domain, platform)
await self._async_add_platform_agents(integration_domain, platform)
async def async_pre_backup_actions(self, **kwargs: Any) -> None:
"""Perform pre backup actions."""
if not self.loaded_platforms:
await self.load_platforms()
pre_backup_results = await asyncio.gather(
*(
platform.async_pre_backup(self.hass)
@ -123,9 +154,6 @@ class BaseBackupManager(abc.ABC):
async def async_post_backup_actions(self, **kwargs: Any) -> None:
"""Perform post backup actions."""
if not self.loaded_platforms:
await self.load_platforms()
post_backup_results = await asyncio.gather(
*(
platform.async_post_backup(self.hass)
@ -140,33 +168,52 @@ class BaseBackupManager(abc.ABC):
async def load_platforms(self) -> None:
"""Load backup platforms."""
await integration_platform.async_process_integration_platforms(
self.hass, DOMAIN, self._add_platform, wait_for_platforms=True
self.hass,
DOMAIN,
self._add_platform,
wait_for_platforms=True,
)
LOGGER.debug("Loaded %s platforms", len(self.platforms))
self.loaded_platforms = True
LOGGER.debug("Loaded %s agents", len(self.backup_agents))
@abc.abstractmethod
async def async_restore_backup(self, slug: str, **kwargs: Any) -> None:
async def async_restore_backup(
self,
slug: str,
*,
password: str | None = None,
**kwargs: Any,
) -> None:
"""Restore a backup."""
@abc.abstractmethod
async def async_create_backup(
self,
*,
addons_included: list[str] | None,
agent_ids: list[str],
database_included: bool,
folders_included: list[str] | None,
name: str | None,
on_progress: Callable[[BackupProgress], None] | None,
password: str | None,
**kwargs: Any,
) -> NewBackup:
"""Generate a backup."""
"""Initiate generating a backup.
:param on_progress: A callback that will be called with the progress of the
backup.
"""
@abc.abstractmethod
async def async_get_backups(self, **kwargs: Any) -> dict[str, Backup]:
async def async_get_backups(self, **kwargs: Any) -> dict[str, _BackupT]:
"""Get backups.
Return a dictionary of Backup instances keyed by their slug.
"""
@abc.abstractmethod
async def async_get_backup(self, *, slug: str, **kwargs: Any) -> Backup | None:
async def async_get_backup(self, *, slug: str, **kwargs: Any) -> _BackupT | None:
"""Get a backup."""
@abc.abstractmethod
@ -182,80 +229,98 @@ class BaseBackupManager(abc.ABC):
) -> None:
"""Receive and store a backup file from upload."""
@abc.abstractmethod
async def async_upload_backup(self, *, slug: str, **kwargs: Any) -> None:
"""Upload a backup."""
class BackupManager(BaseBackupManager):
class BackupManager(BaseBackupManager[Backup]):
"""Backup manager for the Backup integration."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the backup manager."""
super().__init__(hass=hass)
self.backup_dir = Path(hass.config.path("backups"))
self.loaded_backups = False
self.temp_backup_dir = Path(hass.config.path("tmp_backups"))
async def load_backups(self) -> None:
"""Load data of stored backup files."""
backups = await self.hass.async_add_executor_job(self._read_backups)
LOGGER.debug("Loaded %s backups", len(backups))
self.backups = backups
self.loaded_backups = True
async def async_upload_backup(self, *, slug: str, **kwargs: Any) -> None:
"""Upload a backup to all agents."""
if not self.backup_agents:
return
def _read_backups(self) -> dict[str, Backup]:
"""Read backups from disk."""
backups: dict[str, Backup] = {}
for backup_path in self.backup_dir.glob("*.tar"):
try:
with tarfile.open(backup_path, "r:", bufsize=BUF_SIZE) as backup_file:
if data_file := backup_file.extractfile("./backup.json"):
data = json_loads_object(data_file.read())
backup = Backup(
slug=cast(str, data["slug"]),
name=cast(str, data["name"]),
date=cast(str, data["date"]),
path=backup_path,
size=round(backup_path.stat().st_size / 1_048_576, 2),
)
backups[backup.slug] = backup
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
return backups
if not (backup := await self.async_get_backup(slug=slug)):
return
await self._async_upload_backup(
slug=slug,
backup=backup,
agent_ids=list(self.backup_agents.keys()),
)
async def _async_upload_backup(
self,
*,
slug: str,
backup: Backup,
agent_ids: list[str],
) -> None:
"""Upload a backup to selected agents."""
self.syncing = True
sync_backup_results = await asyncio.gather(
*(
self.backup_agents[agent_id].async_upload_backup(
path=backup.path,
metadata=BackupUploadMetadata(
homeassistant=HAVERSION,
size=backup.size,
date=backup.date,
slug=backup.slug,
name=backup.name,
protected=backup.protected,
),
)
for agent_id in agent_ids
),
return_exceptions=True,
)
for result in sync_backup_results:
if isinstance(result, Exception):
LOGGER.error("Error during backup upload - %s", result)
# TODO: Reset self.syncing in a finally block
self.syncing = False
async def async_get_backups(self, **kwargs: Any) -> dict[str, Backup]:
"""Return backups."""
if not self.loaded_backups:
await self.load_backups()
backups: dict[str, Backup] = {}
for agent_id, agent in self.backup_agents.items():
agent_backups = await agent.async_list_backups()
for agent_backup in agent_backups:
if agent_backup.slug not in backups:
backups[agent_backup.slug] = Backup(
slug=agent_backup.slug,
name=agent_backup.name,
date=agent_backup.date,
agent_ids=[],
# TODO: Do we need to expose the path?
path=agent_backup.path, # type: ignore[attr-defined]
size=agent_backup.size,
protected=agent_backup.protected,
)
backups[agent_backup.slug].agent_ids.append(agent_id)
return self.backups
return backups
async def async_get_backup(self, *, slug: str, **kwargs: Any) -> Backup | None:
"""Return a backup."""
if not self.loaded_backups:
await self.load_backups()
if not (backup := self.backups.get(slug)):
return None
if not backup.path.exists():
LOGGER.debug(
(
"Removing tracked backup (%s) that does not exists on the expected"
" path %s"
),
backup.slug,
backup.path,
)
self.backups.pop(slug)
return None
return backup
# TODO: This is not efficient, but it's fine for draft
backups = await self.async_get_backups()
return backups.get(slug)
async def async_remove_backup(self, *, slug: str, **kwargs: Any) -> None:
"""Remove a backup."""
if (backup := await self.async_get_backup(slug=slug)) is None:
return
await self.hass.async_add_executor_job(backup.path.unlink, True)
LOGGER.debug("Removed backup located at %s", backup.path)
self.backups.pop(slug)
# TODO: We should only remove from the agents that have the backup
for agent in self.backup_agents.values():
await agent.async_remove_backup(slug=slug) # type: ignore[attr-defined]
async def async_receive_backup(
self,
@ -312,22 +377,42 @@ class BackupManager(BaseBackupManager):
temp_dir_handler.cleanup()
await self.hass.async_add_executor_job(_move_and_cleanup)
await self.load_backups()
# TODO: What do we need to do instead?
async def async_create_backup(
self,
*,
addons_included: list[str] | None,
agent_ids: list[str],
database_included: bool,
folders_included: list[str] | None,
name: str | None,
on_progress: Callable[[BackupProgress], None] | None,
password: str | None,
**kwargs: Any,
) -> NewBackup:
"""Generate a backup."""
"""Initiate generating a backup."""
if self.backup_task:
raise HomeAssistantError("Backup already in progress")
backup_name = f"Core {HAVERSION}"
if not agent_ids:
raise HomeAssistantError("At least one agent must be selected")
if any(agent_id not in self.backup_agents for agent_id in agent_ids):
raise HomeAssistantError("Invalid agent selected")
backup_name = name or f"Core {HAVERSION}"
date_str = dt_util.now().isoformat()
slug = _generate_slug(date_str, backup_name)
self.backup_task = self.hass.async_create_task(
self._async_create_backup(backup_name, date_str, slug, on_progress),
self._async_create_backup(
addons_included=addons_included,
agent_ids=agent_ids,
backup_name=backup_name,
database_included=database_included,
date_str=date_str,
folders_included=folders_included,
on_progress=on_progress,
password=password,
slug=slug,
),
name="backup_manager_create_backup",
eager_start=False, # To ensure the task is not started before we return
)
@ -335,13 +420,24 @@ class BackupManager(BaseBackupManager):
async def _async_create_backup(
self,
*,
addons_included: list[str] | None,
agent_ids: list[str],
database_included: bool,
backup_name: str,
date_str: str,
slug: str,
folders_included: list[str] | None,
on_progress: Callable[[BackupProgress], None] | None,
password: str | None,
slug: str,
) -> Backup:
"""Generate a backup."""
success = False
if LOCAL_AGENT_ID in agent_ids:
backup_dir = self.backup_dir
else:
backup_dir = self.temp_backup_dir
try:
await self.async_pre_backup_actions()
@ -351,14 +447,22 @@ class BackupManager(BaseBackupManager):
"date": date_str,
"type": "partial",
"folders": ["homeassistant"],
"homeassistant": {"version": HAVERSION},
"homeassistant": {
"exclude_database": not database_included,
"version": HAVERSION,
},
"compressed": True,
"protected": password is not None,
}
tar_file_path = Path(self.backup_dir, f"{backup_data['slug']}.tar")
tar_file_path = Path(backup_dir, f"{backup_data['slug']}.tar")
size_in_bytes = await self.hass.async_add_executor_job(
self._mkdir_and_generate_backup_contents,
backup_dir,
tar_file_path,
backup_data,
database_included,
password,
)
backup = Backup(
slug=slug,
@ -366,10 +470,20 @@ class BackupManager(BaseBackupManager):
date=date_str,
path=tar_file_path,
size=round(size_in_bytes / 1_048_576, 2),
protected=password is not None,
agent_ids=agent_ids, # TODO: This should maybe be set after upload
)
if self.loaded_backups:
self.backups[slug] = backup
LOGGER.debug("Generated new backup with slug %s", slug)
# TODO: We should add a cache of the backup metadata
LOGGER.debug(
"Generated new backup with slug %s, uploading to agents %s",
slug,
agent_ids,
)
await self._async_upload_backup(
slug=slug, backup=backup, agent_ids=agent_ids
)
# TODO: Upload to other agents
# TODO: Remove from local store if not uploaded to local agent
success = True
return backup
finally:
@ -380,13 +494,20 @@ class BackupManager(BaseBackupManager):
def _mkdir_and_generate_backup_contents(
self,
backup_dir: Path,
tar_file_path: Path,
backup_data: dict[str, Any],
database_included: bool,
password: str | None = None,
) -> int:
"""Generate backup contents and return the size."""
if not self.backup_dir.exists():
LOGGER.debug("Creating backup directory")
self.backup_dir.mkdir()
if not backup_dir.exists():
LOGGER.debug("Creating backup directory %s", backup_dir)
backup_dir.mkdir()
excludes = EXCLUDE_FROM_BACKUP
if not database_included:
excludes = excludes + EXCLUDE_DATABASE_FROM_BACKUP
outer_secure_tarfile = SecureTarFile(
tar_file_path, "w", gzip=False, bufsize=BUF_SIZE
@ -399,18 +520,25 @@ class BackupManager(BaseBackupManager):
tar_info.mtime = int(time.time())
outer_secure_tarfile_tarfile.addfile(tar_info, fileobj=fileobj)
with outer_secure_tarfile.create_inner_tar(
"./homeassistant.tar.gz", gzip=True
"./homeassistant.tar.gz",
gzip=True,
key=password_to_key(password) if password is not None else None,
) as core_tar:
atomic_contents_add(
tar_file=core_tar,
origin_path=Path(self.hass.config.path()),
excludes=EXCLUDE_FROM_BACKUP,
excludes=excludes,
arcname="data",
)
return tar_file_path.stat().st_size
async def async_restore_backup(self, slug: str, **kwargs: Any) -> None:
async def async_restore_backup(
self,
slug: str,
*,
password: str | None = None,
**kwargs: Any,
) -> None:
"""Restore a backup.
This will write the restore information to .HA_RESTORE which
@ -422,7 +550,7 @@ class BackupManager(BaseBackupManager):
def _write_restore_file() -> None:
"""Write the restore file."""
Path(self.hass.config.path(RESTORE_BACKUP_FILE)).write_text(
json.dumps({"path": backup.path.as_posix()}),
json.dumps({"path": backup.path.as_posix(), "password": password}),
encoding="utf-8",
)

View file

@ -0,0 +1,30 @@
"""Models for the backup integration."""
from dataclasses import asdict, dataclass
@dataclass()
class BaseBackup:
"""Base backup class."""
date: str
name: str
protected: bool
slug: str
size: float
def as_dict(self) -> dict:
"""Return a dict representation of this backup."""
return asdict(self)
@dataclass()
class BackupUploadMetadata:
"""Backup upload metadata."""
date: str # The date the backup was created
slug: str # The slug of the backup
size: float # The size of the backup (in bytes)
name: str # The name of the backup
homeassistant: str # The version of Home Assistant that created the backup
protected: bool # If the backup is protected

View file

@ -1 +1,7 @@
create:
fields:
password:
required: false
selector:
text:
type: password

View file

@ -2,7 +2,13 @@
"services": {
"create": {
"name": "Create backup",
"description": "Creates a new backup."
"description": "Creates a new backup.",
"fields": {
"password": {
"name": "[%key:common::config_flow::data::password%]",
"description": "Password protect the backup"
}
}
}
}
}

View file

@ -1,5 +1,6 @@
"""Websocket commands for the Backup integration."""
from pathlib import Path
from typing import Any
import voluptuous as vol
@ -14,9 +15,14 @@ from .manager import BackupProgress
@callback
def async_register_websocket_handlers(hass: HomeAssistant, with_hassio: bool) -> None:
"""Register websocket commands."""
websocket_api.async_register_command(hass, backup_agents_download)
websocket_api.async_register_command(hass, backup_agents_info)
websocket_api.async_register_command(hass, backup_agents_list_backups)
if with_hassio:
websocket_api.async_register_command(hass, handle_backup_end)
websocket_api.async_register_command(hass, handle_backup_start)
websocket_api.async_register_command(hass, handle_backup_upload)
return
websocket_api.async_register_command(hass, handle_details)
@ -40,7 +46,7 @@ async def handle_info(
connection.send_result(
msg["id"],
{
"backups": list(backups.values()),
"backups": [b.as_dict() for b in backups.values()],
"backing_up": manager.backup_task is not None,
},
)
@ -92,6 +98,7 @@ async def handle_remove(
{
vol.Required("type"): "backup/restore",
vol.Required("slug"): str,
vol.Optional("password"): str,
}
)
@websocket_api.async_response
@ -101,12 +108,25 @@ async def handle_restore(
msg: dict[str, Any],
) -> None:
"""Restore a backup."""
await hass.data[DATA_MANAGER].async_restore_backup(msg["slug"])
await hass.data[DATA_MANAGER].async_restore_backup(
slug=msg["slug"],
password=msg.get("password"),
)
connection.send_result(msg["id"])
@websocket_api.require_admin
@websocket_api.websocket_command({vol.Required("type"): "backup/generate"})
@websocket_api.websocket_command(
{
vol.Required("type"): "backup/generate",
vol.Optional("addons_included"): [str],
vol.Required("agent_ids"): [str],
vol.Optional("database_included", default=True): bool,
vol.Optional("folders_included"): [str],
vol.Optional("name"): str,
vol.Optional("password"): str,
}
)
@websocket_api.async_response
async def handle_create(
hass: HomeAssistant,
@ -118,7 +138,15 @@ async def handle_create(
def on_progress(progress: BackupProgress) -> None:
connection.send_message(websocket_api.event_message(msg["id"], progress))
backup = await hass.data[DATA_MANAGER].async_create_backup(on_progress=on_progress)
backup = await hass.data[DATA_MANAGER].async_create_backup(
addons_included=msg.get("addons_included"),
agent_ids=msg["agent_ids"],
database_included=msg["database_included"],
folders_included=msg.get("folders_included"),
name=msg.get("name"),
on_progress=on_progress,
password=msg.get("password"),
)
connection.send_result(msg["id"], backup)
@ -162,3 +190,105 @@ async def handle_backup_end(
return
connection.send_result(msg["id"])
@websocket_api.ws_require_user(only_supervisor=True)
@websocket_api.websocket_command(
{
vol.Required("type"): "backup/upload",
vol.Required("data"): {
vol.Required("slug"): str,
},
}
)
@websocket_api.async_response
async def handle_backup_upload(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Backup upload."""
LOGGER.debug("Backup upload notification")
data = msg["data"]
try:
await hass.data[DATA_MANAGER].async_upload_backup(slug=data["slug"])
except Exception as err: # noqa: BLE001
connection.send_error(msg["id"], "backup_upload_failed", str(err))
return
connection.send_result(msg["id"])
@websocket_api.require_admin
@websocket_api.websocket_command({vol.Required("type"): "backup/agents/info"})
@websocket_api.async_response
async def backup_agents_info(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Return backup agents info."""
manager = hass.data[DATA_MANAGER]
await manager.load_platforms()
connection.send_result(
msg["id"],
{
"agents": [{"agent_id": agent_id} for agent_id in manager.backup_agents],
"syncing": manager.syncing,
},
)
@websocket_api.require_admin
@websocket_api.websocket_command({vol.Required("type"): "backup/agents/list_backups"})
@websocket_api.async_response
async def backup_agents_list_backups(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Return a list of uploaded backups."""
manager = hass.data[DATA_MANAGER]
backups: list[dict[str, Any]] = []
await manager.load_platforms()
for agent_id, agent in manager.backup_agents.items():
_listed_backups = await agent.async_list_backups()
backups.extend({**b.as_dict(), "agent_id": agent_id} for b in _listed_backups)
connection.send_result(msg["id"], backups)
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required("type"): "backup/agents/download",
vol.Required("agent_id"): str,
vol.Required("backup_id"): str,
vol.Required("slug"): str,
}
)
@websocket_api.async_response
async def backup_agents_download(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Download an uploaded backup."""
manager = hass.data[DATA_MANAGER]
await manager.load_platforms()
if not (agent := manager.backup_agents.get(msg["agent_id"])):
connection.send_error(
msg["id"], "unknown_agent", f"Agent {msg['agent_id']} not found"
)
return
try:
await agent.async_download_backup(
id=msg["backup_id"],
path=Path(manager.backup_dir, f"{msg['slug']}.tar"), # type: ignore[attr-defined]
)
except Exception as err: # noqa: BLE001
connection.send_error(msg["id"], "backup_agents_download", str(err))
return
connection.send_result(msg["id"])

View file

@ -0,0 +1,77 @@
"""Backup platform for the kitchen_sink integration."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
from uuid import uuid4
from homeassistant.components.backup import (
BackupAgent,
BackupUploadMetadata,
UploadedBackup,
)
from homeassistant.core import HomeAssistant
LOGGER = logging.getLogger(__name__)
async def async_get_backup_sync_agents(
hass: HomeAssistant,
) -> list[BackupAgent]:
"""Register the backup agents."""
return [KitchenSinkBackupAgent("syncer")]
class KitchenSinkBackupAgent(BackupAgent):
"""Kitchen sink backup agent."""
def __init__(self, name: str) -> None:
"""Initialize the kitchen sink backup sync agent."""
super().__init__()
self.name = name
self._uploads = [
UploadedBackup(
id="def456",
name="Kitchen sink syncer",
protected=False,
slug="abc123",
size=1234,
date="1970-01-01T00:00:00Z",
)
]
async def async_download_backup(
self,
*,
id: str,
path: Path,
**kwargs: Any,
) -> None:
"""Download a backup file."""
LOGGER.info("Downloading backup %s to %s", id, path)
async def async_upload_backup(
self,
*,
path: Path,
metadata: BackupUploadMetadata,
**kwargs: Any,
) -> None:
"""Upload a backup."""
LOGGER.info("Uploading backup %s %s", path.name, metadata)
self._uploads.append(
UploadedBackup(
id=uuid4().hex,
name=metadata.name,
protected=metadata.protected,
slug=metadata.slug,
size=metadata.size,
date=metadata.date,
)
)
async def async_list_backups(self, **kwargs: Any) -> list[UploadedBackup]:
"""List synced backups."""
return self._uploads

View file

@ -3,28 +3,85 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from unittest.mock import patch
from homeassistant.components.backup import DOMAIN
from homeassistant.components.backup.manager import Backup
from homeassistant.components.backup import (
DOMAIN,
BackupAgent,
BackupUploadMetadata,
UploadedBackup,
)
from homeassistant.components.backup.const import DATA_MANAGER
from homeassistant.components.backup.manager import LOCAL_AGENT_ID, Backup
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import async_setup_component
TEST_BACKUP = Backup(
agent_ids=["backup.local"],
slug="abc123",
name="Test",
date="1970-01-01T00:00:00.000Z",
path=Path("abc123.tar"),
size=0.0,
protected=False,
)
class BackupAgentTest(BackupAgent):
"""Test backup agent."""
def __init__(self, name: str) -> None:
"""Initialize the backup agent."""
self.name = name
async def async_download_backup(
self,
*,
id: str,
path: Path,
**kwargs: Any,
) -> None:
"""Download a backup file."""
async def async_upload_backup(
self,
*,
path: Path,
metadata: BackupUploadMetadata,
**kwargs: Any,
) -> None:
"""Upload a backup."""
async def async_list_backups(self, **kwargs: Any) -> list[UploadedBackup]:
"""List backups."""
return [
UploadedBackup(
id="abc123",
date="1970-01-01T00:00:00Z",
name="Test",
protected=False,
size=13.37,
slug="abc123",
)
]
async def setup_backup_integration(
hass: HomeAssistant,
with_hassio: bool = False,
configuration: ConfigType | None = None,
backups: list[Backup] | None = None,
) -> bool:
"""Set up the Backup integration."""
with patch("homeassistant.components.backup.is_hassio", return_value=with_hassio):
return await async_setup_component(hass, DOMAIN, configuration or {})
result = await async_setup_component(hass, DOMAIN, configuration or {})
if with_hassio or not backups:
return result
local_agent = hass.data[DATA_MANAGER].backup_agents[LOCAL_AGENT_ID]
local_agent.backups = {backups.slug: backups for backups in backups}
local_agent.loaded_backups = True
return result

View file

@ -43,11 +43,12 @@ def mock_backup_generation_fixture(
Path("test.txt"),
Path(".DS_Store"),
Path(".storage"),
Path("home-assistant_v2.db"),
]
with (
patch("pathlib.Path.iterdir", _mock_iterdir),
patch("pathlib.Path.stat", MagicMock(st_size=123)),
patch("pathlib.Path.stat", return_value=MagicMock(st_size=123)),
patch("pathlib.Path.is_file", lambda x: x.name != ".storage"),
patch(
"pathlib.Path.is_dir",

View file

@ -1,4 +1,114 @@
# serializer version: 1
# name: test_agents_download[with_hassio]
dict({
'id': 1,
'result': None,
'success': True,
'type': 'result',
})
# ---
# name: test_agents_download[without_hassio]
dict({
'id': 1,
'result': None,
'success': True,
'type': 'result',
})
# ---
# name: test_agents_download_exception
dict({
'error': dict({
'code': 'backup_agents_download',
'message': 'Boom',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_agents_download_unknown_agent
dict({
'error': dict({
'code': 'unknown_agent',
'message': 'Agent domain.test not found',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_agents_info[with_hassio]
dict({
'id': 1,
'result': dict({
'agents': list([
dict({
'agent_id': 'domain.test',
}),
dict({
'agent_id': 'backup.local',
}),
]),
'syncing': False,
}),
'success': True,
'type': 'result',
})
# ---
# name: test_agents_info[without_hassio]
dict({
'id': 1,
'result': dict({
'agents': list([
dict({
'agent_id': 'domain.test',
}),
dict({
'agent_id': 'backup.local',
}),
]),
'syncing': False,
}),
'success': True,
'type': 'result',
})
# ---
# name: test_agents_list_backups[with_hassio]
dict({
'id': 1,
'result': list([
dict({
'agent_id': 'domain.test',
'date': '1970-01-01T00:00:00Z',
'id': 'abc123',
'name': 'Test',
'protected': False,
'size': 13.37,
'slug': 'abc123',
}),
]),
'success': True,
'type': 'result',
})
# ---
# name: test_agents_list_backups[without_hassio]
dict({
'id': 1,
'result': list([
dict({
'agent_id': 'domain.test',
'date': '1970-01-01T00:00:00Z',
'id': 'abc123',
'name': 'Test',
'protected': False,
'size': 13.37,
'slug': 'abc123',
}),
]),
'success': True,
'type': 'result',
})
# ---
# name: test_backup_end[with_hassio-hass_access_token]
dict({
'error': dict({
@ -40,7 +150,7 @@
'type': 'result',
})
# ---
# name: test_backup_end_excepion[exception0]
# name: test_backup_end_exception[exception0]
dict({
'error': dict({
'code': 'post_backup_actions_failed',
@ -51,7 +161,7 @@
'type': 'result',
})
# ---
# name: test_backup_end_excepion[exception1]
# name: test_backup_end_exception[exception1]
dict({
'error': dict({
'code': 'post_backup_actions_failed',
@ -62,7 +172,7 @@
'type': 'result',
})
# ---
# name: test_backup_end_excepion[exception2]
# name: test_backup_end_exception[exception2]
dict({
'error': dict({
'code': 'post_backup_actions_failed',
@ -114,7 +224,7 @@
'type': 'result',
})
# ---
# name: test_backup_start_excepion[exception0]
# name: test_backup_start_exception[exception0]
dict({
'error': dict({
'code': 'pre_backup_actions_failed',
@ -125,7 +235,7 @@
'type': 'result',
})
# ---
# name: test_backup_start_excepion[exception1]
# name: test_backup_start_exception[exception1]
dict({
'error': dict({
'code': 'pre_backup_actions_failed',
@ -136,7 +246,7 @@
'type': 'result',
})
# ---
# name: test_backup_start_excepion[exception2]
# name: test_backup_start_exception[exception2]
dict({
'error': dict({
'code': 'pre_backup_actions_failed',
@ -147,6 +257,80 @@
'type': 'result',
})
# ---
# name: test_backup_upload[with_hassio-hass_access_token]
dict({
'error': dict({
'code': 'only_supervisor',
'message': 'Only allowed as Supervisor',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_backup_upload[with_hassio-hass_supervisor_access_token]
dict({
'id': 1,
'result': None,
'success': True,
'type': 'result',
})
# ---
# name: test_backup_upload[without_hassio-hass_access_token]
dict({
'error': dict({
'code': 'unknown_command',
'message': 'Unknown command.',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_backup_upload[without_hassio-hass_supervisor_access_token]
dict({
'error': dict({
'code': 'unknown_command',
'message': 'Unknown command.',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_backup_upload_exception[exception0]
dict({
'error': dict({
'code': 'backup_upload_failed',
'message': '',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_backup_upload_exception[exception1]
dict({
'error': dict({
'code': 'backup_upload_failed',
'message': 'Boom',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_backup_upload_exception[exception2]
dict({
'error': dict({
'code': 'backup_upload_failed',
'message': 'Boom',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_details[with_hassio-with_backup_content]
dict({
'error': dict({
@ -174,9 +358,13 @@
'id': 1,
'result': dict({
'backup': dict({
'agent_ids': list([
'backup.local',
]),
'date': '1970-01-01T00:00:00.000Z',
'name': 'Test',
'path': 'abc123.tar',
'protected': False,
'size': 0.0,
'slug': 'abc123',
}),
@ -195,7 +383,7 @@
'type': 'result',
})
# ---
# name: test_generate[with_hassio]
# name: test_generate[with_hassio-None]
dict({
'error': dict({
'code': 'unknown_command',
@ -206,7 +394,29 @@
'type': 'result',
})
# ---
# name: test_generate[without_hassio]
# name: test_generate[with_hassio-data1]
dict({
'error': dict({
'code': 'unknown_command',
'message': 'Unknown command.',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_generate[with_hassio-data2]
dict({
'error': dict({
'code': 'unknown_command',
'message': 'Unknown command.',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_generate[without_hassio-None]
dict({
'id': 1,
'result': dict({
@ -216,7 +426,7 @@
'type': 'result',
})
# ---
# name: test_generate[without_hassio].1
# name: test_generate[without_hassio-None].1
dict({
'event': dict({
'done': True,
@ -227,6 +437,68 @@
'type': 'event',
})
# ---
# name: test_generate[without_hassio-data1]
dict({
'id': 1,
'result': dict({
'slug': '27f5c632',
}),
'success': True,
'type': 'result',
})
# ---
# name: test_generate[without_hassio-data1].1
dict({
'event': dict({
'done': True,
'stage': None,
'success': True,
}),
'id': 1,
'type': 'event',
})
# ---
# name: test_generate[without_hassio-data2]
dict({
'id': 1,
'result': dict({
'slug': '27f5c632',
}),
'success': True,
'type': 'result',
})
# ---
# name: test_generate[without_hassio-data2].1
dict({
'event': dict({
'done': True,
'stage': None,
'success': True,
}),
'id': 1,
'type': 'event',
})
# ---
# name: test_generate_without_hassio[params0-expected_extra_call_params0]
dict({
'id': 1,
'result': dict({
'slug': 'abc123',
}),
'success': True,
'type': 'result',
})
# ---
# name: test_generate_without_hassio[params1-expected_extra_call_params1]
dict({
'id': 1,
'result': dict({
'slug': 'abc123',
}),
'success': True,
'type': 'result',
})
# ---
# name: test_info[with_hassio]
dict({
'error': dict({
@ -245,9 +517,13 @@
'backing_up': False,
'backups': list([
dict({
'agent_ids': list([
'backup.local',
]),
'date': '1970-01-01T00:00:00.000Z',
'name': 'Test',
'path': 'abc123.tar',
'protected': False,
'size': 0.0,
'slug': 'abc123',
}),

View file

@ -1,5 +1,6 @@
"""Tests for the Backup integration."""
from typing import Any
from unittest.mock import patch
import pytest
@ -26,8 +27,10 @@ async def test_setup_with_hassio(
) in caplog.text
@pytest.mark.parametrize("service_data", [None, {}, {"password": "abc123"}])
async def test_create_service(
hass: HomeAssistant,
service_data: dict[str, Any] | None,
) -> None:
"""Test generate backup."""
await setup_backup_integration(hass)
@ -39,6 +42,7 @@ async def test_create_service(
DOMAIN,
"create",
blocking=True,
service_data=service_data,
)
assert generate_backup.called

View file

@ -3,28 +3,45 @@
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, Mock, mock_open, patch
from typing import Any
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, mock_open, patch
import aiohttp
from multidict import CIMultiDict, CIMultiDictProxy
import pytest
from homeassistant.components.backup import BackupManager
from homeassistant.components.backup.manager import (
from homeassistant.components.backup import (
DOMAIN,
BackupAgentPlatformProtocol,
BackupManager,
BackupPlatformProtocol,
BackupProgress,
BackupUploadMetadata,
backup as local_backup_platform,
)
from homeassistant.components.backup.manager import LOCAL_AGENT_ID, BackupProgress
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import async_setup_component
from .common import TEST_BACKUP
from .common import TEST_BACKUP, BackupAgentTest
from tests.common import MockPlatform, mock_platform
_EXPECTED_FILES_WITH_DATABASE = {
True: ["test.txt", ".storage", "home-assistant_v2.db"],
False: ["test.txt", ".storage"],
}
async def _mock_backup_generation(
manager: BackupManager, mocked_json_bytes: Mock, mocked_tarfile: Mock
hass: HomeAssistant,
manager: BackupManager,
mocked_json_bytes: Mock,
mocked_tarfile: Mock,
*,
database_included: bool = True,
name: str | None = "Core 2025.1.0",
password: str | None = None,
) -> None:
"""Mock backup generator."""
@ -35,43 +52,79 @@ async def _mock_backup_generation(
progress.append(_progress)
assert manager.backup_task is None
await manager.async_create_backup(on_progress=on_progress)
await manager.async_create_backup(
addons_included=[],
agent_ids=[LOCAL_AGENT_ID],
database_included=database_included,
folders_included=[],
name=name,
on_progress=on_progress,
password=password,
)
assert manager.backup_task is not None
assert progress == []
await manager.backup_task
backup = await manager.backup_task
assert progress == [BackupProgress(done=True, stage=None, success=True)]
assert mocked_json_bytes.call_count == 1
backup_json_dict = mocked_json_bytes.call_args[0][0]
assert isinstance(backup_json_dict, dict)
assert backup_json_dict["homeassistant"] == {"version": "2025.1.0"}
assert backup_json_dict == {
"compressed": True,
"date": ANY,
"folders": ["homeassistant"],
"homeassistant": {
"exclude_database": not database_included,
"version": "2025.1.0",
},
"name": name,
"protected": bool(password),
"slug": ANY,
"type": "partial",
}
assert manager.backup_dir.as_posix() in str(mocked_tarfile.call_args_list[0][0][0])
outer_tar = mocked_tarfile.return_value
core_tar = outer_tar.create_inner_tar.return_value.__enter__.return_value
expected_files = [call(hass.config.path(), arcname="data", recursive=False)] + [
call(file, arcname=f"data/{file}", recursive=False)
for file in _EXPECTED_FILES_WITH_DATABASE[database_included]
]
assert core_tar.add.call_args_list == expected_files
return backup
async def _setup_mock_domain(
async def _setup_backup_platform(
hass: HomeAssistant,
platform: BackupPlatformProtocol | None = None,
*,
domain: str = "some_domain",
platform: BackupPlatformProtocol | BackupAgentPlatformProtocol | None = None,
) -> None:
"""Set up a mock domain."""
mock_platform(hass, "some_domain.backup", platform or MockPlatform())
assert await async_setup_component(hass, "some_domain", {})
mock_platform(hass, f"{domain}.backup", platform or MockPlatform())
assert await async_setup_component(hass, domain, {})
async def test_constructor(hass: HomeAssistant) -> None:
"""Test BackupManager constructor."""
manager = BackupManager(hass)
assert manager.backup_dir.as_posix() == hass.config.path("backups")
assert manager.temp_backup_dir.as_posix() == hass.config.path("tmp_backups")
async def test_load_backups(hass: HomeAssistant) -> None:
"""Test loading backups."""
manager = BackupManager(hass)
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await manager.load_platforms()
with (
patch("pathlib.Path.glob", return_value=[TEST_BACKUP.path]),
patch("tarfile.open", return_value=MagicMock()),
patch(
"homeassistant.components.backup.manager.json_loads_object",
"homeassistant.components.backup.backup.json_loads_object",
return_value={
"slug": TEST_BACKUP.slug,
"name": TEST_BACKUP.name,
@ -83,7 +136,7 @@ async def test_load_backups(hass: HomeAssistant) -> None:
return_value=MagicMock(st_size=TEST_BACKUP.size),
),
):
await manager.load_backups()
await manager.backup_agents[LOCAL_AGENT_ID].load_backups()
backups = await manager.async_get_backups()
assert backups == {TEST_BACKUP.slug: TEST_BACKUP}
@ -94,11 +147,15 @@ async def test_load_backups_with_exception(
) -> None:
"""Test loading backups with exception."""
manager = BackupManager(hass)
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await manager.load_platforms()
with (
patch("pathlib.Path.glob", return_value=[TEST_BACKUP.path]),
patch("tarfile.open", side_effect=OSError("Test exception")),
):
await manager.load_backups()
await manager.backup_agents[LOCAL_AGENT_ID].load_backups()
backups = await manager.async_get_backups()
assert f"Unable to read backup {TEST_BACKUP.path}: Test exception" in caplog.text
assert backups == {}
@ -110,8 +167,13 @@ async def test_removing_backup(
) -> None:
"""Test removing backup."""
manager = BackupManager(hass)
manager.backups = {TEST_BACKUP.slug: TEST_BACKUP}
manager.loaded_backups = True
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await manager.load_platforms()
local_agent = manager.backup_agents[LOCAL_AGENT_ID]
local_agent.backups = {TEST_BACKUP.slug: TEST_BACKUP}
local_agent.loaded_backups = True
with patch("pathlib.Path.exists", return_value=True):
await manager.async_remove_backup(slug=TEST_BACKUP.slug)
@ -125,18 +187,27 @@ async def test_removing_non_existing_backup(
"""Test removing not existing backup."""
manager = BackupManager(hass)
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await manager.load_platforms()
await manager.async_remove_backup(slug="non_existing")
assert "Removed backup located at" not in caplog.text
@pytest.mark.xfail(reason="Cleanup not implemented in the draft")
async def test_getting_backup_that_does_not_exist(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test getting backup that does not exist."""
manager = BackupManager(hass)
manager.backups = {TEST_BACKUP.slug: TEST_BACKUP}
manager.loaded_backups = True
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await manager.load_platforms()
local_agent = manager.backup_agents[LOCAL_AGENT_ID]
local_agent.backups = {TEST_BACKUP.slug: TEST_BACKUP}
local_agent.loaded_backups = True
with patch("pathlib.Path.exists", return_value=False):
backup = await manager.async_get_backup(slug=TEST_BACKUP.slug)
@ -154,26 +225,56 @@ async def test_async_create_backup_when_backing_up(hass: HomeAssistant) -> None:
manager = BackupManager(hass)
manager.backup_task = hass.async_create_task(event.wait())
with pytest.raises(HomeAssistantError, match="Backup already in progress"):
await manager.async_create_backup(on_progress=None)
await manager.async_create_backup(
addons_included=[],
agent_ids=[LOCAL_AGENT_ID],
database_included=True,
folders_included=[],
name=None,
on_progress=None,
password=None,
)
event.set()
@pytest.mark.usefixtures("mock_backup_generation")
@pytest.mark.parametrize(
"params",
[
{},
{"database_included": True, "name": "abc123"},
{"database_included": False},
{"password": "abc123"},
],
)
async def test_async_create_backup(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mocked_json_bytes: Mock,
mocked_tarfile: Mock,
params: dict,
) -> None:
"""Test generate backup."""
manager = BackupManager(hass)
manager.loaded_backups = True
await _mock_backup_generation(manager, mocked_json_bytes, mocked_tarfile)
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await manager.load_platforms()
local_agent = manager.backup_agents[LOCAL_AGENT_ID]
local_agent.loaded_backups = True
await _mock_backup_generation(
hass, manager, mocked_json_bytes, mocked_tarfile, **params
)
assert "Generated new backup with slug " in caplog.text
assert "Creating backup directory" in caplog.text
assert "Loaded 0 platforms" in caplog.text
assert "Loaded 1 agents" in caplog.text
assert len(local_agent.backups) == 1
backup = list(local_agent.backups.values())[0]
assert backup.protected is bool(params.get("password"))
async def test_loading_platforms(
@ -183,25 +284,48 @@ async def test_loading_platforms(
"""Test loading backup platforms."""
manager = BackupManager(hass)
assert not manager.loaded_platforms
assert not manager.platforms
await _setup_mock_domain(
await _setup_backup_platform(
hass,
Mock(
platform=Mock(
async_pre_backup=AsyncMock(),
async_post_backup=AsyncMock(),
async_get_backup_agents=AsyncMock(),
),
)
await manager.load_platforms()
await hass.async_block_till_done()
assert manager.loaded_platforms
assert len(manager.platforms) == 1
assert "Loaded 1 platforms" in caplog.text
async def test_loading_agents(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test loading backup agents."""
manager = BackupManager(hass)
assert not manager.platforms
await _setup_backup_platform(
hass,
platform=Mock(
async_get_backup_agents=AsyncMock(return_value=[BackupAgentTest("test")]),
),
)
await manager.load_platforms()
await hass.async_block_till_done()
assert len(manager.backup_agents) == 1
assert "Loaded 1 agents" in caplog.text
assert "some_domain.test" in manager.backup_agents
async def test_not_loading_bad_platforms(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
@ -209,21 +333,171 @@ async def test_not_loading_bad_platforms(
"""Test loading backup platforms."""
manager = BackupManager(hass)
assert not manager.loaded_platforms
assert not manager.platforms
await _setup_mock_domain(hass)
await _setup_backup_platform(hass)
await manager.load_platforms()
await hass.async_block_till_done()
assert manager.loaded_platforms
assert len(manager.platforms) == 0
assert "Loaded 0 platforms" in caplog.text
assert (
"some_domain does not implement required functions for the backup platform"
in caplog.text
@pytest.mark.usefixtures("mock_backup_generation")
async def test_syncing_backup(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mocked_json_bytes: Mock,
mocked_tarfile: Mock,
) -> None:
"""Test syncing a backup."""
manager = BackupManager(hass)
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await _setup_backup_platform(
hass,
platform=Mock(
async_pre_backup=AsyncMock(),
async_post_backup=AsyncMock(),
async_get_backup_agents=AsyncMock(
return_value=[
BackupAgentTest("agent1"),
BackupAgentTest("agent2"),
]
),
),
)
await manager.load_platforms()
await hass.async_block_till_done()
backup = await _mock_backup_generation(
hass, manager, mocked_json_bytes, mocked_tarfile
)
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=backup,
),
patch.object(BackupAgentTest, "async_upload_backup") as mocked_upload,
patch(
"homeassistant.components.backup.manager.HAVERSION",
"2025.1.0",
),
):
await manager.async_upload_backup(slug=backup.slug)
assert mocked_upload.call_count == 2
first_call = mocked_upload.call_args_list[0]
assert first_call[1]["path"] == backup.path
assert first_call[1]["metadata"] == BackupUploadMetadata(
date=backup.date,
homeassistant="2025.1.0",
name=backup.name,
protected=backup.protected,
size=backup.size,
slug=backup.slug,
)
assert "Error during backup upload" not in caplog.text
@pytest.mark.usefixtures("mock_backup_generation")
async def test_syncing_backup_with_exception(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mocked_json_bytes: Mock,
mocked_tarfile: Mock,
) -> None:
"""Test syncing a backup with exception."""
manager = BackupManager(hass)
class ModifiedBackupSyncAgentTest(BackupAgentTest):
async def async_upload_backup(self, **kwargs: Any) -> None:
raise HomeAssistantError("Test exception")
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await _setup_backup_platform(
hass,
platform=Mock(
async_pre_backup=AsyncMock(),
async_post_backup=AsyncMock(),
async_get_backup_agents=AsyncMock(
return_value=[
ModifiedBackupSyncAgentTest("agent1"),
ModifiedBackupSyncAgentTest("agent2"),
]
),
),
)
await manager.load_platforms()
await hass.async_block_till_done()
backup = await _mock_backup_generation(
hass, manager, mocked_json_bytes, mocked_tarfile
)
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=backup,
),
patch.object(
ModifiedBackupSyncAgentTest,
"async_upload_backup",
) as mocked_upload,
patch(
"homeassistant.components.backup.manager.HAVERSION",
"2025.1.0",
),
):
mocked_upload.side_effect = HomeAssistantError("Test exception")
await manager.async_upload_backup(slug=backup.slug)
assert mocked_upload.call_count == 2
first_call = mocked_upload.call_args_list[0]
assert first_call[1]["path"] == backup.path
assert first_call[1]["metadata"] == BackupUploadMetadata(
date=backup.date,
homeassistant="2025.1.0",
name=backup.name,
protected=backup.protected,
size=backup.size,
slug=backup.slug,
)
assert "Error during backup upload - Test exception" in caplog.text
@pytest.mark.usefixtures("mock_backup_generation")
async def test_syncing_backup_no_agents(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
mocked_json_bytes: Mock,
mocked_tarfile: Mock,
) -> None:
"""Test syncing a backup with no agents."""
manager = BackupManager(hass)
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await _setup_backup_platform(
hass,
platform=Mock(
async_pre_backup=AsyncMock(),
async_post_backup=AsyncMock(),
async_get_backup_agents=AsyncMock(return_value=[]),
),
)
await manager.load_platforms()
await hass.async_block_till_done()
backup = await _mock_backup_generation(
hass, manager, mocked_json_bytes, mocked_tarfile
)
with patch(
"homeassistant.components.backup.agent.BackupAgent.async_upload_backup"
) as mocked_async_upload_backup:
await manager.async_upload_backup(slug=backup.slug)
assert mocked_async_upload_backup.call_count == 0
async def test_exception_plaform_pre(
@ -236,16 +510,17 @@ async def test_exception_plaform_pre(
async def _mock_step(hass: HomeAssistant) -> None:
raise HomeAssistantError("Test exception")
await _setup_mock_domain(
await _setup_backup_platform(
hass,
Mock(
platform=Mock(
async_pre_backup=_mock_step,
async_post_backup=AsyncMock(),
async_get_backup_agents=AsyncMock(),
),
)
with pytest.raises(HomeAssistantError):
await _mock_backup_generation(manager, mocked_json_bytes, mocked_tarfile)
await _mock_backup_generation(hass, manager, mocked_json_bytes, mocked_tarfile)
async def test_exception_plaform_post(
@ -258,66 +533,17 @@ async def test_exception_plaform_post(
async def _mock_step(hass: HomeAssistant) -> None:
raise HomeAssistantError("Test exception")
await _setup_mock_domain(
await _setup_backup_platform(
hass,
Mock(
platform=Mock(
async_pre_backup=AsyncMock(),
async_post_backup=_mock_step,
async_get_backup_agents=AsyncMock(),
),
)
with pytest.raises(HomeAssistantError):
await _mock_backup_generation(manager, mocked_json_bytes, mocked_tarfile)
async def test_loading_platforms_when_running_async_pre_backup_actions(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test loading backup platforms when running post backup actions."""
manager = BackupManager(hass)
assert not manager.loaded_platforms
assert not manager.platforms
await _setup_mock_domain(
hass,
Mock(
async_pre_backup=AsyncMock(),
async_post_backup=AsyncMock(),
),
)
await manager.async_pre_backup_actions()
assert manager.loaded_platforms
assert len(manager.platforms) == 1
assert "Loaded 1 platforms" in caplog.text
async def test_loading_platforms_when_running_async_post_backup_actions(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test loading backup platforms when running post backup actions."""
manager = BackupManager(hass)
assert not manager.loaded_platforms
assert not manager.platforms
await _setup_mock_domain(
hass,
Mock(
async_pre_backup=AsyncMock(),
async_post_backup=AsyncMock(),
),
)
await manager.async_post_backup_actions()
assert manager.loaded_platforms
assert len(manager.platforms) == 1
assert "Loaded 1 platforms" in caplog.text
await _mock_backup_generation(hass, manager, mocked_json_bytes, mocked_tarfile)
async def test_async_receive_backup(
@ -354,6 +580,7 @@ async def test_async_receive_backup(
assert mover_mock.mock_calls[0].args[1].name == "abc123.tar"
@pytest.mark.xfail(reason="Restore not implemented in the draft")
async def test_async_trigger_restore(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
@ -369,10 +596,37 @@ async def test_async_trigger_restore(
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
):
await manager.async_restore_backup(TEST_BACKUP.slug)
assert mocked_write_text.call_args[0][0] == '{"path": "abc123.tar"}'
assert (
mocked_write_text.call_args[0][0]
== '{"path": "abc123.tar", "password": null}'
)
assert mocked_service_call.called
@pytest.mark.xfail(reason="Restore not implemented in the draft")
async def test_async_trigger_restore_with_password(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test trigger restore."""
manager = BackupManager(hass)
manager.loaded_backups = True
manager.backups = {TEST_BACKUP.slug: TEST_BACKUP}
with (
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text") as mocked_write_text,
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
):
await manager.async_restore_backup(slug=TEST_BACKUP.slug, password="abc123")
assert (
mocked_write_text.call_args[0][0]
== '{"path": "abc123.tar", "password": "abc123"}'
)
assert mocked_service_call.called
@pytest.mark.xfail(reason="Restore not implemented in the draft")
async def test_async_trigger_restore_missing_backup(hass: HomeAssistant) -> None:
"""Test trigger restore."""
manager = BackupManager(hass)

View file

@ -1,16 +1,20 @@
"""Tests for the Backup integration."""
from unittest.mock import patch
from pathlib import Path
from typing import Any
from unittest.mock import ANY, patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy import SnapshotAssertion
from homeassistant.components.backup.manager import Backup
from homeassistant.components.backup import BaseBackup
from homeassistant.components.backup.const import DATA_MANAGER
from homeassistant.components.backup.manager import NewBackup
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from .common import TEST_BACKUP, setup_backup_integration
from .common import TEST_BACKUP, BackupAgentTest, setup_backup_integration
from tests.typing import WebSocketGenerator
@ -41,23 +45,19 @@ async def test_info(
with_hassio: bool,
) -> None:
"""Test getting backup info."""
await setup_backup_integration(hass, with_hassio=with_hassio)
await setup_backup_integration(hass, with_hassio=with_hassio, backups=[TEST_BACKUP])
client = await hass_ws_client(hass)
await hass.async_block_till_done()
with patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backups",
return_value={TEST_BACKUP.slug: TEST_BACKUP},
):
await client.send_json_auto_id({"type": "backup/info"})
assert snapshot == await client.receive_json()
await client.send_json_auto_id({"type": "backup/info"})
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
"backup_content",
[
pytest.param(TEST_BACKUP, id="with_backup_content"),
pytest.param([TEST_BACKUP], id="with_backup_content"),
pytest.param(None, id="without_backup_content"),
],
)
@ -73,20 +73,18 @@ async def test_details(
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
with_hassio: bool,
backup_content: Backup | None,
backup_content: BaseBackup | None,
) -> None:
"""Test getting backup info."""
await setup_backup_integration(hass, with_hassio=with_hassio)
await setup_backup_integration(
hass, with_hassio=with_hassio, backups=backup_content
)
client = await hass_ws_client(hass)
await hass.async_block_till_done()
with patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=backup_content,
):
await client.send_json_auto_id({"type": "backup/details", "slug": "abc123"})
assert await client.receive_json() == snapshot
await client.send_json_auto_id({"type": "backup/details", "slug": "abc123"})
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
@ -112,9 +110,17 @@ async def test_remove(
"homeassistant.components.backup.manager.BackupManager.async_remove_backup",
):
await client.send_json_auto_id({"type": "backup/remove", "slug": "abc123"})
assert snapshot == await client.receive_json()
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
"data",
[
None,
{},
{"password": "abc123"},
],
)
@pytest.mark.parametrize(
("with_hassio", "number_of_messages"),
[
@ -126,6 +132,7 @@ async def test_remove(
async def test_generate(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
data: dict[str, Any] | None,
freezer: FrozenDateTimeFactory,
snapshot: SnapshotAssertion,
with_hassio: bool,
@ -138,9 +145,68 @@ async def test_generate(
freezer.move_to("2024-11-13 12:01:00+01:00")
await hass.async_block_till_done()
await client.send_json_auto_id({"type": "backup/generate"})
await client.send_json_auto_id(
{"type": "backup/generate", **{"agent_ids": ["backup.local"]} | (data or {})}
)
for _ in range(number_of_messages):
assert snapshot == await client.receive_json()
assert await client.receive_json() == snapshot
@pytest.mark.usefixtures("mock_backup_generation")
@pytest.mark.parametrize(
("params", "expected_extra_call_params"),
[
({"agent_ids": ["backup.local"]}, {"agent_ids": ["backup.local"]}),
(
{
"addons_included": ["ssl"],
"agent_ids": ["backup.local"],
"database_included": False,
"folders_included": ["media"],
"name": "abc123",
},
{
"addons_included": ["ssl"],
"agent_ids": ["backup.local"],
"database_included": False,
"folders_included": ["media"],
"name": "abc123",
},
),
],
)
async def test_generate_without_hassio(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
freezer: FrozenDateTimeFactory,
snapshot: SnapshotAssertion,
params: dict,
expected_extra_call_params: tuple,
) -> None:
"""Test generating a backup."""
await setup_backup_integration(hass, with_hassio=False)
client = await hass_ws_client(hass)
freezer.move_to("2024-11-13 12:01:00+01:00")
await hass.async_block_till_done()
with patch(
"homeassistant.components.backup.manager.BackupManager.async_create_backup",
return_value=NewBackup("abc123"),
) as generate_backup:
await client.send_json_auto_id({"type": "backup/generate"} | params)
assert await client.receive_json() == snapshot
generate_backup.assert_called_once_with(
**{
"addons_included": None,
"database_included": True,
"folders_included": None,
"name": None,
"on_progress": ANY,
"password": None,
}
| expected_extra_call_params
)
@pytest.mark.parametrize(
@ -199,7 +265,7 @@ async def test_backup_end(
"homeassistant.components.backup.manager.BackupManager.async_post_backup_actions",
):
await client.send_json_auto_id({"type": "backup/end"})
assert snapshot == await client.receive_json()
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
@ -232,7 +298,47 @@ async def test_backup_start(
"homeassistant.components.backup.manager.BackupManager.async_pre_backup_actions",
):
await client.send_json_auto_id({"type": "backup/start"})
assert snapshot == await client.receive_json()
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
"access_token_fixture_name",
["hass_access_token", "hass_supervisor_access_token"],
)
@pytest.mark.parametrize(
("with_hassio"),
[
pytest.param(True, id="with_hassio"),
pytest.param(False, id="without_hassio"),
],
)
async def test_backup_upload(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
sync_access_token_proxy: str,
*,
access_token_fixture_name: str,
with_hassio: bool,
) -> None:
"""Test backup upload from a WS command."""
await setup_backup_integration(hass, with_hassio=with_hassio)
client = await hass_ws_client(hass, sync_access_token_proxy)
await hass.async_block_till_done()
with patch(
"homeassistant.components.backup.manager.BackupManager.async_upload_backup",
):
await client.send_json_auto_id(
{
"type": "backup/upload",
"data": {
"slug": "abc123",
},
}
)
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
@ -243,7 +349,7 @@ async def test_backup_start(
Exception("Boom"),
],
)
async def test_backup_end_excepion(
async def test_backup_end_exception(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
@ -261,7 +367,7 @@ async def test_backup_end_excepion(
side_effect=exception,
):
await client.send_json_auto_id({"type": "backup/end"})
assert snapshot == await client.receive_json()
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
@ -272,7 +378,43 @@ async def test_backup_end_excepion(
Exception("Boom"),
],
)
async def test_backup_start_excepion(
async def test_backup_upload_exception(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
hass_supervisor_access_token: str,
exception: Exception,
) -> None:
"""Test exception handling while running backup upload from a WS command."""
await setup_backup_integration(hass, with_hassio=True)
client = await hass_ws_client(hass, hass_supervisor_access_token)
await hass.async_block_till_done()
with patch(
"homeassistant.components.backup.manager.BackupManager.async_upload_backup",
side_effect=exception,
):
await client.send_json_auto_id(
{
"type": "backup/upload",
"data": {
"slug": "abc123",
},
}
)
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
"exception",
[
TimeoutError(),
HomeAssistantError("Boom"),
Exception("Boom"),
],
)
async def test_backup_start_exception(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
@ -290,4 +432,135 @@ async def test_backup_start_excepion(
side_effect=exception,
):
await client.send_json_auto_id({"type": "backup/start"})
assert snapshot == await client.receive_json()
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
"with_hassio",
[
pytest.param(True, id="with_hassio"),
pytest.param(False, id="without_hassio"),
],
)
async def test_agents_info(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
with_hassio: bool,
) -> None:
"""Test getting backup agents info."""
await setup_backup_integration(hass, with_hassio=with_hassio)
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
client = await hass_ws_client(hass)
await hass.async_block_till_done()
await client.send_json_auto_id({"type": "backup/agents/info"})
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
"with_hassio",
[
pytest.param(True, id="with_hassio"),
pytest.param(False, id="without_hassio"),
],
)
async def test_agents_list_backups(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
with_hassio: bool,
) -> None:
"""Test backup agents list backups details."""
await setup_backup_integration(hass, with_hassio=with_hassio)
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
client = await hass_ws_client(hass)
await hass.async_block_till_done()
await client.send_json_auto_id({"type": "backup/agents/list_backups"})
assert await client.receive_json() == snapshot
@pytest.mark.parametrize(
"with_hassio",
[
pytest.param(True, id="with_hassio"),
pytest.param(False, id="without_hassio"),
],
)
async def test_agents_download(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
with_hassio: bool,
) -> None:
"""Test WS command to start downloading a backup."""
await setup_backup_integration(hass, with_hassio=with_hassio)
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
client = await hass_ws_client(hass)
await hass.async_block_till_done()
await client.send_json_auto_id(
{
"type": "backup/agents/download",
"slug": "abc123",
"agent_id": "domain.test",
"backup_id": "abc123",
}
)
with patch.object(BackupAgentTest, "async_download_backup") as download_mock:
assert await client.receive_json() == snapshot
assert download_mock.call_args[1] == {
"id": "abc123",
"path": Path(hass.config.path("backups"), "abc123.tar"),
}
async def test_agents_download_exception(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
) -> None:
"""Test WS command to start downloading a backup throwing an exception."""
await setup_backup_integration(hass)
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
client = await hass_ws_client(hass)
await hass.async_block_till_done()
await client.send_json_auto_id(
{
"type": "backup/agents/download",
"slug": "abc123",
"agent_id": "domain.test",
"backup_id": "abc123",
}
)
with patch.object(BackupAgentTest, "async_download_backup") as download_mock:
download_mock.side_effect = Exception("Boom")
assert await client.receive_json() == snapshot
async def test_agents_download_unknown_agent(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
) -> None:
"""Test downloading a backup with an unknown agent."""
await setup_backup_integration(hass)
client = await hass_ws_client(hass)
await hass.async_block_till_done()
await client.send_json_auto_id(
{
"type": "backup/agents/download",
"slug": "abc123",
"agent_id": "domain.test",
"backup_id": "abc123",
}
)
assert await client.receive_json() == snapshot

View file

@ -19,7 +19,23 @@ from .common import get_test_config_dir
(
None,
'{"path": "test"}',
backup_restore.RestoreBackupFileContent(backup_file_path=Path("test")),
backup_restore.RestoreBackupFileContent(
backup_file_path=Path("test"), password=None
),
),
(
None,
'{"path": "test", "password": "psw"}',
backup_restore.RestoreBackupFileContent(
backup_file_path=Path("test"), password="psw"
),
),
(
None,
'{"path": "test", "password": null}',
backup_restore.RestoreBackupFileContent(
backup_file_path=Path("test"), password=None
),
),
],
)
@ -155,15 +171,17 @@ def test_removal_of_current_configuration_when_restoring() -> None:
return_value=[x["path"] for x in mock_config_dir],
),
mock.patch("pathlib.Path.unlink") as unlink_mock,
mock.patch("shutil.rmtree") as rmtreemock,
mock.patch("shutil.copytree") as copytree_mock,
mock.patch("shutil.rmtree") as rmtree_mock,
):
assert backup_restore.restore_backup(config_dir) is True
assert unlink_mock.call_count == 2
assert copytree_mock.call_count == 1
assert (
rmtreemock.call_count == 1
rmtree_mock.call_count == 1
) # We have 2 directories in the config directory, but backups is kept
removed_directories = {Path(call.args[0]) for call in rmtreemock.mock_calls}
removed_directories = {Path(call.args[0]) for call in rmtree_mock.mock_calls}
assert removed_directories == {Path(config_dir, "www")}
@ -177,8 +195,8 @@ def test_extracting_the_contents_of_a_backup_file() -> None:
getmembers_mock = mock.MagicMock(
return_value=[
tarfile.TarInfo(name="../data/test"),
tarfile.TarInfo(name="data"),
tarfile.TarInfo(name="data/../test"),
tarfile.TarInfo(name="data/.HA_VERSION"),
tarfile.TarInfo(name="data/.storage"),
tarfile.TarInfo(name="data/www"),
@ -190,7 +208,7 @@ def test_extracting_the_contents_of_a_backup_file() -> None:
mock.patch(
"homeassistant.backup_restore.restore_backup_file_content",
return_value=backup_restore.RestoreBackupFileContent(
backup_file_path=backup_file_path
backup_file_path=backup_file_path,
),
),
mock.patch(
@ -205,11 +223,37 @@ def test_extracting_the_contents_of_a_backup_file() -> None:
mock.patch("pathlib.Path.read_text", _patched_path_read_text),
mock.patch("pathlib.Path.is_file", return_value=False),
mock.patch("pathlib.Path.iterdir", return_value=[]),
mock.patch("shutil.copytree"),
):
assert backup_restore.restore_backup(config_dir) is True
assert getmembers_mock.call_count == 1
assert extractall_mock.call_count == 2
assert {
member.name for member in extractall_mock.mock_calls[-1].kwargs["members"]
} == {".HA_VERSION", ".storage", "www"}
} == {"data", "data/.HA_VERSION", "data/.storage", "data/www"}
@pytest.mark.parametrize(
("password", "expected"),
[
("test", b"\xf0\x9b\xb9\x1f\xdc,\xff\xd5x\xd6\xd6\x8fz\x19.\x0f"),
("lorem ipsum...", b"#\xe0\xfc\xe0\xdb?_\x1f,$\rQ\xf4\xf5\xd8\xfb"),
],
)
def test_pw_to_key(password: str | None, expected: bytes | None) -> None:
"""Test password to key conversion."""
assert backup_restore.password_to_key(password) == expected
@pytest.mark.parametrize(
("password", "expected"),
[
(None, None),
("test", b"\xf0\x9b\xb9\x1f\xdc,\xff\xd5x\xd6\xd6\x8fz\x19.\x0f"),
("lorem ipsum...", b"#\xe0\xfc\xe0\xdb?_\x1f,$\rQ\xf4\xf5\xd8\xfb"),
],
)
def test_pw_to_key_none(password: str | None, expected: bytes | None) -> None:
"""Test password to key conversion."""
with pytest.raises(AttributeError):
backup_restore.password_to_key(None)