Compare commits
9 commits
dev
...
alltheback
Author | SHA1 | Date | |
---|---|---|---|
|
8fc470b943 | ||
|
e8179f7a73 | ||
|
e9247fb94b | ||
|
5a69488630 | ||
|
d1185f8754 | ||
|
0599983a37 | ||
|
f99b319048 | ||
|
957ece747d | ||
|
325738829d |
19 changed files with 1688 additions and 114 deletions
|
@ -1,6 +1,7 @@
|
|||
"""Home Assistant module to handle restoring backups."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
@ -24,6 +25,18 @@ class RestoreBackupFileContent:
|
|||
"""Definition for restore backup file content."""
|
||||
|
||||
backup_file_path: Path
|
||||
password: str | None = None
|
||||
|
||||
|
||||
def password_to_key(password: str) -> bytes:
|
||||
"""Generate a AES Key from password.
|
||||
|
||||
Matches the implementation in supervisor.backups.utils.password_to_key.
|
||||
"""
|
||||
key: bytes = password.encode()
|
||||
for _ in range(100):
|
||||
key = hashlib.sha256(key).digest()
|
||||
return key[:16]
|
||||
|
||||
|
||||
def restore_backup_file_content(config_dir: Path) -> RestoreBackupFileContent | None:
|
||||
|
@ -32,7 +45,8 @@ def restore_backup_file_content(config_dir: Path) -> RestoreBackupFileContent |
|
|||
try:
|
||||
instruction_content = json.loads(instruction_path.read_text(encoding="utf-8"))
|
||||
return RestoreBackupFileContent(
|
||||
backup_file_path=Path(instruction_content["path"])
|
||||
backup_file_path=Path(instruction_content["path"]),
|
||||
password=instruction_content.get("password"),
|
||||
)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return None
|
||||
|
@ -54,7 +68,11 @@ def _clear_configuration_directory(config_dir: Path) -> None:
|
|||
shutil.rmtree(entrypath)
|
||||
|
||||
|
||||
def _extract_backup(config_dir: Path, backup_file_path: Path) -> None:
|
||||
def _extract_backup(
|
||||
config_dir: Path,
|
||||
backup_file_path: Path,
|
||||
password: str | None = None,
|
||||
) -> None:
|
||||
"""Extract the backup file to the config directory."""
|
||||
with (
|
||||
TemporaryDirectory() as tempdir,
|
||||
|
@ -88,22 +106,28 @@ def _extract_backup(config_dir: Path, backup_file_path: Path) -> None:
|
|||
f"homeassistant.tar{'.gz' if backup_meta["compressed"] else ''}",
|
||||
),
|
||||
gzip=backup_meta["compressed"],
|
||||
key=password_to_key(password) if password is not None else None,
|
||||
mode="r",
|
||||
) as istf:
|
||||
for member in istf.getmembers():
|
||||
if member.name == "data":
|
||||
continue
|
||||
member.name = member.name.replace("data/", "")
|
||||
_clear_configuration_directory(config_dir)
|
||||
istf.extractall(
|
||||
path=config_dir,
|
||||
members=[
|
||||
member
|
||||
for member in securetar.secure_path(istf)
|
||||
if member.name != "data"
|
||||
],
|
||||
path=Path(
|
||||
tempdir,
|
||||
"homeassistant",
|
||||
),
|
||||
members=securetar.secure_path(istf),
|
||||
filter="fully_trusted",
|
||||
)
|
||||
_clear_configuration_directory(config_dir)
|
||||
shutil.copytree(
|
||||
Path(
|
||||
tempdir,
|
||||
"homeassistant",
|
||||
"data",
|
||||
),
|
||||
config_dir,
|
||||
dirs_exist_ok=True,
|
||||
ignore=shutil.ignore_patterns(*(KEEP_PATHS)),
|
||||
)
|
||||
|
||||
|
||||
def restore_backup(config_dir_path: str) -> bool:
|
||||
|
@ -119,7 +143,11 @@ def restore_backup(config_dir_path: str) -> bool:
|
|||
backup_file_path = restore_content.backup_file_path
|
||||
_LOGGER.info("Restoring %s", backup_file_path)
|
||||
try:
|
||||
_extract_backup(config_dir, backup_file_path)
|
||||
_extract_backup(
|
||||
config_dir=config_dir,
|
||||
backup_file_path=backup_file_path,
|
||||
password=restore_content.password,
|
||||
)
|
||||
except FileNotFoundError as err:
|
||||
raise ValueError(f"Backup file {backup_file_path} does not exist") from err
|
||||
_LOGGER.info("Restore complete, restarting")
|
||||
|
|
|
@ -1,22 +1,38 @@
|
|||
"""The Backup integration."""
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import CONF_PASSWORD
|
||||
from homeassistant.core import HomeAssistant, ServiceCall
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.hassio import is_hassio
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from .const import DATA_MANAGER, DOMAIN, LOGGER
|
||||
from .agent import BackupAgent, BackupAgentPlatformProtocol, UploadedBackup
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .http import async_register_http_views
|
||||
from .manager import BackupManager
|
||||
from .manager import Backup, BackupManager, BackupPlatformProtocol
|
||||
from .models import BackupUploadMetadata, BaseBackup
|
||||
from .websocket import async_register_websocket_handlers
|
||||
|
||||
__all__ = [
|
||||
"Backup",
|
||||
"BackupAgent",
|
||||
"BackupAgentPlatformProtocol",
|
||||
"BackupPlatformProtocol",
|
||||
"BackupUploadMetadata",
|
||||
"BaseBackup",
|
||||
"UploadedBackup",
|
||||
]
|
||||
|
||||
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
|
||||
|
||||
SERVICE_CREATE_SCHEMA = vol.Schema({vol.Optional(CONF_PASSWORD): str})
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the Backup integration."""
|
||||
backup_manager = BackupManager(hass)
|
||||
hass.data[DATA_MANAGER] = backup_manager
|
||||
hass.data[DOMAIN] = backup_manager = BackupManager(hass)
|
||||
|
||||
with_hassio = is_hassio(hass)
|
||||
|
||||
|
@ -32,11 +48,23 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||
|
||||
async def async_handle_create_service(call: ServiceCall) -> None:
|
||||
"""Service handler for creating backups."""
|
||||
await backup_manager.async_create_backup(on_progress=None)
|
||||
await backup_manager.async_create_backup(
|
||||
addons_included=None,
|
||||
database_included=True,
|
||||
folders_included=None,
|
||||
name=None,
|
||||
on_progress=None,
|
||||
password=call.data.get(CONF_PASSWORD),
|
||||
)
|
||||
if backup_task := backup_manager.backup_task:
|
||||
await backup_task
|
||||
|
||||
hass.services.async_register(DOMAIN, "create", async_handle_create_service)
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"create",
|
||||
async_handle_create_service,
|
||||
schema=SERVICE_CREATE_SCHEMA,
|
||||
)
|
||||
|
||||
async_register_http_views(hass)
|
||||
|
||||
|
|
68
homeassistant/components/backup/agent.py
Normal file
68
homeassistant/components/backup/agent.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
"""Backup agents for the Backup integration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Protocol
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from .models import BackupUploadMetadata, BaseBackup
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class UploadedBackup(BaseBackup):
|
||||
"""Uploaded backup class."""
|
||||
|
||||
id: str
|
||||
|
||||
|
||||
class BackupAgent(abc.ABC):
|
||||
"""Backup agent interface."""
|
||||
|
||||
name: str
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_download_backup(
|
||||
self,
|
||||
*,
|
||||
id: str,
|
||||
path: Path,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Download a backup file.
|
||||
|
||||
:param id: The ID of the backup that was returned in async_list_backups.
|
||||
:param path: The full file path to download the backup to.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_upload_backup(
|
||||
self,
|
||||
*,
|
||||
path: Path,
|
||||
metadata: BackupUploadMetadata,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Upload a backup.
|
||||
|
||||
:param path: The full file path to the backup that should be uploaded.
|
||||
:param metadata: Metadata about the backup that should be uploaded.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_list_backups(self, **kwargs: Any) -> list[UploadedBackup]:
|
||||
"""List backups."""
|
||||
|
||||
|
||||
class BackupAgentPlatformProtocol(Protocol):
|
||||
"""Define the format of backup platforms which implement backup agents."""
|
||||
|
||||
async def async_get_backup_agents(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
**kwargs: Any,
|
||||
) -> list[BackupAgent]:
|
||||
"""Return a list of backup agents."""
|
|
@ -8,10 +8,11 @@ from typing import TYPE_CHECKING
|
|||
from homeassistant.util.hass_dict import HassKey
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .manager import BackupManager
|
||||
from .manager import BaseBackupManager
|
||||
from .models import BaseBackup
|
||||
|
||||
DOMAIN = "backup"
|
||||
DATA_MANAGER: HassKey[BackupManager] = HassKey(DOMAIN)
|
||||
DATA_MANAGER: HassKey[BaseBackupManager[BaseBackup]] = HassKey(DOMAIN)
|
||||
LOGGER = getLogger(__package__)
|
||||
|
||||
EXCLUDE_FROM_BACKUP = [
|
||||
|
@ -25,3 +26,8 @@ EXCLUDE_FROM_BACKUP = [
|
|||
"OZW_Log.txt",
|
||||
"tts/*",
|
||||
]
|
||||
|
||||
EXCLUDE_DATABASE_FROM_BACKUP = [
|
||||
"home-assistant_v2.db",
|
||||
"home-assistant_v2.db-wal",
|
||||
]
|
||||
|
|
|
@ -15,6 +15,7 @@ from homeassistant.core import HomeAssistant, callback
|
|||
from homeassistant.util import slugify
|
||||
|
||||
from .const import DATA_MANAGER
|
||||
from .manager import BackupManager
|
||||
|
||||
|
||||
@callback
|
||||
|
@ -39,7 +40,7 @@ class DownloadBackupView(HomeAssistantView):
|
|||
if not request["hass_user"].is_admin:
|
||||
return Response(status=HTTPStatus.UNAUTHORIZED)
|
||||
|
||||
manager = request.app[KEY_HASS].data[DATA_MANAGER]
|
||||
manager = cast(BackupManager, request.app[KEY_HASS].data[DATA_MANAGER])
|
||||
backup = await manager.async_get_backup(slug=slug)
|
||||
|
||||
if backup is None or not backup.path.exists():
|
||||
|
|
|
@ -16,12 +16,13 @@ import tarfile
|
|||
from tarfile import TarError
|
||||
from tempfile import TemporaryDirectory
|
||||
import time
|
||||
from typing import Any, Protocol, cast
|
||||
from typing import Any, Generic, Protocol, cast
|
||||
|
||||
import aiohttp
|
||||
from securetar import SecureTarFile, atomic_contents_add
|
||||
from typing_extensions import TypeVar
|
||||
|
||||
from homeassistant.backup_restore import RESTORE_BACKUP_FILE
|
||||
from homeassistant.backup_restore import RESTORE_BACKUP_FILE, password_to_key
|
||||
from homeassistant.const import __version__ as HAVERSION
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
@ -30,10 +31,14 @@ from homeassistant.helpers.json import json_bytes
|
|||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.util.json import json_loads_object
|
||||
|
||||
from .const import DOMAIN, EXCLUDE_FROM_BACKUP, LOGGER
|
||||
from .agent import BackupAgent, BackupAgentPlatformProtocol
|
||||
from .const import DOMAIN, EXCLUDE_DATABASE_FROM_BACKUP, EXCLUDE_FROM_BACKUP, LOGGER
|
||||
from .models import BackupUploadMetadata, BaseBackup
|
||||
|
||||
BUF_SIZE = 2**20 * 4 # 4MB
|
||||
|
||||
_BackupT = TypeVar("_BackupT", bound=BaseBackup, default=BaseBackup)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class NewBackup:
|
||||
|
@ -43,14 +48,10 @@ class NewBackup:
|
|||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Backup:
|
||||
class Backup(BaseBackup):
|
||||
"""Backup class."""
|
||||
|
||||
slug: str
|
||||
name: str
|
||||
date: str
|
||||
path: Path
|
||||
size: float
|
||||
|
||||
def as_dict(self) -> dict:
|
||||
"""Return a dict representation of this backup."""
|
||||
|
@ -76,19 +77,21 @@ class BackupPlatformProtocol(Protocol):
|
|||
"""Perform operations after a backup finishes."""
|
||||
|
||||
|
||||
class BaseBackupManager(abc.ABC):
|
||||
class BaseBackupManager(abc.ABC, Generic[_BackupT]):
|
||||
"""Define the format that backup managers can have."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant) -> None:
|
||||
"""Initialize the backup manager."""
|
||||
self.hass = hass
|
||||
self.backup_task: asyncio.Task | None = None
|
||||
self.backups: dict[str, Backup] = {}
|
||||
self.backups: dict[str, _BackupT] = {}
|
||||
self.loaded_platforms = False
|
||||
self.platforms: dict[str, BackupPlatformProtocol] = {}
|
||||
self.backup_agents: dict[str, BackupAgent] = {}
|
||||
self.syncing = False
|
||||
|
||||
@callback
|
||||
def _add_platform(
|
||||
def _add_platform_pre_post_handlers(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
integration_domain: str,
|
||||
|
@ -98,13 +101,25 @@ class BaseBackupManager(abc.ABC):
|
|||
if not hasattr(platform, "async_pre_backup") or not hasattr(
|
||||
platform, "async_post_backup"
|
||||
):
|
||||
LOGGER.warning(
|
||||
"%s does not implement required functions for the backup platform",
|
||||
integration_domain,
|
||||
)
|
||||
return
|
||||
|
||||
self.platforms[integration_domain] = platform
|
||||
|
||||
async def _async_add_platform_agents(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
integration_domain: str,
|
||||
platform: BackupAgentPlatformProtocol,
|
||||
) -> None:
|
||||
"""Add a platform to the backup manager."""
|
||||
if not hasattr(platform, "async_get_backup_agents"):
|
||||
return
|
||||
|
||||
agents = await platform.async_get_backup_agents(hass=hass)
|
||||
self.backup_agents.update(
|
||||
{f"{integration_domain}.{agent.name}": agent for agent in agents}
|
||||
)
|
||||
|
||||
async def async_pre_backup_actions(self, **kwargs: Any) -> None:
|
||||
"""Perform pre backup actions."""
|
||||
if not self.loaded_platforms:
|
||||
|
@ -139,34 +154,61 @@ class BaseBackupManager(abc.ABC):
|
|||
|
||||
async def load_platforms(self) -> None:
|
||||
"""Load backup platforms."""
|
||||
if self.loaded_platforms:
|
||||
return
|
||||
await integration_platform.async_process_integration_platforms(
|
||||
self.hass, DOMAIN, self._add_platform, wait_for_platforms=True
|
||||
self.hass,
|
||||
DOMAIN,
|
||||
self._add_platform_pre_post_handlers,
|
||||
wait_for_platforms=True,
|
||||
)
|
||||
await integration_platform.async_process_integration_platforms(
|
||||
self.hass,
|
||||
DOMAIN,
|
||||
self._async_add_platform_agents,
|
||||
wait_for_platforms=True,
|
||||
)
|
||||
LOGGER.debug("Loaded %s platforms", len(self.platforms))
|
||||
LOGGER.debug("Loaded %s agents", len(self.backup_agents))
|
||||
self.loaded_platforms = True
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_restore_backup(self, slug: str, **kwargs: Any) -> None:
|
||||
async def async_restore_backup(
|
||||
self,
|
||||
slug: str,
|
||||
*,
|
||||
password: str | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Restore a backup."""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_create_backup(
|
||||
self,
|
||||
*,
|
||||
addons_included: list[str] | None,
|
||||
database_included: bool,
|
||||
folders_included: list[str] | None,
|
||||
name: str | None,
|
||||
on_progress: Callable[[BackupProgress], None] | None,
|
||||
password: str | None,
|
||||
**kwargs: Any,
|
||||
) -> NewBackup:
|
||||
"""Generate a backup."""
|
||||
"""Initiate generating a backup.
|
||||
|
||||
:param on_progress: A callback that will be called with the progress of the
|
||||
backup.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_get_backups(self, **kwargs: Any) -> dict[str, Backup]:
|
||||
async def async_get_backups(self, **kwargs: Any) -> dict[str, _BackupT]:
|
||||
"""Get backups.
|
||||
|
||||
Return a dictionary of Backup instances keyed by their slug.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_get_backup(self, *, slug: str, **kwargs: Any) -> Backup | None:
|
||||
async def async_get_backup(self, *, slug: str, **kwargs: Any) -> _BackupT | None:
|
||||
"""Get a backup."""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -182,8 +224,12 @@ class BaseBackupManager(abc.ABC):
|
|||
) -> None:
|
||||
"""Receive and store a backup file from upload."""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_upload_backup(self, *, slug: str, **kwargs: Any) -> None:
|
||||
"""Upload a backup."""
|
||||
|
||||
class BackupManager(BaseBackupManager):
|
||||
|
||||
class BackupManager(BaseBackupManager[Backup]):
|
||||
"""Backup manager for the Backup integration."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant) -> None:
|
||||
|
@ -192,10 +238,43 @@ class BackupManager(BaseBackupManager):
|
|||
self.backup_dir = Path(hass.config.path("backups"))
|
||||
self.loaded_backups = False
|
||||
|
||||
async def async_upload_backup(self, *, slug: str, **kwargs: Any) -> None:
|
||||
"""Upload a backup."""
|
||||
await self.load_platforms()
|
||||
|
||||
if not self.backup_agents:
|
||||
return
|
||||
|
||||
if not (backup := await self.async_get_backup(slug=slug)):
|
||||
return
|
||||
|
||||
self.syncing = True
|
||||
sync_backup_results = await asyncio.gather(
|
||||
*(
|
||||
agent.async_upload_backup(
|
||||
path=backup.path,
|
||||
metadata=BackupUploadMetadata(
|
||||
homeassistant=HAVERSION,
|
||||
size=backup.size,
|
||||
date=backup.date,
|
||||
slug=backup.slug,
|
||||
name=backup.name,
|
||||
protected=backup.protected,
|
||||
),
|
||||
)
|
||||
for agent in self.backup_agents.values()
|
||||
),
|
||||
return_exceptions=True,
|
||||
)
|
||||
for result in sync_backup_results:
|
||||
if isinstance(result, Exception):
|
||||
LOGGER.error("Error during backup upload - %s", result)
|
||||
self.syncing = False
|
||||
|
||||
async def load_backups(self) -> None:
|
||||
"""Load data of stored backup files."""
|
||||
backups = await self.hass.async_add_executor_job(self._read_backups)
|
||||
LOGGER.debug("Loaded %s backups", len(backups))
|
||||
LOGGER.debug("Loaded %s local backups", len(backups))
|
||||
self.backups = backups
|
||||
self.loaded_backups = True
|
||||
|
||||
|
@ -213,6 +292,7 @@ class BackupManager(BaseBackupManager):
|
|||
date=cast(str, data["date"]),
|
||||
path=backup_path,
|
||||
size=round(backup_path.stat().st_size / 1_048_576, 2),
|
||||
protected=cast(bool, data.get("protected", False)),
|
||||
)
|
||||
backups[backup.slug] = backup
|
||||
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
|
||||
|
@ -317,17 +397,31 @@ class BackupManager(BaseBackupManager):
|
|||
async def async_create_backup(
|
||||
self,
|
||||
*,
|
||||
addons_included: list[str] | None,
|
||||
database_included: bool,
|
||||
folders_included: list[str] | None,
|
||||
name: str | None,
|
||||
on_progress: Callable[[BackupProgress], None] | None,
|
||||
password: str | None,
|
||||
**kwargs: Any,
|
||||
) -> NewBackup:
|
||||
"""Generate a backup."""
|
||||
"""Initiate generating a backup."""
|
||||
if self.backup_task:
|
||||
raise HomeAssistantError("Backup already in progress")
|
||||
backup_name = f"Core {HAVERSION}"
|
||||
backup_name = name or f"Core {HAVERSION}"
|
||||
date_str = dt_util.now().isoformat()
|
||||
slug = _generate_slug(date_str, backup_name)
|
||||
self.backup_task = self.hass.async_create_task(
|
||||
self._async_create_backup(backup_name, date_str, slug, on_progress),
|
||||
self._async_create_backup(
|
||||
addons_included=addons_included,
|
||||
backup_name=backup_name,
|
||||
database_included=database_included,
|
||||
date_str=date_str,
|
||||
folders_included=folders_included,
|
||||
on_progress=on_progress,
|
||||
password=password,
|
||||
slug=slug,
|
||||
),
|
||||
name="backup_manager_create_backup",
|
||||
eager_start=False, # To ensure the task is not started before we return
|
||||
)
|
||||
|
@ -335,10 +429,15 @@ class BackupManager(BaseBackupManager):
|
|||
|
||||
async def _async_create_backup(
|
||||
self,
|
||||
*,
|
||||
addons_included: list[str] | None,
|
||||
database_included: bool,
|
||||
backup_name: str,
|
||||
date_str: str,
|
||||
slug: str,
|
||||
folders_included: list[str] | None,
|
||||
on_progress: Callable[[BackupProgress], None] | None,
|
||||
password: str | None,
|
||||
slug: str,
|
||||
) -> Backup:
|
||||
"""Generate a backup."""
|
||||
success = False
|
||||
|
@ -351,14 +450,21 @@ class BackupManager(BaseBackupManager):
|
|||
"date": date_str,
|
||||
"type": "partial",
|
||||
"folders": ["homeassistant"],
|
||||
"homeassistant": {"version": HAVERSION},
|
||||
"homeassistant": {
|
||||
"exclude_database": not database_included,
|
||||
"version": HAVERSION,
|
||||
},
|
||||
"compressed": True,
|
||||
"protected": password is not None,
|
||||
}
|
||||
|
||||
tar_file_path = Path(self.backup_dir, f"{backup_data['slug']}.tar")
|
||||
size_in_bytes = await self.hass.async_add_executor_job(
|
||||
self._mkdir_and_generate_backup_contents,
|
||||
tar_file_path,
|
||||
backup_data,
|
||||
database_included,
|
||||
password,
|
||||
)
|
||||
backup = Backup(
|
||||
slug=slug,
|
||||
|
@ -366,6 +472,7 @@ class BackupManager(BaseBackupManager):
|
|||
date=date_str,
|
||||
path=tar_file_path,
|
||||
size=round(size_in_bytes / 1_048_576, 2),
|
||||
protected=password is not None,
|
||||
)
|
||||
if self.loaded_backups:
|
||||
self.backups[slug] = backup
|
||||
|
@ -382,12 +489,18 @@ class BackupManager(BaseBackupManager):
|
|||
self,
|
||||
tar_file_path: Path,
|
||||
backup_data: dict[str, Any],
|
||||
database_included: bool,
|
||||
password: str | None = None,
|
||||
) -> int:
|
||||
"""Generate backup contents and return the size."""
|
||||
if not self.backup_dir.exists():
|
||||
LOGGER.debug("Creating backup directory")
|
||||
self.backup_dir.mkdir()
|
||||
|
||||
excludes = EXCLUDE_FROM_BACKUP
|
||||
if not database_included:
|
||||
excludes = excludes + EXCLUDE_DATABASE_FROM_BACKUP
|
||||
|
||||
outer_secure_tarfile = SecureTarFile(
|
||||
tar_file_path, "w", gzip=False, bufsize=BUF_SIZE
|
||||
)
|
||||
|
@ -399,18 +512,25 @@ class BackupManager(BaseBackupManager):
|
|||
tar_info.mtime = int(time.time())
|
||||
outer_secure_tarfile_tarfile.addfile(tar_info, fileobj=fileobj)
|
||||
with outer_secure_tarfile.create_inner_tar(
|
||||
"./homeassistant.tar.gz", gzip=True
|
||||
"./homeassistant.tar.gz",
|
||||
gzip=True,
|
||||
key=password_to_key(password) if password is not None else None,
|
||||
) as core_tar:
|
||||
atomic_contents_add(
|
||||
tar_file=core_tar,
|
||||
origin_path=Path(self.hass.config.path()),
|
||||
excludes=EXCLUDE_FROM_BACKUP,
|
||||
excludes=excludes,
|
||||
arcname="data",
|
||||
)
|
||||
|
||||
return tar_file_path.stat().st_size
|
||||
|
||||
async def async_restore_backup(self, slug: str, **kwargs: Any) -> None:
|
||||
async def async_restore_backup(
|
||||
self,
|
||||
slug: str,
|
||||
*,
|
||||
password: str | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Restore a backup.
|
||||
|
||||
This will write the restore information to .HA_RESTORE which
|
||||
|
@ -422,7 +542,7 @@ class BackupManager(BaseBackupManager):
|
|||
def _write_restore_file() -> None:
|
||||
"""Write the restore file."""
|
||||
Path(self.hass.config.path(RESTORE_BACKUP_FILE)).write_text(
|
||||
json.dumps({"path": backup.path.as_posix()}),
|
||||
json.dumps({"path": backup.path.as_posix(), "password": password}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
|
|
30
homeassistant/components/backup/models.py
Normal file
30
homeassistant/components/backup/models.py
Normal file
|
@ -0,0 +1,30 @@
|
|||
"""Models for the backup integration."""
|
||||
|
||||
from dataclasses import asdict, dataclass
|
||||
|
||||
|
||||
@dataclass()
|
||||
class BaseBackup:
|
||||
"""Base backup class."""
|
||||
|
||||
date: str
|
||||
name: str
|
||||
protected: bool
|
||||
slug: str
|
||||
size: float
|
||||
|
||||
def as_dict(self) -> dict:
|
||||
"""Return a dict representation of this backup."""
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass()
|
||||
class BackupUploadMetadata:
|
||||
"""Backup upload metadata."""
|
||||
|
||||
date: str # The date the backup was created
|
||||
slug: str # The slug of the backup
|
||||
size: float # The size of the backup (in bytes)
|
||||
name: str # The name of the backup
|
||||
homeassistant: str # The version of Home Assistant that created the backup
|
||||
protected: bool # If the backup is protected
|
|
@ -1 +1,7 @@
|
|||
create:
|
||||
fields:
|
||||
password:
|
||||
required: false
|
||||
selector:
|
||||
text:
|
||||
type: password
|
||||
|
|
|
@ -2,7 +2,13 @@
|
|||
"services": {
|
||||
"create": {
|
||||
"name": "Create backup",
|
||||
"description": "Creates a new backup."
|
||||
"description": "Creates a new backup.",
|
||||
"fields": {
|
||||
"password": {
|
||||
"name": "[%key:common::config_flow::data::password%]",
|
||||
"description": "Password protect the backup"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""Websocket commands for the Backup integration."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
|
@ -14,9 +15,14 @@ from .manager import BackupProgress
|
|||
@callback
|
||||
def async_register_websocket_handlers(hass: HomeAssistant, with_hassio: bool) -> None:
|
||||
"""Register websocket commands."""
|
||||
websocket_api.async_register_command(hass, backup_agents_download)
|
||||
websocket_api.async_register_command(hass, backup_agents_info)
|
||||
websocket_api.async_register_command(hass, backup_agents_list_backups)
|
||||
|
||||
if with_hassio:
|
||||
websocket_api.async_register_command(hass, handle_backup_end)
|
||||
websocket_api.async_register_command(hass, handle_backup_start)
|
||||
websocket_api.async_register_command(hass, handle_backup_upload)
|
||||
return
|
||||
|
||||
websocket_api.async_register_command(hass, handle_details)
|
||||
|
@ -40,7 +46,7 @@ async def handle_info(
|
|||
connection.send_result(
|
||||
msg["id"],
|
||||
{
|
||||
"backups": list(backups.values()),
|
||||
"backups": [b.as_dict() for b in backups.values()],
|
||||
"backing_up": manager.backup_task is not None,
|
||||
},
|
||||
)
|
||||
|
@ -92,6 +98,7 @@ async def handle_remove(
|
|||
{
|
||||
vol.Required("type"): "backup/restore",
|
||||
vol.Required("slug"): str,
|
||||
vol.Optional("password"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
|
@ -101,12 +108,24 @@ async def handle_restore(
|
|||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Restore a backup."""
|
||||
await hass.data[DATA_MANAGER].async_restore_backup(msg["slug"])
|
||||
await hass.data[DATA_MANAGER].async_restore_backup(
|
||||
slug=msg["slug"],
|
||||
password=msg.get("password"),
|
||||
)
|
||||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command({vol.Required("type"): "backup/generate"})
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "backup/generate",
|
||||
vol.Optional("addons_included"): [str],
|
||||
vol.Optional("database_included", default=True): bool,
|
||||
vol.Optional("folders_included"): [str],
|
||||
vol.Optional("name"): str,
|
||||
vol.Optional("password"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def handle_create(
|
||||
hass: HomeAssistant,
|
||||
|
@ -118,7 +137,14 @@ async def handle_create(
|
|||
def on_progress(progress: BackupProgress) -> None:
|
||||
connection.send_message(websocket_api.event_message(msg["id"], progress))
|
||||
|
||||
backup = await hass.data[DATA_MANAGER].async_create_backup(on_progress=on_progress)
|
||||
backup = await hass.data[DATA_MANAGER].async_create_backup(
|
||||
addons_included=msg.get("addons_included"),
|
||||
database_included=msg["database_included"],
|
||||
folders_included=msg.get("folders_included"),
|
||||
name=msg.get("name"),
|
||||
on_progress=on_progress,
|
||||
password=msg.get("password"),
|
||||
)
|
||||
connection.send_result(msg["id"], backup)
|
||||
|
||||
|
||||
|
@ -162,3 +188,105 @@ async def handle_backup_end(
|
|||
return
|
||||
|
||||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
@websocket_api.ws_require_user(only_supervisor=True)
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "backup/upload",
|
||||
vol.Required("data"): {
|
||||
vol.Required("slug"): str,
|
||||
},
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def handle_backup_upload(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Backup upload."""
|
||||
LOGGER.debug("Backup upload notification")
|
||||
data = msg["data"]
|
||||
|
||||
try:
|
||||
await hass.data[DATA_MANAGER].async_upload_backup(slug=data["slug"])
|
||||
except Exception as err: # noqa: BLE001
|
||||
connection.send_error(msg["id"], "backup_upload_failed", str(err))
|
||||
return
|
||||
|
||||
connection.send_result(msg["id"])
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command({vol.Required("type"): "backup/agents/info"})
|
||||
@websocket_api.async_response
|
||||
async def backup_agents_info(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Return backup agents info."""
|
||||
manager = hass.data[DATA_MANAGER]
|
||||
await manager.load_platforms()
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
{
|
||||
"agents": [{"agent_id": agent_id} for agent_id in manager.backup_agents],
|
||||
"syncing": manager.syncing,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command({vol.Required("type"): "backup/agents/list_backups"})
|
||||
@websocket_api.async_response
|
||||
async def backup_agents_list_backups(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Return a list of uploaded backups."""
|
||||
manager = hass.data[DATA_MANAGER]
|
||||
backups: list[dict[str, Any]] = []
|
||||
await manager.load_platforms()
|
||||
for agent_id, agent in manager.backup_agents.items():
|
||||
_listed_backups = await agent.async_list_backups()
|
||||
backups.extend({**b.as_dict(), "agent_id": agent_id} for b in _listed_backups)
|
||||
connection.send_result(msg["id"], backups)
|
||||
|
||||
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "backup/agents/download",
|
||||
vol.Required("agent_id"): str,
|
||||
vol.Required("backup_id"): str,
|
||||
vol.Required("slug"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def backup_agents_download(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Download an uploaded backup."""
|
||||
manager = hass.data[DATA_MANAGER]
|
||||
await manager.load_platforms()
|
||||
|
||||
if not (agent := manager.backup_agents.get(msg["agent_id"])):
|
||||
connection.send_error(
|
||||
msg["id"], "unknown_agent", f"Agent {msg['agent_id']} not found"
|
||||
)
|
||||
return
|
||||
try:
|
||||
await agent.async_download_backup(
|
||||
id=msg["backup_id"],
|
||||
path=Path(hass.config.path("backup"), f"{msg['slug']}.tar"),
|
||||
)
|
||||
except Exception as err: # noqa: BLE001
|
||||
connection.send_error(msg["id"], "backup_agents_download", str(err))
|
||||
return
|
||||
|
||||
connection.send_result(msg["id"])
|
||||
|
|
77
homeassistant/components/kitchen_sink/backup.py
Normal file
77
homeassistant/components/kitchen_sink/backup.py
Normal file
|
@ -0,0 +1,77 @@
|
|||
"""Backup platform for the kitchen_sink integration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from homeassistant.components.backup import (
|
||||
BackupAgent,
|
||||
BackupUploadMetadata,
|
||||
UploadedBackup,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def async_get_backup_agents(
|
||||
hass: HomeAssistant,
|
||||
) -> list[BackupAgent]:
|
||||
"""Register the backup agents."""
|
||||
return [KitchenSinkBackupAgent("syncer")]
|
||||
|
||||
|
||||
class KitchenSinkBackupAgent(BackupAgent):
|
||||
"""Kitchen sink backup agent."""
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
"""Initialize the kitchen sink backup sync agent."""
|
||||
super().__init__()
|
||||
self.name = name
|
||||
self._uploads = [
|
||||
UploadedBackup(
|
||||
id="def456",
|
||||
name="Kitchen sink syncer",
|
||||
protected=False,
|
||||
slug="abc123",
|
||||
size=1234,
|
||||
date="1970-01-01T00:00:00Z",
|
||||
)
|
||||
]
|
||||
|
||||
async def async_download_backup(
|
||||
self,
|
||||
*,
|
||||
id: str,
|
||||
path: Path,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Download a backup file."""
|
||||
LOGGER.info("Downloading backup %s to %s", id, path)
|
||||
|
||||
async def async_upload_backup(
|
||||
self,
|
||||
*,
|
||||
path: Path,
|
||||
metadata: BackupUploadMetadata,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Upload a backup."""
|
||||
LOGGER.info("Uploading backup %s %s", path.name, metadata)
|
||||
self._uploads.append(
|
||||
UploadedBackup(
|
||||
id=uuid4().hex,
|
||||
name=metadata.name,
|
||||
protected=metadata.protected,
|
||||
slug=metadata.slug,
|
||||
size=metadata.size,
|
||||
date=metadata.date,
|
||||
)
|
||||
)
|
||||
|
||||
async def async_list_backups(self, **kwargs: Any) -> list[UploadedBackup]:
|
||||
"""List synced backups."""
|
||||
return self._uploads
|
|
@ -3,9 +3,15 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant.components.backup import DOMAIN
|
||||
from homeassistant.components.backup import (
|
||||
DOMAIN,
|
||||
BackupAgent,
|
||||
BackupUploadMetadata,
|
||||
UploadedBackup,
|
||||
)
|
||||
from homeassistant.components.backup.manager import Backup
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
@ -17,9 +23,49 @@ TEST_BACKUP = Backup(
|
|||
date="1970-01-01T00:00:00.000Z",
|
||||
path=Path("abc123.tar"),
|
||||
size=0.0,
|
||||
protected=False,
|
||||
)
|
||||
|
||||
|
||||
class BackupAgentTest(BackupAgent):
|
||||
"""Test backup agent."""
|
||||
|
||||
def __init__(self, name: str) -> None:
|
||||
"""Initialize the backup agent."""
|
||||
self.name = name
|
||||
|
||||
async def async_download_backup(
|
||||
self,
|
||||
*,
|
||||
id: str,
|
||||
path: Path,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Download a backup file."""
|
||||
|
||||
async def async_upload_backup(
|
||||
self,
|
||||
*,
|
||||
path: Path,
|
||||
metadata: BackupUploadMetadata,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Upload a backup."""
|
||||
|
||||
async def async_list_backups(self, **kwargs: Any) -> list[UploadedBackup]:
|
||||
"""List backups."""
|
||||
return [
|
||||
UploadedBackup(
|
||||
id="abc123",
|
||||
date="1970-01-01T00:00:00Z",
|
||||
name="Test",
|
||||
protected=False,
|
||||
size=13.37,
|
||||
slug="abc123",
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
async def setup_backup_integration(
|
||||
hass: HomeAssistant,
|
||||
with_hassio: bool = False,
|
||||
|
|
|
@ -43,11 +43,12 @@ def mock_backup_generation_fixture(
|
|||
Path("test.txt"),
|
||||
Path(".DS_Store"),
|
||||
Path(".storage"),
|
||||
Path("home-assistant_v2.db"),
|
||||
]
|
||||
|
||||
with (
|
||||
patch("pathlib.Path.iterdir", _mock_iterdir),
|
||||
patch("pathlib.Path.stat", MagicMock(st_size=123)),
|
||||
patch("pathlib.Path.stat", return_value=MagicMock(st_size=123)),
|
||||
patch("pathlib.Path.is_file", lambda x: x.name != ".storage"),
|
||||
patch(
|
||||
"pathlib.Path.is_dir",
|
||||
|
|
|
@ -1,4 +1,108 @@
|
|||
# serializer version: 1
|
||||
# name: test_agents_download[with_hassio]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': None,
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_agents_download[without_hassio]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': None,
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_agents_download_exception
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'backup_agents_download',
|
||||
'message': 'Boom',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_agents_download_unknown_agent
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'unknown_agent',
|
||||
'message': 'Agent domain.test not found',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_agents_info[with_hassio]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': dict({
|
||||
'agents': list([
|
||||
dict({
|
||||
'agent_id': 'domain.test',
|
||||
}),
|
||||
]),
|
||||
'syncing': False,
|
||||
}),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_agents_info[without_hassio]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': dict({
|
||||
'agents': list([
|
||||
dict({
|
||||
'agent_id': 'domain.test',
|
||||
}),
|
||||
]),
|
||||
'syncing': False,
|
||||
}),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_agents_list_backups[with_hassio]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': list([
|
||||
dict({
|
||||
'agent_id': 'domain.test',
|
||||
'date': '1970-01-01T00:00:00Z',
|
||||
'id': 'abc123',
|
||||
'name': 'Test',
|
||||
'protected': False,
|
||||
'size': 13.37,
|
||||
'slug': 'abc123',
|
||||
}),
|
||||
]),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_agents_list_backups[without_hassio]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': list([
|
||||
dict({
|
||||
'agent_id': 'domain.test',
|
||||
'date': '1970-01-01T00:00:00Z',
|
||||
'id': 'abc123',
|
||||
'name': 'Test',
|
||||
'protected': False,
|
||||
'size': 13.37,
|
||||
'slug': 'abc123',
|
||||
}),
|
||||
]),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_end[with_hassio-hass_access_token]
|
||||
dict({
|
||||
'error': dict({
|
||||
|
@ -40,7 +144,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_end_excepion[exception0]
|
||||
# name: test_backup_end_exception[exception0]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'post_backup_actions_failed',
|
||||
|
@ -51,7 +155,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_end_excepion[exception1]
|
||||
# name: test_backup_end_exception[exception1]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'post_backup_actions_failed',
|
||||
|
@ -62,7 +166,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_end_excepion[exception2]
|
||||
# name: test_backup_end_exception[exception2]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'post_backup_actions_failed',
|
||||
|
@ -114,7 +218,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_start_excepion[exception0]
|
||||
# name: test_backup_start_exception[exception0]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'pre_backup_actions_failed',
|
||||
|
@ -125,7 +229,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_start_excepion[exception1]
|
||||
# name: test_backup_start_exception[exception1]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'pre_backup_actions_failed',
|
||||
|
@ -136,7 +240,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_start_excepion[exception2]
|
||||
# name: test_backup_start_exception[exception2]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'pre_backup_actions_failed',
|
||||
|
@ -147,6 +251,80 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_upload[with_hassio-hass_access_token]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'only_supervisor',
|
||||
'message': 'Only allowed as Supervisor',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_upload[with_hassio-hass_supervisor_access_token]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': None,
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_upload[without_hassio-hass_access_token]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'unknown_command',
|
||||
'message': 'Unknown command.',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_upload[without_hassio-hass_supervisor_access_token]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'unknown_command',
|
||||
'message': 'Unknown command.',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_upload_exception[exception0]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'backup_upload_failed',
|
||||
'message': '',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_upload_exception[exception1]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'backup_upload_failed',
|
||||
'message': 'Boom',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_backup_upload_exception[exception2]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'backup_upload_failed',
|
||||
'message': 'Boom',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_details[with_hassio-with_backup_content]
|
||||
dict({
|
||||
'error': dict({
|
||||
|
@ -177,6 +355,7 @@
|
|||
'date': '1970-01-01T00:00:00.000Z',
|
||||
'name': 'Test',
|
||||
'path': 'abc123.tar',
|
||||
'protected': False,
|
||||
'size': 0.0,
|
||||
'slug': 'abc123',
|
||||
}),
|
||||
|
@ -195,7 +374,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[with_hassio]
|
||||
# name: test_generate[with_hassio-None]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'unknown_command',
|
||||
|
@ -206,7 +385,29 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[without_hassio]
|
||||
# name: test_generate[with_hassio-data1]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'unknown_command',
|
||||
'message': 'Unknown command.',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[with_hassio-data2]
|
||||
dict({
|
||||
'error': dict({
|
||||
'code': 'unknown_command',
|
||||
'message': 'Unknown command.',
|
||||
}),
|
||||
'id': 1,
|
||||
'success': False,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[without_hassio-None]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': dict({
|
||||
|
@ -216,7 +417,7 @@
|
|||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[without_hassio].1
|
||||
# name: test_generate[without_hassio-None].1
|
||||
dict({
|
||||
'event': dict({
|
||||
'done': True,
|
||||
|
@ -227,6 +428,68 @@
|
|||
'type': 'event',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[without_hassio-data1]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': dict({
|
||||
'slug': '27f5c632',
|
||||
}),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[without_hassio-data1].1
|
||||
dict({
|
||||
'event': dict({
|
||||
'done': True,
|
||||
'stage': None,
|
||||
'success': True,
|
||||
}),
|
||||
'id': 1,
|
||||
'type': 'event',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[without_hassio-data2]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': dict({
|
||||
'slug': '27f5c632',
|
||||
}),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate[without_hassio-data2].1
|
||||
dict({
|
||||
'event': dict({
|
||||
'done': True,
|
||||
'stage': None,
|
||||
'success': True,
|
||||
}),
|
||||
'id': 1,
|
||||
'type': 'event',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate_without_hassio[params0-expected_extra_call_params0]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': dict({
|
||||
'slug': 'abc123',
|
||||
}),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_generate_without_hassio[params1-expected_extra_call_params1]
|
||||
dict({
|
||||
'id': 1,
|
||||
'result': dict({
|
||||
'slug': 'abc123',
|
||||
}),
|
||||
'success': True,
|
||||
'type': 'result',
|
||||
})
|
||||
# ---
|
||||
# name: test_info[with_hassio]
|
||||
dict({
|
||||
'error': dict({
|
||||
|
@ -248,6 +511,7 @@
|
|||
'date': '1970-01-01T00:00:00.000Z',
|
||||
'name': 'Test',
|
||||
'path': 'abc123.tar',
|
||||
'protected': False,
|
||||
'size': 0.0,
|
||||
'slug': 'abc123',
|
||||
}),
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""Tests for the Backup integration."""
|
||||
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
@ -26,8 +27,10 @@ async def test_setup_with_hassio(
|
|||
) in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize("service_data", [None, {}, {"password": "abc123"}])
|
||||
async def test_create_service(
|
||||
hass: HomeAssistant,
|
||||
service_data: dict[str, Any] | None,
|
||||
) -> None:
|
||||
"""Test generate backup."""
|
||||
await setup_backup_integration(hass)
|
||||
|
@ -39,6 +42,7 @@ async def test_create_service(
|
|||
DOMAIN,
|
||||
"create",
|
||||
blocking=True,
|
||||
service_data=service_data,
|
||||
)
|
||||
|
||||
assert generate_backup.called
|
||||
|
|
|
@ -3,28 +3,43 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, Mock, mock_open, patch
|
||||
from typing import Any
|
||||
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, mock_open, patch
|
||||
|
||||
import aiohttp
|
||||
from multidict import CIMultiDict, CIMultiDictProxy
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.backup import BackupManager
|
||||
from homeassistant.components.backup.manager import (
|
||||
from homeassistant.components.backup import (
|
||||
BackupAgentPlatformProtocol,
|
||||
BackupManager,
|
||||
BackupPlatformProtocol,
|
||||
BackupProgress,
|
||||
BackupUploadMetadata,
|
||||
)
|
||||
from homeassistant.components.backup.manager import BackupProgress
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from .common import TEST_BACKUP
|
||||
from .common import TEST_BACKUP, BackupAgentTest
|
||||
|
||||
from tests.common import MockPlatform, mock_platform
|
||||
|
||||
_EXPECTED_FILES_WITH_DATABASE = {
|
||||
True: ["test.txt", ".storage", "home-assistant_v2.db"],
|
||||
False: ["test.txt", ".storage"],
|
||||
}
|
||||
|
||||
|
||||
async def _mock_backup_generation(
|
||||
manager: BackupManager, mocked_json_bytes: Mock, mocked_tarfile: Mock
|
||||
hass: HomeAssistant,
|
||||
manager: BackupManager,
|
||||
mocked_json_bytes: Mock,
|
||||
mocked_tarfile: Mock,
|
||||
*,
|
||||
database_included: bool = True,
|
||||
name: str | None = "Core 2025.1.0",
|
||||
password: str | None = None,
|
||||
) -> None:
|
||||
"""Mock backup generator."""
|
||||
|
||||
|
@ -35,23 +50,51 @@ async def _mock_backup_generation(
|
|||
progress.append(_progress)
|
||||
|
||||
assert manager.backup_task is None
|
||||
await manager.async_create_backup(on_progress=on_progress)
|
||||
await manager.async_create_backup(
|
||||
addons_included=[],
|
||||
database_included=database_included,
|
||||
folders_included=[],
|
||||
name=name,
|
||||
on_progress=on_progress,
|
||||
password=password,
|
||||
)
|
||||
assert manager.backup_task is not None
|
||||
assert progress == []
|
||||
|
||||
await manager.backup_task
|
||||
backup = await manager.backup_task
|
||||
assert progress == [BackupProgress(done=True, stage=None, success=True)]
|
||||
|
||||
assert mocked_json_bytes.call_count == 1
|
||||
backup_json_dict = mocked_json_bytes.call_args[0][0]
|
||||
assert isinstance(backup_json_dict, dict)
|
||||
assert backup_json_dict["homeassistant"] == {"version": "2025.1.0"}
|
||||
assert backup_json_dict == {
|
||||
"compressed": True,
|
||||
"date": ANY,
|
||||
"folders": ["homeassistant"],
|
||||
"homeassistant": {
|
||||
"exclude_database": not database_included,
|
||||
"version": "2025.1.0",
|
||||
},
|
||||
"name": name,
|
||||
"protected": bool(password),
|
||||
"slug": ANY,
|
||||
"type": "partial",
|
||||
}
|
||||
assert manager.backup_dir.as_posix() in str(mocked_tarfile.call_args_list[0][0][0])
|
||||
outer_tar = mocked_tarfile.return_value
|
||||
core_tar = outer_tar.create_inner_tar.return_value.__enter__.return_value
|
||||
expected_files = [call(hass.config.path(), arcname="data", recursive=False)] + [
|
||||
call(file, arcname=f"data/{file}", recursive=False)
|
||||
for file in _EXPECTED_FILES_WITH_DATABASE[database_included]
|
||||
]
|
||||
assert core_tar.add.call_args_list == expected_files
|
||||
|
||||
return backup
|
||||
|
||||
|
||||
async def _setup_mock_domain(
|
||||
hass: HomeAssistant,
|
||||
platform: BackupPlatformProtocol | None = None,
|
||||
platform: BackupPlatformProtocol | BackupAgentPlatformProtocol | None = None,
|
||||
) -> None:
|
||||
"""Set up a mock domain."""
|
||||
mock_platform(hass, "some_domain.backup", platform or MockPlatform())
|
||||
|
@ -154,26 +197,50 @@ async def test_async_create_backup_when_backing_up(hass: HomeAssistant) -> None:
|
|||
manager = BackupManager(hass)
|
||||
manager.backup_task = hass.async_create_task(event.wait())
|
||||
with pytest.raises(HomeAssistantError, match="Backup already in progress"):
|
||||
await manager.async_create_backup(on_progress=None)
|
||||
await manager.async_create_backup(
|
||||
addons_included=[],
|
||||
database_included=True,
|
||||
folders_included=[],
|
||||
name=None,
|
||||
on_progress=None,
|
||||
password=None,
|
||||
)
|
||||
event.set()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_backup_generation")
|
||||
@pytest.mark.parametrize(
|
||||
"params",
|
||||
[
|
||||
{},
|
||||
{"database_included": True, "name": "abc123"},
|
||||
{"database_included": False},
|
||||
{"password": "abc123"},
|
||||
],
|
||||
)
|
||||
async def test_async_create_backup(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
mocked_json_bytes: Mock,
|
||||
mocked_tarfile: Mock,
|
||||
params: dict,
|
||||
) -> None:
|
||||
"""Test generate backup."""
|
||||
manager = BackupManager(hass)
|
||||
manager.loaded_backups = True
|
||||
|
||||
await _mock_backup_generation(manager, mocked_json_bytes, mocked_tarfile)
|
||||
await _mock_backup_generation(
|
||||
hass, manager, mocked_json_bytes, mocked_tarfile, **params
|
||||
)
|
||||
|
||||
assert "Generated new backup with slug " in caplog.text
|
||||
assert "Creating backup directory" in caplog.text
|
||||
assert "Loaded 0 platforms" in caplog.text
|
||||
assert "Loaded 0 agents" in caplog.text
|
||||
|
||||
assert len(manager.backups) == 1
|
||||
backup = list(manager.backups.values())[0]
|
||||
assert backup.protected is bool(params.get("password"))
|
||||
|
||||
|
||||
async def test_loading_platforms(
|
||||
|
@ -191,6 +258,7 @@ async def test_loading_platforms(
|
|||
Mock(
|
||||
async_pre_backup=AsyncMock(),
|
||||
async_post_backup=AsyncMock(),
|
||||
async_get_backup_agents=AsyncMock(),
|
||||
),
|
||||
)
|
||||
await manager.load_platforms()
|
||||
|
@ -202,6 +270,32 @@ async def test_loading_platforms(
|
|||
assert "Loaded 1 platforms" in caplog.text
|
||||
|
||||
|
||||
async def test_loading_agents(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test loading backup agents."""
|
||||
manager = BackupManager(hass)
|
||||
|
||||
assert not manager.loaded_platforms
|
||||
assert not manager.platforms
|
||||
|
||||
await _setup_mock_domain(
|
||||
hass,
|
||||
Mock(
|
||||
async_get_backup_agents=AsyncMock(return_value=[BackupAgentTest("test")]),
|
||||
),
|
||||
)
|
||||
await manager.load_platforms()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert manager.loaded_platforms
|
||||
assert len(manager.backup_agents) == 1
|
||||
|
||||
assert "Loaded 1 agents" in caplog.text
|
||||
assert "some_domain.test" in manager.backup_agents
|
||||
|
||||
|
||||
async def test_not_loading_bad_platforms(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
|
@ -220,10 +314,159 @@ async def test_not_loading_bad_platforms(
|
|||
assert len(manager.platforms) == 0
|
||||
|
||||
assert "Loaded 0 platforms" in caplog.text
|
||||
assert (
|
||||
"some_domain does not implement required functions for the backup platform"
|
||||
in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_backup_generation")
|
||||
async def test_syncing_backup(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
mocked_json_bytes: Mock,
|
||||
mocked_tarfile: Mock,
|
||||
) -> None:
|
||||
"""Test syncing a backup."""
|
||||
manager = BackupManager(hass)
|
||||
|
||||
await _setup_mock_domain(
|
||||
hass,
|
||||
Mock(
|
||||
async_pre_backup=AsyncMock(),
|
||||
async_post_backup=AsyncMock(),
|
||||
async_get_backup_agents=AsyncMock(
|
||||
return_value=[
|
||||
BackupAgentTest("agent1"),
|
||||
BackupAgentTest("agent2"),
|
||||
]
|
||||
),
|
||||
),
|
||||
)
|
||||
await manager.load_platforms()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
backup = await _mock_backup_generation(
|
||||
hass, manager, mocked_json_bytes, mocked_tarfile
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
|
||||
return_value=backup,
|
||||
),
|
||||
patch.object(BackupAgentTest, "async_upload_backup") as mocked_upload,
|
||||
patch(
|
||||
"homeassistant.components.backup.manager.HAVERSION",
|
||||
"2025.1.0",
|
||||
),
|
||||
):
|
||||
await manager.async_upload_backup(slug=backup.slug)
|
||||
assert mocked_upload.call_count == 2
|
||||
first_call = mocked_upload.call_args_list[0]
|
||||
assert first_call[1]["path"] == backup.path
|
||||
assert first_call[1]["metadata"] == BackupUploadMetadata(
|
||||
date=backup.date,
|
||||
homeassistant="2025.1.0",
|
||||
name=backup.name,
|
||||
protected=backup.protected,
|
||||
size=backup.size,
|
||||
slug=backup.slug,
|
||||
)
|
||||
|
||||
assert "Error during backup upload" not in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_backup_generation")
|
||||
async def test_syncing_backup_with_exception(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
mocked_json_bytes: Mock,
|
||||
mocked_tarfile: Mock,
|
||||
) -> None:
|
||||
"""Test syncing a backup with exception."""
|
||||
manager = BackupManager(hass)
|
||||
|
||||
class ModifiedBackupSyncAgentTest(BackupAgentTest):
|
||||
async def async_upload_backup(self, **kwargs: Any) -> None:
|
||||
raise HomeAssistantError("Test exception")
|
||||
|
||||
await _setup_mock_domain(
|
||||
hass,
|
||||
Mock(
|
||||
async_pre_backup=AsyncMock(),
|
||||
async_post_backup=AsyncMock(),
|
||||
async_get_backup_agents=AsyncMock(
|
||||
return_value=[
|
||||
ModifiedBackupSyncAgentTest("agent1"),
|
||||
ModifiedBackupSyncAgentTest("agent2"),
|
||||
]
|
||||
),
|
||||
),
|
||||
)
|
||||
await manager.load_platforms()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
backup = await _mock_backup_generation(
|
||||
hass, manager, mocked_json_bytes, mocked_tarfile
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
|
||||
return_value=backup,
|
||||
),
|
||||
patch.object(
|
||||
ModifiedBackupSyncAgentTest,
|
||||
"async_upload_backup",
|
||||
) as mocked_upload,
|
||||
patch(
|
||||
"homeassistant.components.backup.manager.HAVERSION",
|
||||
"2025.1.0",
|
||||
),
|
||||
):
|
||||
mocked_upload.side_effect = HomeAssistantError("Test exception")
|
||||
await manager.async_upload_backup(slug=backup.slug)
|
||||
assert mocked_upload.call_count == 2
|
||||
first_call = mocked_upload.call_args_list[0]
|
||||
assert first_call[1]["path"] == backup.path
|
||||
assert first_call[1]["metadata"] == BackupUploadMetadata(
|
||||
date=backup.date,
|
||||
homeassistant="2025.1.0",
|
||||
name=backup.name,
|
||||
protected=backup.protected,
|
||||
size=backup.size,
|
||||
slug=backup.slug,
|
||||
)
|
||||
|
||||
assert "Error during backup upload - Test exception" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_backup_generation")
|
||||
async def test_syncing_backup_no_agents(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
mocked_json_bytes: Mock,
|
||||
mocked_tarfile: Mock,
|
||||
) -> None:
|
||||
"""Test syncing a backup with no agents."""
|
||||
manager = BackupManager(hass)
|
||||
|
||||
await _setup_mock_domain(
|
||||
hass,
|
||||
Mock(
|
||||
async_pre_backup=AsyncMock(),
|
||||
async_post_backup=AsyncMock(),
|
||||
async_get_backup_agents=AsyncMock(return_value=[]),
|
||||
),
|
||||
)
|
||||
await manager.load_platforms()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
backup = await _mock_backup_generation(
|
||||
hass, manager, mocked_json_bytes, mocked_tarfile
|
||||
)
|
||||
with patch(
|
||||
"homeassistant.components.backup.agent.BackupAgent.async_upload_backup"
|
||||
) as mocked_async_upload_backup:
|
||||
await manager.async_upload_backup(slug=backup.slug)
|
||||
assert mocked_async_upload_backup.call_count == 0
|
||||
|
||||
|
||||
async def test_exception_plaform_pre(
|
||||
|
@ -241,11 +484,12 @@ async def test_exception_plaform_pre(
|
|||
Mock(
|
||||
async_pre_backup=_mock_step,
|
||||
async_post_backup=AsyncMock(),
|
||||
async_get_backup_agents=AsyncMock(),
|
||||
),
|
||||
)
|
||||
|
||||
with pytest.raises(HomeAssistantError):
|
||||
await _mock_backup_generation(manager, mocked_json_bytes, mocked_tarfile)
|
||||
await _mock_backup_generation(hass, manager, mocked_json_bytes, mocked_tarfile)
|
||||
|
||||
|
||||
async def test_exception_plaform_post(
|
||||
|
@ -263,11 +507,12 @@ async def test_exception_plaform_post(
|
|||
Mock(
|
||||
async_pre_backup=AsyncMock(),
|
||||
async_post_backup=_mock_step,
|
||||
async_get_backup_agents=AsyncMock(),
|
||||
),
|
||||
)
|
||||
|
||||
with pytest.raises(HomeAssistantError):
|
||||
await _mock_backup_generation(manager, mocked_json_bytes, mocked_tarfile)
|
||||
await _mock_backup_generation(hass, manager, mocked_json_bytes, mocked_tarfile)
|
||||
|
||||
|
||||
async def test_loading_platforms_when_running_async_pre_backup_actions(
|
||||
|
@ -285,6 +530,7 @@ async def test_loading_platforms_when_running_async_pre_backup_actions(
|
|||
Mock(
|
||||
async_pre_backup=AsyncMock(),
|
||||
async_post_backup=AsyncMock(),
|
||||
async_get_backup_agents=AsyncMock(),
|
||||
),
|
||||
)
|
||||
await manager.async_pre_backup_actions()
|
||||
|
@ -310,6 +556,7 @@ async def test_loading_platforms_when_running_async_post_backup_actions(
|
|||
Mock(
|
||||
async_pre_backup=AsyncMock(),
|
||||
async_post_backup=AsyncMock(),
|
||||
async_get_backup_agents=AsyncMock(),
|
||||
),
|
||||
)
|
||||
await manager.async_post_backup_actions()
|
||||
|
@ -369,7 +616,32 @@ async def test_async_trigger_restore(
|
|||
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
|
||||
):
|
||||
await manager.async_restore_backup(TEST_BACKUP.slug)
|
||||
assert mocked_write_text.call_args[0][0] == '{"path": "abc123.tar"}'
|
||||
assert (
|
||||
mocked_write_text.call_args[0][0]
|
||||
== '{"path": "abc123.tar", "password": null}'
|
||||
)
|
||||
assert mocked_service_call.called
|
||||
|
||||
|
||||
async def test_async_trigger_restore_with_password(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test trigger restore."""
|
||||
manager = BackupManager(hass)
|
||||
manager.loaded_backups = True
|
||||
manager.backups = {TEST_BACKUP.slug: TEST_BACKUP}
|
||||
|
||||
with (
|
||||
patch("pathlib.Path.exists", return_value=True),
|
||||
patch("pathlib.Path.write_text") as mocked_write_text,
|
||||
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
|
||||
):
|
||||
await manager.async_restore_backup(slug=TEST_BACKUP.slug, password="abc123")
|
||||
assert (
|
||||
mocked_write_text.call_args[0][0]
|
||||
== '{"path": "abc123.tar", "password": "abc123"}'
|
||||
)
|
||||
assert mocked_service_call.called
|
||||
|
||||
|
||||
|
|
|
@ -1,16 +1,20 @@
|
|||
"""Tests for the Backup integration."""
|
||||
|
||||
from unittest.mock import patch
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import ANY, AsyncMock, patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
from syrupy import SnapshotAssertion
|
||||
|
||||
from homeassistant.components.backup.manager import Backup
|
||||
from homeassistant.components.backup import BaseBackup
|
||||
from homeassistant.components.backup.const import DATA_MANAGER
|
||||
from homeassistant.components.backup.manager import NewBackup
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
from .common import TEST_BACKUP, setup_backup_integration
|
||||
from .common import TEST_BACKUP, BackupAgentTest, setup_backup_integration
|
||||
|
||||
from tests.typing import WebSocketGenerator
|
||||
|
||||
|
@ -43,15 +47,23 @@ async def test_info(
|
|||
"""Test getting backup info."""
|
||||
await setup_backup_integration(hass, with_hassio=with_hassio)
|
||||
|
||||
hass.data[DATA_MANAGER].backups = {TEST_BACKUP.slug: TEST_BACKUP}
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_get_backups",
|
||||
return_value={TEST_BACKUP.slug: TEST_BACKUP},
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.load_backups",
|
||||
AsyncMock(),
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_get_backups",
|
||||
return_value={TEST_BACKUP.slug: TEST_BACKUP},
|
||||
),
|
||||
):
|
||||
await client.send_json_auto_id({"type": "backup/info"})
|
||||
assert snapshot == await client.receive_json()
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -73,7 +85,7 @@ async def test_details(
|
|||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
with_hassio: bool,
|
||||
backup_content: Backup | None,
|
||||
backup_content: BaseBackup | None,
|
||||
) -> None:
|
||||
"""Test getting backup info."""
|
||||
await setup_backup_integration(hass, with_hassio=with_hassio)
|
||||
|
@ -112,9 +124,17 @@ async def test_remove(
|
|||
"homeassistant.components.backup.manager.BackupManager.async_remove_backup",
|
||||
):
|
||||
await client.send_json_auto_id({"type": "backup/remove", "slug": "abc123"})
|
||||
assert snapshot == await client.receive_json()
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"data",
|
||||
[
|
||||
None,
|
||||
{},
|
||||
{"password": "abc123"},
|
||||
],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("with_hassio", "number_of_messages"),
|
||||
[
|
||||
|
@ -126,6 +146,7 @@ async def test_remove(
|
|||
async def test_generate(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
data: dict[str, Any] | None,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
snapshot: SnapshotAssertion,
|
||||
with_hassio: bool,
|
||||
|
@ -138,9 +159,64 @@ async def test_generate(
|
|||
freezer.move_to("2024-11-13 12:01:00+01:00")
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await client.send_json_auto_id({"type": "backup/generate"})
|
||||
await client.send_json_auto_id({"type": "backup/generate", **(data or {})})
|
||||
for _ in range(number_of_messages):
|
||||
assert snapshot == await client.receive_json()
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_backup_generation")
|
||||
@pytest.mark.parametrize(
|
||||
("params", "expected_extra_call_params"),
|
||||
[
|
||||
({}, {}),
|
||||
(
|
||||
{
|
||||
"addons_included": ["ssl"],
|
||||
"database_included": False,
|
||||
"folders_included": ["media"],
|
||||
"name": "abc123",
|
||||
},
|
||||
{
|
||||
"addons_included": ["ssl"],
|
||||
"database_included": False,
|
||||
"folders_included": ["media"],
|
||||
"name": "abc123",
|
||||
},
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_generate_without_hassio(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
snapshot: SnapshotAssertion,
|
||||
params: dict,
|
||||
expected_extra_call_params: tuple,
|
||||
) -> None:
|
||||
"""Test generating a backup."""
|
||||
await setup_backup_integration(hass, with_hassio=False)
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
freezer.move_to("2024-11-13 12:01:00+01:00")
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_create_backup",
|
||||
return_value=NewBackup("abc123"),
|
||||
) as generate_backup:
|
||||
await client.send_json_auto_id({"type": "backup/generate"} | params)
|
||||
assert await client.receive_json() == snapshot
|
||||
generate_backup.assert_called_once_with(
|
||||
**{
|
||||
"addons_included": None,
|
||||
"database_included": True,
|
||||
"folders_included": None,
|
||||
"name": None,
|
||||
"on_progress": ANY,
|
||||
"password": None,
|
||||
}
|
||||
| expected_extra_call_params
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -199,7 +275,7 @@ async def test_backup_end(
|
|||
"homeassistant.components.backup.manager.BackupManager.async_post_backup_actions",
|
||||
):
|
||||
await client.send_json_auto_id({"type": "backup/end"})
|
||||
assert snapshot == await client.receive_json()
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -232,7 +308,47 @@ async def test_backup_start(
|
|||
"homeassistant.components.backup.manager.BackupManager.async_pre_backup_actions",
|
||||
):
|
||||
await client.send_json_auto_id({"type": "backup/start"})
|
||||
assert snapshot == await client.receive_json()
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"access_token_fixture_name",
|
||||
["hass_access_token", "hass_supervisor_access_token"],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("with_hassio"),
|
||||
[
|
||||
pytest.param(True, id="with_hassio"),
|
||||
pytest.param(False, id="without_hassio"),
|
||||
],
|
||||
)
|
||||
async def test_backup_upload(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
sync_access_token_proxy: str,
|
||||
*,
|
||||
access_token_fixture_name: str,
|
||||
with_hassio: bool,
|
||||
) -> None:
|
||||
"""Test backup upload from a WS command."""
|
||||
await setup_backup_integration(hass, with_hassio=with_hassio)
|
||||
|
||||
client = await hass_ws_client(hass, sync_access_token_proxy)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_upload_backup",
|
||||
):
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "backup/upload",
|
||||
"data": {
|
||||
"slug": "abc123",
|
||||
},
|
||||
}
|
||||
)
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -243,7 +359,7 @@ async def test_backup_start(
|
|||
Exception("Boom"),
|
||||
],
|
||||
)
|
||||
async def test_backup_end_excepion(
|
||||
async def test_backup_end_exception(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
|
@ -261,7 +377,7 @@ async def test_backup_end_excepion(
|
|||
side_effect=exception,
|
||||
):
|
||||
await client.send_json_auto_id({"type": "backup/end"})
|
||||
assert snapshot == await client.receive_json()
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -272,7 +388,43 @@ async def test_backup_end_excepion(
|
|||
Exception("Boom"),
|
||||
],
|
||||
)
|
||||
async def test_backup_start_excepion(
|
||||
async def test_backup_upload_exception(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
hass_supervisor_access_token: str,
|
||||
exception: Exception,
|
||||
) -> None:
|
||||
"""Test exception handling while running backup upload from a WS command."""
|
||||
await setup_backup_integration(hass, with_hassio=True)
|
||||
|
||||
client = await hass_ws_client(hass, hass_supervisor_access_token)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_upload_backup",
|
||||
side_effect=exception,
|
||||
):
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "backup/upload",
|
||||
"data": {
|
||||
"slug": "abc123",
|
||||
},
|
||||
}
|
||||
)
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"exception",
|
||||
[
|
||||
TimeoutError(),
|
||||
HomeAssistantError("Boom"),
|
||||
Exception("Boom"),
|
||||
],
|
||||
)
|
||||
async def test_backup_start_exception(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
|
@ -290,4 +442,135 @@ async def test_backup_start_excepion(
|
|||
side_effect=exception,
|
||||
):
|
||||
await client.send_json_auto_id({"type": "backup/start"})
|
||||
assert snapshot == await client.receive_json()
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"with_hassio",
|
||||
[
|
||||
pytest.param(True, id="with_hassio"),
|
||||
pytest.param(False, id="without_hassio"),
|
||||
],
|
||||
)
|
||||
async def test_agents_info(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
with_hassio: bool,
|
||||
) -> None:
|
||||
"""Test getting backup agents info."""
|
||||
await setup_backup_integration(hass, with_hassio=with_hassio)
|
||||
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await client.send_json_auto_id({"type": "backup/agents/info"})
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"with_hassio",
|
||||
[
|
||||
pytest.param(True, id="with_hassio"),
|
||||
pytest.param(False, id="without_hassio"),
|
||||
],
|
||||
)
|
||||
async def test_agents_list_backups(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
with_hassio: bool,
|
||||
) -> None:
|
||||
"""Test backup agents list backups details."""
|
||||
await setup_backup_integration(hass, with_hassio=with_hassio)
|
||||
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await client.send_json_auto_id({"type": "backup/agents/list_backups"})
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"with_hassio",
|
||||
[
|
||||
pytest.param(True, id="with_hassio"),
|
||||
pytest.param(False, id="without_hassio"),
|
||||
],
|
||||
)
|
||||
async def test_agents_download(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
with_hassio: bool,
|
||||
) -> None:
|
||||
"""Test WS command to start downloading a backup."""
|
||||
await setup_backup_integration(hass, with_hassio=with_hassio)
|
||||
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "backup/agents/download",
|
||||
"slug": "abc123",
|
||||
"agent_id": "domain.test",
|
||||
"backup_id": "abc123",
|
||||
}
|
||||
)
|
||||
with patch.object(BackupAgentTest, "async_download_backup") as download_mock:
|
||||
assert await client.receive_json() == snapshot
|
||||
assert download_mock.call_args[1] == {
|
||||
"id": "abc123",
|
||||
"path": Path(hass.config.path("backup"), "abc123.tar"),
|
||||
}
|
||||
|
||||
|
||||
async def test_agents_download_exception(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test WS command to start downloading a backup throwing an exception."""
|
||||
await setup_backup_integration(hass)
|
||||
hass.data[DATA_MANAGER].backup_agents = {"domain.test": BackupAgentTest("test")}
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "backup/agents/download",
|
||||
"slug": "abc123",
|
||||
"agent_id": "domain.test",
|
||||
"backup_id": "abc123",
|
||||
}
|
||||
)
|
||||
with patch.object(BackupAgentTest, "async_download_backup") as download_mock:
|
||||
download_mock.side_effect = Exception("Boom")
|
||||
assert await client.receive_json() == snapshot
|
||||
|
||||
|
||||
async def test_agents_download_unknown_agent(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test downloading a backup with an unknown agent."""
|
||||
await setup_backup_integration(hass)
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "backup/agents/download",
|
||||
"slug": "abc123",
|
||||
"agent_id": "domain.test",
|
||||
"backup_id": "abc123",
|
||||
}
|
||||
)
|
||||
assert await client.receive_json() == snapshot
|
||||
|
|
162
tests/components/kitchen_sink/test_backup.py
Normal file
162
tests/components/kitchen_sink/test_backup.py
Normal file
|
@ -0,0 +1,162 @@
|
|||
"""Test the Kitchen Sink backup platform."""
|
||||
|
||||
from collections.abc import AsyncGenerator
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.backup import DOMAIN as BACKUP_DOMAIN, Backup
|
||||
from homeassistant.components.kitchen_sink import DOMAIN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.typing import WebSocketGenerator
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
async def backup_only() -> AsyncGenerator[None]:
|
||||
"""Enable only the backup platform.
|
||||
|
||||
The backup platform is not an entity platform.
|
||||
"""
|
||||
with patch(
|
||||
"homeassistant.components.kitchen_sink.COMPONENTS_WITH_DEMO_PLATFORM",
|
||||
[],
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
async def setup_integration(hass: HomeAssistant) -> AsyncGenerator[None]:
|
||||
"""Set up Kitchen Sink integration."""
|
||||
with patch("homeassistant.components.backup.is_hassio", return_value=True):
|
||||
assert await async_setup_component(hass, BACKUP_DOMAIN, {BACKUP_DOMAIN: {}})
|
||||
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
await hass.async_block_till_done()
|
||||
yield
|
||||
|
||||
|
||||
async def test_agents_info(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
) -> None:
|
||||
"""Test backup agents info."""
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json_auto_id({"type": "backup/agents/info"})
|
||||
response = await client.receive_json()
|
||||
|
||||
assert response["success"]
|
||||
assert response["result"] == {
|
||||
"agents": [{"agent_id": "kitchen_sink.syncer"}],
|
||||
"syncing": False,
|
||||
}
|
||||
|
||||
|
||||
async def test_agents_list_backups(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
) -> None:
|
||||
"""Test backup agents list backups."""
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json_auto_id({"type": "backup/agents/list_backups"})
|
||||
response = await client.receive_json()
|
||||
|
||||
assert response["success"]
|
||||
assert response["result"] == [
|
||||
{
|
||||
"agent_id": "kitchen_sink.syncer",
|
||||
"date": "1970-01-01T00:00:00Z",
|
||||
"id": "def456",
|
||||
"slug": "abc123",
|
||||
"size": 1234,
|
||||
"name": "Kitchen sink syncer",
|
||||
"protected": False,
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
async def test_agents_download(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test backup agents download."""
|
||||
client = await hass_ws_client(hass)
|
||||
backup_id = "def456"
|
||||
slug = "abc123"
|
||||
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "backup/agents/download",
|
||||
"slug": slug,
|
||||
"agent_id": "kitchen_sink.syncer",
|
||||
"backup_id": backup_id,
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
|
||||
assert response["success"]
|
||||
path = hass.config.path(f"backup/{slug}.tar")
|
||||
assert f"Downloading backup {backup_id} to {path}" in caplog.text
|
||||
|
||||
|
||||
async def test_agents_upload(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
hass_supervisor_access_token: str,
|
||||
) -> None:
|
||||
"""Test backup agents upload."""
|
||||
client = await hass_ws_client(hass, hass_supervisor_access_token)
|
||||
slug = "test-backup"
|
||||
test_backup = Backup(
|
||||
slug=slug,
|
||||
name="Test",
|
||||
date="1970-01-01T00:00:00.000Z",
|
||||
path=Path(hass.config.path(f"backups/{slug}.tar")),
|
||||
size=0.0,
|
||||
protected=False,
|
||||
)
|
||||
uuid = UUID(int=123456)
|
||||
|
||||
with (
|
||||
patch("homeassistant.components.kitchen_sink.backup.uuid4", return_value=uuid),
|
||||
patch(
|
||||
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
|
||||
) as fetch_backup,
|
||||
):
|
||||
fetch_backup.return_value = test_backup
|
||||
await client.send_json_auto_id(
|
||||
{
|
||||
"type": "backup/upload",
|
||||
"data": {
|
||||
"slug": slug,
|
||||
},
|
||||
}
|
||||
)
|
||||
response = await client.receive_json()
|
||||
|
||||
assert response["success"]
|
||||
backup_name = f"{slug}.tar"
|
||||
assert f"Uploading backup {backup_name}" in caplog.text
|
||||
|
||||
with patch("homeassistant.components.kitchen_sink.backup.uuid4", return_value=uuid):
|
||||
await client.send_json_auto_id({"type": "backup/agents/list_backups"})
|
||||
response = await client.receive_json()
|
||||
|
||||
assert response["success"]
|
||||
backup_list = response["result"]
|
||||
assert len(backup_list) == 2
|
||||
assert backup_list[1] == {
|
||||
"agent_id": "kitchen_sink.syncer",
|
||||
"date": test_backup.date,
|
||||
"id": uuid.hex,
|
||||
"slug": slug,
|
||||
"size": 0.0,
|
||||
"name": test_backup.name,
|
||||
"protected": test_backup.protected,
|
||||
}
|
|
@ -19,7 +19,23 @@ from .common import get_test_config_dir
|
|||
(
|
||||
None,
|
||||
'{"path": "test"}',
|
||||
backup_restore.RestoreBackupFileContent(backup_file_path=Path("test")),
|
||||
backup_restore.RestoreBackupFileContent(
|
||||
backup_file_path=Path("test"), password=None
|
||||
),
|
||||
),
|
||||
(
|
||||
None,
|
||||
'{"path": "test", "password": "psw"}',
|
||||
backup_restore.RestoreBackupFileContent(
|
||||
backup_file_path=Path("test"), password="psw"
|
||||
),
|
||||
),
|
||||
(
|
||||
None,
|
||||
'{"path": "test", "password": null}',
|
||||
backup_restore.RestoreBackupFileContent(
|
||||
backup_file_path=Path("test"), password=None
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
|
@ -155,15 +171,17 @@ def test_removal_of_current_configuration_when_restoring() -> None:
|
|||
return_value=[x["path"] for x in mock_config_dir],
|
||||
),
|
||||
mock.patch("pathlib.Path.unlink") as unlink_mock,
|
||||
mock.patch("shutil.rmtree") as rmtreemock,
|
||||
mock.patch("shutil.copytree") as copytree_mock,
|
||||
mock.patch("shutil.rmtree") as rmtree_mock,
|
||||
):
|
||||
assert backup_restore.restore_backup(config_dir) is True
|
||||
assert unlink_mock.call_count == 2
|
||||
assert copytree_mock.call_count == 1
|
||||
assert (
|
||||
rmtreemock.call_count == 1
|
||||
rmtree_mock.call_count == 1
|
||||
) # We have 2 directories in the config directory, but backups is kept
|
||||
|
||||
removed_directories = {Path(call.args[0]) for call in rmtreemock.mock_calls}
|
||||
removed_directories = {Path(call.args[0]) for call in rmtree_mock.mock_calls}
|
||||
assert removed_directories == {Path(config_dir, "www")}
|
||||
|
||||
|
||||
|
@ -177,8 +195,8 @@ def test_extracting_the_contents_of_a_backup_file() -> None:
|
|||
|
||||
getmembers_mock = mock.MagicMock(
|
||||
return_value=[
|
||||
tarfile.TarInfo(name="../data/test"),
|
||||
tarfile.TarInfo(name="data"),
|
||||
tarfile.TarInfo(name="data/../test"),
|
||||
tarfile.TarInfo(name="data/.HA_VERSION"),
|
||||
tarfile.TarInfo(name="data/.storage"),
|
||||
tarfile.TarInfo(name="data/www"),
|
||||
|
@ -190,7 +208,7 @@ def test_extracting_the_contents_of_a_backup_file() -> None:
|
|||
mock.patch(
|
||||
"homeassistant.backup_restore.restore_backup_file_content",
|
||||
return_value=backup_restore.RestoreBackupFileContent(
|
||||
backup_file_path=backup_file_path
|
||||
backup_file_path=backup_file_path,
|
||||
),
|
||||
),
|
||||
mock.patch(
|
||||
|
@ -205,11 +223,37 @@ def test_extracting_the_contents_of_a_backup_file() -> None:
|
|||
mock.patch("pathlib.Path.read_text", _patched_path_read_text),
|
||||
mock.patch("pathlib.Path.is_file", return_value=False),
|
||||
mock.patch("pathlib.Path.iterdir", return_value=[]),
|
||||
mock.patch("shutil.copytree"),
|
||||
):
|
||||
assert backup_restore.restore_backup(config_dir) is True
|
||||
assert getmembers_mock.call_count == 1
|
||||
assert extractall_mock.call_count == 2
|
||||
|
||||
assert {
|
||||
member.name for member in extractall_mock.mock_calls[-1].kwargs["members"]
|
||||
} == {".HA_VERSION", ".storage", "www"}
|
||||
} == {"data", "data/.HA_VERSION", "data/.storage", "data/www"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("password", "expected"),
|
||||
[
|
||||
("test", b"\xf0\x9b\xb9\x1f\xdc,\xff\xd5x\xd6\xd6\x8fz\x19.\x0f"),
|
||||
("lorem ipsum...", b"#\xe0\xfc\xe0\xdb?_\x1f,$\rQ\xf4\xf5\xd8\xfb"),
|
||||
],
|
||||
)
|
||||
def test_pw_to_key(password: str | None, expected: bytes | None) -> None:
|
||||
"""Test password to key conversion."""
|
||||
assert backup_restore.password_to_key(password) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("password", "expected"),
|
||||
[
|
||||
(None, None),
|
||||
("test", b"\xf0\x9b\xb9\x1f\xdc,\xff\xd5x\xd6\xd6\x8fz\x19.\x0f"),
|
||||
("lorem ipsum...", b"#\xe0\xfc\xe0\xdb?_\x1f,$\rQ\xf4\xf5\xd8\xfb"),
|
||||
],
|
||||
)
|
||||
def test_pw_to_key_none(password: str | None, expected: bytes | None) -> None:
|
||||
"""Test password to key conversion."""
|
||||
with pytest.raises(AttributeError):
|
||||
backup_restore.password_to_key(None)
|
||||
|
|
Loading…
Add table
Reference in a new issue