2018-10-02 04:16:43 -04:00
|
|
|
"""Test Home Assistant json utility functions."""
|
2020-04-13 23:46:41 -07:00
|
|
|
from datetime import datetime
|
|
|
|
from functools import partial
|
|
|
|
from json import JSONEncoder, dumps
|
|
|
|
import math
|
2018-10-02 04:16:43 -04:00
|
|
|
import os
|
|
|
|
from tempfile import mkdtemp
|
2022-07-04 08:41:23 -05:00
|
|
|
from unittest.mock import Mock, patch
|
2018-10-02 04:16:43 -04:00
|
|
|
|
2019-04-30 18:20:38 +02:00
|
|
|
import pytest
|
|
|
|
|
2020-04-13 23:46:41 -07:00
|
|
|
from homeassistant.core import Event, State
|
2018-10-02 04:16:43 -04:00
|
|
|
from homeassistant.exceptions import HomeAssistantError
|
2022-07-04 08:41:23 -05:00
|
|
|
from homeassistant.helpers.json import JSONEncoder as DefaultHASSJSONEncoder
|
2020-02-17 10:49:42 -08:00
|
|
|
from homeassistant.util.json import (
|
|
|
|
SerializationError,
|
|
|
|
find_paths_unserializable_data,
|
2023-02-16 19:40:47 +01:00
|
|
|
json_loads_array,
|
2023-02-16 11:37:57 +01:00
|
|
|
json_loads_object,
|
2020-02-17 10:49:42 -08:00
|
|
|
load_json,
|
|
|
|
save_json,
|
|
|
|
)
|
2018-11-28 07:16:43 -05:00
|
|
|
|
2018-10-02 04:16:43 -04:00
|
|
|
# JSON-serializable sample payloads (same keys, value types swapped between A and B)
TEST_JSON_A = {"a": 1, "B": "two"}
TEST_JSON_B = {"a": "one", "B": 2}

# File content that is not valid JSON and therefore cannot be loaded
TEST_BAD_SERIALIED = "THIS IS NOT JSON\n"

# Temporary directory for the running test; assigned by setup_and_teardown
TMP_DIR = None
|
|
|
|
|
|
|
|
|
2022-02-08 10:03:27 +01:00
|
|
|
@pytest.fixture(autouse=True)
def setup_and_teardown():
    """Create a fresh temporary directory per test and remove it afterwards."""
    global TMP_DIR
    TMP_DIR = mkdtemp()

    yield

    # Teardown: os.remove only handles files, so this assumes the tests never
    # create subdirectories inside TMP_DIR.
    for leaf in os.listdir(TMP_DIR):
        leftover = os.path.join(TMP_DIR, leaf)
        os.remove(leftover)
    # The directory is empty now, so rmdir can delete it.
    os.rmdir(TMP_DIR)
|
|
|
|
|
|
|
|
|
|
|
|
def _path_for(leaf_name):
    """Return the path of ``<leaf_name>.json`` inside the current TMP_DIR."""
    json_file = f"{leaf_name}.json"
    return os.path.join(TMP_DIR, json_file)
|
2019-04-30 18:20:38 +02:00
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_save_and_load() -> None:
    """Test that data written by save_json is returned unchanged by load_json."""
    target = _path_for("test1")
    save_json(target, TEST_JSON_A)
    assert load_json(target) == TEST_JSON_A
|
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_save_and_load_int_keys() -> None:
    """Test that integer keys come back as strings after a save/load round trip."""
    target = _path_for("test1")
    int_keyed = {1: "a", 2: "b"}
    save_json(target, int_keyed)

    # JSON object keys are always strings, so 1 and 2 are read back as "1" and "2".
    str_keyed = {"1": "a", "2": "b"}
    assert load_json(target) == str_keyed
|
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_save_and_load_private() -> None:
    """Test that a private file round-trips and has no group/other permissions."""
    target = _path_for("test2")
    save_json(target, TEST_JSON_A, private=True)
    assert load_json(target) == TEST_JSON_A

    # 0o77 masks the group and other permission bits; every one must be cleared.
    group_other_bits = os.stat(target).st_mode & 0o77
    assert group_other_bits == 0
