Use tmp_path in recorder tests (#91202)

This commit is contained in:
epenet 2023-04-11 09:18:16 +02:00 committed by GitHub
parent a7093d3687
commit 2f7c5a56eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 37 deletions

View file

@ -3,10 +3,10 @@ from collections.abc import Callable
# pylint: disable=invalid-name
import importlib
from pathlib import Path
import sys
from unittest.mock import patch
import py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
@ -129,10 +129,12 @@ def _create_engine_28(*args, **kwargs):
def test_delete_metadata_duplicates(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
caplog: pytest.LogCaptureFixture, tmp_path: Path
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.db_schema_28"
@ -222,10 +224,12 @@ def test_delete_metadata_duplicates(
def test_delete_metadata_duplicates_many(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
caplog: pytest.LogCaptureFixture, tmp_path: Path
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.db_schema_28"

View file

@ -11,7 +11,6 @@ from typing import cast
from unittest.mock import Mock, patch
from freezegun.api import FrozenDateTimeFactory
import py
import pytest
from sqlalchemy.exc import DatabaseError, OperationalError, SQLAlchemyError
@ -1280,11 +1279,13 @@ def test_statistics_runs_initiated(hass_recorder: Callable[..., HomeAssistant])
@pytest.mark.freeze_time("2022-09-13 09:00:00+02:00")
def test_compile_missing_statistics(
tmpdir: py.path.local, freezer: FrozenDateTimeFactory
tmp_path: Path, freezer: FrozenDateTimeFactory
) -> None:
"""Test missing statistics are compiled on startup."""
now = dt_util.utcnow().replace(minute=0, second=0, microsecond=0)
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
hass = get_test_home_assistant()
@ -1541,9 +1542,11 @@ def test_service_disable_states_not_recording(
)
def test_service_disable_run_information_recorded(tmpdir: py.path.local) -> None:
def test_service_disable_run_information_recorded(tmp_path: Path) -> None:
"""Test that runs are still recorded when recorder is disabled."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
hass = get_test_home_assistant()
@ -1590,12 +1593,14 @@ class CannotSerializeMe:
async def test_database_corruption_while_running(
hass: HomeAssistant, tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
hass: HomeAssistant, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
"""Test we can recover from sqlite3 db corruption."""
def _create_tmpdir_for_test_db():
return tmpdir.mkdir("sqlite").join("test.db")
def _create_tmpdir_for_test_db() -> Path:
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
return test_dir.joinpath("test.db")
test_db_file = await hass.async_add_executor_job(_create_tmpdir_for_test_db)
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"

View file

@ -8,10 +8,10 @@ from functools import partial
# pylint: disable=invalid-name
import importlib
import json
from pathlib import Path
import sys
from unittest.mock import patch
import py
import pytest
from homeassistant.components import recorder
@ -36,11 +36,11 @@ SCHEMA_VERSION_POSTFIX = "23_with_newer_columns"
SCHEMA_MODULE = get_schema_module_path(SCHEMA_VERSION_POSTFIX)
def test_delete_duplicates(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
) -> None:
def test_delete_duplicates(caplog: pytest.LogCaptureFixture, tmp_path: Path) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
@ -215,10 +215,12 @@ def test_delete_duplicates(
def test_delete_duplicates_many(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
caplog: pytest.LogCaptureFixture, tmp_path: Path
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
@ -400,10 +402,12 @@ def test_delete_duplicates_many(
@pytest.mark.freeze_time("2021-08-01 00:00:00+00:00")
def test_delete_duplicates_non_identical(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
caplog: pytest.LogCaptureFixture, tmp_path: Path
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
@ -529,7 +533,7 @@ def test_delete_duplicates_non_identical(
# Test that the duplicates are removed during migration from schema 23
hass = get_test_home_assistant()
hass.config.config_dir = tmpdir
hass.config.config_dir = tmp_path
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
hass.start()
@ -579,10 +583,12 @@ def test_delete_duplicates_non_identical(
def test_delete_duplicates_short_term(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
caplog: pytest.LogCaptureFixture, tmp_path: Path
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
@ -638,7 +644,7 @@ def test_delete_duplicates_short_term(
# Test that the duplicates are removed during migration from schema 23
hass = get_test_home_assistant()
hass.config.config_dir = tmpdir
hass.config.config_dir = tmp_path
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
hass.start()

View file

@ -6,7 +6,6 @@ from pathlib import Path
import sqlite3
from unittest.mock import MagicMock, Mock, patch
import py
import pytest
from sqlalchemy import text
from sqlalchemy.engine.result import ChunkedIteratorResult
@ -73,11 +72,11 @@ def test_recorder_bad_execute(hass_recorder: Callable[..., HomeAssistant]) -> No
def test_validate_or_move_away_sqlite_database(
hass: HomeAssistant, tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
hass: HomeAssistant, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
"""Ensure a malformed sqlite database is moved away."""
test_dir = tmpdir.mkdir("test_validate_or_move_away_sqlite_database")
test_dir = tmp_path.joinpath("test_validate_or_move_away_sqlite_database")
test_dir.mkdir()
test_db_file = f"{test_dir}/broken.db"
dburl = f"{SQLITE_URL_PREFIX}{test_db_file}"

View file

@ -3,10 +3,10 @@
import asyncio
from datetime import timedelta
import importlib
from pathlib import Path
import sys
from unittest.mock import patch
import py
import pytest
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import Session
@ -52,11 +52,11 @@ def _create_engine_test(*args, **kwargs):
return engine
async def test_migrate_times(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
) -> None:
async def test_migrate_times(caplog: pytest.LogCaptureFixture, tmp_path: Path) -> None:
"""Test we can migrate times."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)
@ -225,10 +225,12 @@ async def test_migrate_times(
async def test_migrate_can_resume_entity_id_post_migration(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
caplog: pytest.LogCaptureFixture, tmp_path: Path
) -> None:
"""Test we resume the entity id post migration after a restart."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
test_dir = tmp_path.joinpath("sqlite")
test_dir.mkdir()
test_db_file = test_dir.joinpath("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
importlib.import_module(SCHEMA_MODULE)