Convert test helpers to get hass instance to contextmanagers (#109990)

* Convert get_test_home_assistant helper to contextmanager

* Convert async_test_home_assistant helper to contextmanager

* Move timezone reset to async_test_home_assistant helper
This commit is contained in:
Marc Mueller 2024-02-11 21:23:51 +01:00 committed by GitHub
parent 3342e6ddbd
commit 2ef2172b01
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 750 additions and 784 deletions

View file

@ -516,199 +516,192 @@ async def test_changing_delayed_written_data(
async def test_saving_load_round_trip(tmpdir: py.path.local) -> None:
"""Test saving and loading round trip."""
loop = asyncio.get_running_loop()
hass = await async_test_home_assistant(loop)
async with async_test_home_assistant() as hass:
hass.config.config_dir = await hass.async_add_executor_job(
tmpdir.mkdir, "temp_storage"
)
hass.config.config_dir = await hass.async_add_executor_job(
tmpdir.mkdir, "temp_storage"
)
class NamedTupleSubclass(NamedTuple):
"""A NamedTuple subclass."""
class NamedTupleSubclass(NamedTuple):
"""A NamedTuple subclass."""
name: str
name: str
nts = NamedTupleSubclass("a")
nts = NamedTupleSubclass("a")
data = {
"named_tuple_subclass": nts,
"rgb_color": RGBColor(255, 255, 0),
"set": {1, 2, 3},
"list": [1, 2, 3],
"tuple": (1, 2, 3),
"dict_with_int": {1: 1, 2: 2},
"dict_with_named_tuple": {1: nts, 2: nts},
}
data = {
"named_tuple_subclass": nts,
"rgb_color": RGBColor(255, 255, 0),
"set": {1, 2, 3},
"list": [1, 2, 3],
"tuple": (1, 2, 3),
"dict_with_int": {1: 1, 2: 2},
"dict_with_named_tuple": {1: nts, 2: nts},
}
store = storage.Store(
hass, MOCK_VERSION_2, MOCK_KEY, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save(data)
load = await store.async_load()
assert load == {
"dict_with_int": {"1": 1, "2": 2},
"dict_with_named_tuple": {"1": ["a"], "2": ["a"]},
"list": [1, 2, 3],
"named_tuple_subclass": ["a"],
"rgb_color": [255, 255, 0],
"set": [1, 2, 3],
"tuple": [1, 2, 3],
}
store = storage.Store(
hass, MOCK_VERSION_2, MOCK_KEY, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save(data)
load = await store.async_load()
assert load == {
"dict_with_int": {"1": 1, "2": 2},
"dict_with_named_tuple": {"1": ["a"], "2": ["a"]},
"list": [1, 2, 3],
"named_tuple_subclass": ["a"],
"rgb_color": [255, 255, 0],
"set": [1, 2, 3],
"tuple": [1, 2, 3],
}
await hass.async_stop(force=True)
await hass.async_stop(force=True)
async def test_loading_corrupt_core_file(
tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
) -> None:
"""Test we handle unrecoverable corruption in a core file."""
loop = asyncio.get_running_loop()
hass = await async_test_home_assistant(loop)
async with async_test_home_assistant() as hass:
tmp_storage = await hass.async_add_executor_job(tmpdir.mkdir, "temp_storage")
hass.config.config_dir = tmp_storage
tmp_storage = await hass.async_add_executor_job(tmpdir.mkdir, "temp_storage")
hass.config.config_dir = tmp_storage
storage_key = "core.anything"
store = storage.Store(
hass, MOCK_VERSION_2, storage_key, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save({"hello": "world"})
storage_path = os.path.join(tmp_storage, ".storage")
store_file = os.path.join(storage_path, store.key)
storage_key = "core.anything"
store = storage.Store(
hass, MOCK_VERSION_2, storage_key, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save({"hello": "world"})
storage_path = os.path.join(tmp_storage, ".storage")
store_file = os.path.join(storage_path, store.key)
data = await store.async_load()
assert data == {"hello": "world"}
data = await store.async_load()
assert data == {"hello": "world"}
def _corrupt_store():
with open(store_file, "w") as f:
f.write("corrupt")
def _corrupt_store():
with open(store_file, "w") as f:
f.write("corrupt")
await hass.async_add_executor_job(_corrupt_store)
await hass.async_add_executor_job(_corrupt_store)
data = await store.async_load()
assert data is None
assert "Unrecoverable error decoding storage" in caplog.text
data = await store.async_load()
assert data is None
assert "Unrecoverable error decoding storage" in caplog.text
issue_registry = ir.async_get(hass)
found_issue = None
issue_entry = None
for (domain, issue), entry in issue_registry.issues.items():
if domain == HOMEASSISTANT_DOMAIN and issue.startswith(
f"storage_corruption_{storage_key}_"
):
found_issue = issue
issue_entry = entry
break
issue_registry = ir.async_get(hass)
found_issue = None
issue_entry = None
for (domain, issue), entry in issue_registry.issues.items():
if domain == HOMEASSISTANT_DOMAIN and issue.startswith(
f"storage_corruption_{storage_key}_"
):
found_issue = issue
issue_entry = entry
break
assert found_issue is not None
assert issue_entry is not None
assert issue_entry.is_fixable is True
assert issue_entry.translation_placeholders["storage_key"] == storage_key
assert issue_entry.issue_domain == HOMEASSISTANT_DOMAIN
assert (
issue_entry.translation_placeholders["error"]
== "unexpected character: line 1 column 1 (char 0)"
)
assert found_issue is not None
assert issue_entry is not None
assert issue_entry.is_fixable is True
assert issue_entry.translation_placeholders["storage_key"] == storage_key
assert issue_entry.issue_domain == HOMEASSISTANT_DOMAIN
assert (
issue_entry.translation_placeholders["error"]
== "unexpected character: line 1 column 1 (char 0)"
)
files = await hass.async_add_executor_job(
os.listdir, os.path.join(tmp_storage, ".storage")
)
assert ".corrupt" in files[0]
files = await hass.async_add_executor_job(
os.listdir, os.path.join(tmp_storage, ".storage")
)
assert ".corrupt" in files[0]
await hass.async_stop(force=True)
await hass.async_stop(force=True)
async def test_loading_corrupt_file_known_domain(
tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
) -> None:
"""Test we handle unrecoverable corruption for a known domain."""
loop = asyncio.get_running_loop()
hass = await async_test_home_assistant(loop)
hass.config.components.add("testdomain")
storage_key = "testdomain.testkey"
async with async_test_home_assistant() as hass:
hass.config.components.add("testdomain")
storage_key = "testdomain.testkey"
tmp_storage = await hass.async_add_executor_job(tmpdir.mkdir, "temp_storage")
hass.config.config_dir = tmp_storage
tmp_storage = await hass.async_add_executor_job(tmpdir.mkdir, "temp_storage")
hass.config.config_dir = tmp_storage
store = storage.Store(
hass, MOCK_VERSION_2, storage_key, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save({"hello": "world"})
storage_path = os.path.join(tmp_storage, ".storage")
store_file = os.path.join(storage_path, store.key)
store = storage.Store(
hass, MOCK_VERSION_2, storage_key, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save({"hello": "world"})
storage_path = os.path.join(tmp_storage, ".storage")
store_file = os.path.join(storage_path, store.key)
data = await store.async_load()
assert data == {"hello": "world"}
data = await store.async_load()
assert data == {"hello": "world"}
def _corrupt_store():
with open(store_file, "w") as f:
f.write('{"valid":"json"}..with..corrupt')
def _corrupt_store():
with open(store_file, "w") as f:
f.write('{"valid":"json"}..with..corrupt')
await hass.async_add_executor_job(_corrupt_store)
await hass.async_add_executor_job(_corrupt_store)
data = await store.async_load()
assert data is None
assert "Unrecoverable error decoding storage" in caplog.text
data = await store.async_load()
assert data is None
assert "Unrecoverable error decoding storage" in caplog.text
issue_registry = ir.async_get(hass)
found_issue = None
issue_entry = None
for (domain, issue), entry in issue_registry.issues.items():
if domain == HOMEASSISTANT_DOMAIN and issue.startswith(
f"storage_corruption_{storage_key}_"
):
found_issue = issue
issue_entry = entry
break
issue_registry = ir.async_get(hass)
found_issue = None
issue_entry = None
for (domain, issue), entry in issue_registry.issues.items():
if domain == HOMEASSISTANT_DOMAIN and issue.startswith(
f"storage_corruption_{storage_key}_"
):
found_issue = issue
issue_entry = entry
break
assert found_issue is not None
assert issue_entry is not None
assert issue_entry.is_fixable is True
assert issue_entry.translation_placeholders["storage_key"] == storage_key
assert issue_entry.issue_domain == "testdomain"
assert (
issue_entry.translation_placeholders["error"]
== "unexpected content after document: line 1 column 17 (char 16)"
)
assert found_issue is not None
assert issue_entry is not None
assert issue_entry.is_fixable is True
assert issue_entry.translation_placeholders["storage_key"] == storage_key
assert issue_entry.issue_domain == "testdomain"
assert (
issue_entry.translation_placeholders["error"]
== "unexpected content after document: line 1 column 17 (char 16)"
)
files = await hass.async_add_executor_job(
os.listdir, os.path.join(tmp_storage, ".storage")
)
assert ".corrupt" in files[0]
files = await hass.async_add_executor_job(
os.listdir, os.path.join(tmp_storage, ".storage")
)
assert ".corrupt" in files[0]
await hass.async_stop(force=True)
await hass.async_stop(force=True)
async def test_os_error_is_fatal(tmpdir: py.path.local) -> None:
"""Test OSError during load is fatal."""
loop = asyncio.get_running_loop()
hass = await async_test_home_assistant(loop)
async with async_test_home_assistant() as hass:
tmp_storage = await hass.async_add_executor_job(tmpdir.mkdir, "temp_storage")
hass.config.config_dir = tmp_storage
tmp_storage = await hass.async_add_executor_job(tmpdir.mkdir, "temp_storage")
hass.config.config_dir = tmp_storage
store = storage.Store(
hass, MOCK_VERSION_2, MOCK_KEY, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save({"hello": "world"})
store = storage.Store(
hass, MOCK_VERSION_2, MOCK_KEY, minor_version=MOCK_MINOR_VERSION_1
)
await store.async_save({"hello": "world"})
with pytest.raises(OSError), patch(
"homeassistant.helpers.storage.json_util.load_json", side_effect=OSError
):
await store.async_load()
with pytest.raises(OSError), patch(
"homeassistant.helpers.storage.json_util.load_json", side_effect=OSError
):
await store.async_load()
base_os_error = OSError()
base_os_error.errno = 30
home_assistant_error = HomeAssistantError()
home_assistant_error.__cause__ = base_os_error
base_os_error = OSError()
base_os_error.errno = 30
home_assistant_error = HomeAssistantError()
home_assistant_error.__cause__ = base_os_error
with pytest.raises(HomeAssistantError), patch(
"homeassistant.helpers.storage.json_util.load_json",
side_effect=home_assistant_error,
):
await store.async_load()
with pytest.raises(HomeAssistantError), patch(
"homeassistant.helpers.storage.json_util.load_json",
side_effect=home_assistant_error,
):
await store.async_load()
await hass.async_stop(force=True)
await hass.async_stop(force=True)
async def test_read_only_store(