|
|
|
|
|
|
|
|
|
2021-11-15 04:19:31 -06:00
|
|
|
@pytest.mark.parametrize("atomic_writes", [True, False])
def test_overwrite_and_reload(atomic_writes: bool) -> None:
    """Test that we can overwrite an existing file and read back.

    Parametrized to run once with ``atomic_writes=True`` and once with
    ``atomic_writes=False``; the flag is forwarded to both save_json calls.
    """
    fname = _path_for("test3")
    # Write twice to the same path: only the second payload may be read back.
    save_json(fname, TEST_JSON_A, atomic_writes=atomic_writes)
    save_json(fname, TEST_JSON_B, atomic_writes=atomic_writes)
    data = load_json(fname)
    assert data == TEST_JSON_B
|
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_save_bad_data() -> None:
    """Test that saving an unserializable object raises SerializationError."""

    class CannotSerializeMe:
        """Empty class used as a value the JSON encoder cannot handle."""

    payload = {"hello": CannotSerializeMe()}
    with pytest.raises(SerializationError) as excinfo:
        save_json("test4", payload)

    # The message must name both the target file and the offending JSON path.
    message = str(excinfo.value)
    assert "Failed to serialize to JSON: test4. Bad data at $.hello=" in message
|
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_load_bad_data() -> None:
    """Test that loading a file without valid JSON raises HomeAssistantError."""
    fname = _path_for("test5")
    # Explicit encoding so the bytes written do not depend on the platform's
    # locale default.
    with open(fname, "w", encoding="utf-8") as fh:
        fh.write(TEST_BAD_SERIALIED)
    with pytest.raises(HomeAssistantError):
        load_json(fname)
|
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_custom_encoder() -> None:
    """Test that save_json serializes through the encoder class it is given."""

    class MockJSONEncoder(JSONEncoder):
        """Encoder whose fallback hook turns any object into the string "9"."""

        def default(self, o):
            """Return "9" regardless of the object passed in."""
            return "9"

    target = _path_for("test6")
    # A Mock is not a native JSON type, so the stored value has to come from
    # the encoder's default() hook.
    save_json(target, Mock(), encoder=MockJSONEncoder)
    assert load_json(target) == "9"
|
2020-02-17 10:49:42 -08:00
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_default_encoder_is_passed() -> None:
    """Test that passing the default HA encoder routes saving through orjson."""
    target = _path_for("test6")

    # First pass: replace orjson.dumps and count how often save_json calls it.
    orjson_patch = patch("homeassistant.util.json.orjson.dumps", return_value=b"{}")
    with orjson_patch as mock_orjson_dumps:
        save_json(target, {"any": 1}, encoder=DefaultHASSJSONEncoder)
    assert len(mock_orjson_dumps.mock_calls) == 1

    # Patch json.dumps to make sure we are using the orjson path
    with patch("homeassistant.util.json.json.dumps", side_effect=Exception):
        save_json(target, {"any": {1}}, encoder=DefaultHASSJSONEncoder)

    # The set value is expected to be read back as a list.
    assert load_json(target) == {"any": [1]}
|
|
|
|
|
|
|
|
|
2023-02-07 14:20:06 +01:00
|
|
|
def test_find_unserializable_data() -> None:
    """Test that unserializable values are reported under their JSON path."""
    # Fully serializable inputs produce an empty report.
    for clean in (1, [1, 2], {"something": "yo"}):
        assert find_paths_unserializable_data(clean) == {}

    # Sets are reported at the path where they occur.
    in_dict = find_paths_unserializable_data({"something": set()})
    assert in_dict == {"$.something": set()}

    in_list = find_paths_unserializable_data({"something": [1, set()]})
    assert in_list == {"$.something[1]": set()}

    in_nested = find_paths_unserializable_data([1, {"bla": set(), "blub": set()}])
    assert in_nested == {"$[1].bla": set(), "$[1].blub": set()}

    # A tuple used as a dict key is reported as a bad key.
    bad_key = find_paths_unserializable_data({("A",): 1})
    assert bad_key == {"$<key: ('A',)>": ("A",)}

    # With allow_nan=False the dumper rejects NaN, so the root value is reported.
    strict_dumps = partial(dumps, allow_nan=False)
    nan_report = find_paths_unserializable_data(float("nan"), dump=strict_dumps)
    assert math.isnan(nan_report["$"])

    # Test custom encoder + State support.

    class MockJSONEncoder(JSONEncoder):
        """Encoder that additionally accepts datetime objects."""

        def default(self, o):
            """Encode datetimes as ISO strings; defer the rest to the base class."""
            if not isinstance(o, datetime):
                return super().default(o)
            return o.isoformat()

    bad_data = object()
    encoder_dumps = partial(dumps, cls=MockJSONEncoder)

    state = State("mock_domain.mock_entity", "on", {"bad": bad_data})
    state_report = find_paths_unserializable_data([state], dump=encoder_dumps)
    assert state_report == {
        "$[0](State: mock_domain.mock_entity).attributes.bad": bad_data
    }

    event = Event("bad_event", {"bad_attribute": bad_data})
    event_report = find_paths_unserializable_data([event], dump=encoder_dumps)
    assert event_report == {"$[0](Event: bad_event).data.bad_attribute": bad_data}

    class BadData:
        """Object that exposes the unserializable value through as_dict()."""

        def __init__(self):
            self.bla = bad_data

        def as_dict(self):
            return {"bla": self.bla}

    object_report = find_paths_unserializable_data(BadData(), dump=encoder_dumps)
    assert object_report == {"$(BadData).bla": bad_data}
|
2023-02-16 11:37:57 +01:00
|
|
|
|
|
|
|
|
2023-02-16 19:40:47 +01:00
|
|
|
def test_json_loads_array() -> None:
    """Test that json_loads_array accepts JSON arrays and rejects everything else."""
    assert json_loads_array('[{"c":1.2}]') == [{"c": 1.2}]

    # Each non-array document must raise ValueError naming the parsed Python type.
    rejected = (("{}", "dict"), ("true", "bool"), ("null", "NoneType"))
    for document, type_name in rejected:
        expected = f"Expected JSON to be parsed as a list got <class '{type_name}'>"
        with pytest.raises(ValueError, match=expected):
            json_loads_array(document)
|
|
|
|
|
|
|
|
|
2023-02-16 11:37:57 +01:00
|
|
|
def test_json_loads_object() -> None:
    """Test that json_loads_object accepts JSON objects and rejects everything else."""
    assert json_loads_object('{"c":1.2}') == {"c": 1.2}

    # Each non-object document must raise ValueError naming the parsed Python type.
    rejected = (("[]", "list"), ("true", "bool"), ("null", "NoneType"))
    for document, type_name in rejected:
        expected = f"Expected JSON to be parsed as a dict got <class '{type_name}'>"
        with pytest.raises(ValueError, match=expected):
            json_loads_object(document)
|
|
|
|
|
|
|
|
|
|
|
|
async def test_deprecated_test_find_unserializable_data(
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test that find_paths_unserializable_data logs a notice pointing to helpers.json."""
    find_paths_unserializable_data(1)

    logged = caplog.text
    expected_fragments = (
        "uses find_paths_unserializable_data from homeassistant.util.json",
        "should be updated to use homeassistant.helpers.json module",
    )
    for fragment in expected_fragments:
        assert fragment in logged
|
|
|
|
|
|
|
|
|
|
|
|
async def test_deprecated_save_json(caplog: pytest.LogCaptureFixture) -> None:
    """Test that util.json's save_json logs a notice pointing to helpers.json."""
    target = _path_for("test1")
    save_json(target, TEST_JSON_A)

    logged = caplog.text
    expected_fragments = (
        "uses save_json from homeassistant.util.json",
        "should be updated to use homeassistant.helpers.json module",
    )
    for fragment in expected_fragments:
        assert fragment in logged
|