diff --git a/tests/components/recorder/conftest.py b/tests/components/recorder/conftest.py new file mode 100644 index 00000000000..d91a86402ac --- /dev/null +++ b/tests/components/recorder/conftest.py @@ -0,0 +1,24 @@ +"""Common test tools.""" + +import pytest + +from homeassistant.components.recorder.const import DATA_INSTANCE + +from tests.common import get_test_home_assistant, init_recorder_component + + +@pytest.fixture +def hass_recorder(): + """Home Assistant fixture with in-memory recorder.""" + hass = get_test_home_assistant() + + def setup_recorder(config=None): + """Set up with params.""" + init_recorder_component(hass, config) + hass.start() + hass.block_till_done() + hass.data[DATA_INSTANCE].block_till_done() + return hass + + yield setup_recorder + hass.stop() diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 08af027bce3..fb860348a46 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -1,9 +1,6 @@ """The tests for the Recorder component.""" # pylint: disable=protected-access from datetime import datetime, timedelta -import unittest - -import pytest from homeassistant.components.recorder import ( CONFIG_SCHEMA, @@ -24,99 +21,69 @@ from homeassistant.util import dt as dt_util from .common import wait_recording_done from tests.async_mock import patch -from tests.common import ( - async_fire_time_changed, - get_test_home_assistant, - init_recorder_component, -) +from tests.common import async_fire_time_changed, get_test_home_assistant -class TestRecorder(unittest.TestCase): - """Test the recorder module.""" +def test_saving_state(hass, hass_recorder): + """Test saving and restoring a state.""" + hass = hass_recorder() - def setUp(self): # pylint: disable=invalid-name - """Set up things to be run when tests are started.""" - self.hass = get_test_home_assistant() - init_recorder_component(self.hass) - self.hass.start() - self.addCleanup(self.tear_down_cleanup) + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} - def tear_down_cleanup(self): - """Stop everything that was started.""" - self.hass.stop() + hass.states.set(entity_id, state, attributes) - def test_saving_state(self): - """Test saving and restoring a state.""" - entity_id = "test.recorder" - state = "restoring_from_db" - attributes = {"test_attr": 5, "test_attr_10": "nice"} + wait_recording_done(hass) - self.hass.states.set(entity_id, state, attributes) + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 1 + assert db_states[0].event_id > 0 + state = db_states[0].to_native() - wait_recording_done(self.hass) - - with session_scope(hass=self.hass) as session: - db_states = list(session.query(States)) - assert len(db_states) == 1 - assert db_states[0].event_id > 0 - state = db_states[0].to_native() - - assert state == _state_empty_context(self.hass, entity_id) - - def test_saving_event(self): - """Test saving and restoring an event.""" - event_type = "EVENT_TEST" - event_data = {"test_attr": 5, "test_attr_10": "nice"} - - events = [] - - @callback - def event_listener(event): - """Record events from eventbus.""" - if event.event_type == event_type: - events.append(event) - - self.hass.bus.listen(MATCH_ALL, event_listener) - - self.hass.bus.fire(event_type, event_data) - - wait_recording_done(self.hass) - - assert len(events) == 1 - event = events[0] - - self.hass.data[DATA_INSTANCE].block_till_done() - - with session_scope(hass=self.hass) as session: - db_events = list(session.query(Events).filter_by(event_type=event_type)) - assert len(db_events) == 1 - db_event = db_events[0].to_native() - - assert event.event_type == db_event.event_type - assert event.data == db_event.data - assert event.origin == db_event.origin - - # Recorder uses SQLite and stores datetimes as integer unix timestamps - assert event.time_fired.replace(microsecond=0) == db_event.time_fired.replace( - microsecond=0 - ) + assert state == _state_empty_context(hass, entity_id) -@pytest.fixture -def hass_recorder(): - """Home Assistant fixture with in-memory recorder.""" - hass = get_test_home_assistant() +def test_saving_event(hass, hass_recorder): + """Test saving and restoring an event.""" + hass = hass_recorder() - def setup_recorder(config=None): - """Set up with params.""" - init_recorder_component(hass, config) - hass.start() - hass.block_till_done() - hass.data[DATA_INSTANCE].block_till_done() - return hass + event_type = "EVENT_TEST" + event_data = {"test_attr": 5, "test_attr_10": "nice"} - yield setup_recorder - hass.stop() + events = [] + + @callback + def event_listener(event): + """Record events from eventbus.""" + if event.event_type == event_type: + events.append(event) + + hass.bus.listen(MATCH_ALL, event_listener) + + hass.bus.fire(event_type, event_data) + + wait_recording_done(hass) + + assert len(events) == 1 + event = events[0] + + hass.data[DATA_INSTANCE].block_till_done() + + with session_scope(hass=hass) as session: + db_events = list(session.query(Events).filter_by(event_type=event_type)) + assert len(db_events) == 1 + db_event = db_events[0].to_native() + + assert event.event_type == db_event.event_type + assert event.data == db_event.data + assert event.origin == db_event.origin + + # Recorder uses SQLite and stores datetimes as integer unix timestamps + assert event.time_fired.replace(microsecond=0) == db_event.time_fired.replace( + microsecond=0 + ) def _add_entities(hass, entity_ids): diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py index c11fa8a692f..26a1e487ba8 100644 --- a/tests/components/recorder/test_models.py +++ b/tests/components/recorder/test_models.py @@ -1,6 +1,5 @@ """The tests for the Recorder component.""" from datetime import datetime -import unittest import pytest import pytz @@ -21,150 +20,112 @@ from homeassistant.exceptions import InvalidEntityFormatError from homeassistant.util import dt import homeassistant.util.dt as dt_util -ENGINE = None -SESSION = None + +def test_from_event_to_db_event(): + """Test converting event to db event.""" + event = ha.Event("test_event", {"some_data": 15}) + assert event == Events.from_event(event).to_native() -def setUpModule(): # pylint: disable=invalid-name - """Set up a database to use.""" - global ENGINE - global SESSION - - ENGINE = create_engine("sqlite://") - Base.metadata.create_all(ENGINE) - session_factory = sessionmaker(bind=ENGINE) - SESSION = scoped_session(session_factory) +def test_from_event_to_db_state(): + """Test converting event to db state.""" + state = ha.State("sensor.temperature", "18") + event = ha.Event( + EVENT_STATE_CHANGED, + {"entity_id": "sensor.temperature", "old_state": None, "new_state": state}, + context=state.context, + ) + # We don't restore context unless we need it by joining the + # events table on the event_id for state_changed events + state.context = ha.Context(id=None) + assert state == States.from_event(event).to_native() -def tearDownModule(): # pylint: disable=invalid-name - """Close database.""" - global ENGINE - global SESSION +def test_from_event_to_delete_state(): + """Test converting deleting state event to db state.""" + event = ha.Event( + EVENT_STATE_CHANGED, + { + "entity_id": "sensor.temperature", + "old_state": ha.State("sensor.temperature", "18"), + "new_state": None, + }, + ) + db_state = States.from_event(event) - ENGINE.dispose() - ENGINE = None - SESSION = None + assert db_state.entity_id == "sensor.temperature" + assert db_state.domain == "sensor" + assert db_state.state == "" + assert db_state.last_changed == event.time_fired + assert db_state.last_updated == event.time_fired -class TestEvents(unittest.TestCase): - """Test Events model.""" +def test_entity_ids(): + """Test if entity ids helper method works.""" + engine = create_engine("sqlite://") + Base.metadata.create_all(engine) + session_factory = sessionmaker(bind=engine) - # pylint: disable=no-self-use - def test_from_event(self): - """Test converting event to db event.""" - event = ha.Event("test_event", {"some_data": 15}) - assert event == Events.from_event(event).to_native() + session = scoped_session(session_factory) + session.query(Events).delete() + session.query(States).delete() + session.query(RecorderRuns).delete() + run = RecorderRuns( + start=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC), + end=datetime(2016, 7, 9, 23, 0, 0, tzinfo=dt.UTC), + closed_incorrect=False, + created=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC), + ) -class TestStates(unittest.TestCase): - """Test States model.""" + session.add(run) + session.commit() - # pylint: disable=no-self-use + before_run = datetime(2016, 7, 9, 8, 0, 0, tzinfo=dt.UTC) + in_run = datetime(2016, 7, 9, 13, 0, 0, tzinfo=dt.UTC) + in_run2 = datetime(2016, 7, 9, 15, 0, 0, tzinfo=dt.UTC) + in_run3 = datetime(2016, 7, 9, 18, 0, 0, tzinfo=dt.UTC) + after_run = datetime(2016, 7, 9, 23, 30, 0, tzinfo=dt.UTC) - def test_from_event(self): - """Test converting event to db state.""" - state = ha.State("sensor.temperature", "18") - event = ha.Event( - EVENT_STATE_CHANGED, - {"entity_id": "sensor.temperature", "old_state": None, "new_state": state}, - context=state.context, + assert run.to_native() == run + assert run.entity_ids() == [] + + session.add( + States( + entity_id="sensor.temperature", + state="20", + last_changed=before_run, + last_updated=before_run, ) - # We don't restore context unless we need it by joining the - # events table on the event_id for state_changed events - state.context = ha.Context(id=None) - assert state == States.from_event(event).to_native() - - def test_from_event_to_delete_state(self): - """Test converting deleting state event to db state.""" - event = ha.Event( - EVENT_STATE_CHANGED, - { - "entity_id": "sensor.temperature", - "old_state": ha.State("sensor.temperature", "18"), - "new_state": None, - }, + ) + session.add( + States( + entity_id="sensor.sound", + state="10", + last_changed=after_run, + last_updated=after_run, ) - db_state = States.from_event(event) + ) - assert db_state.entity_id == "sensor.temperature" - assert db_state.domain == "sensor" - assert db_state.state == "" - assert db_state.last_changed == event.time_fired - assert db_state.last_updated == event.time_fired - - -class TestRecorderRuns(unittest.TestCase): - """Test recorder run model.""" - - def setUp(self): # pylint: disable=invalid-name - """Set up recorder runs.""" - self.session = session = SESSION() - session.query(Events).delete() - session.query(States).delete() - session.query(RecorderRuns).delete() - self.addCleanup(self.tear_down_cleanup) - - def tear_down_cleanup(self): - """Clean up.""" - self.session.rollback() - - def test_entity_ids(self): - """Test if entity ids helper method works.""" - run = RecorderRuns( - start=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC), - end=datetime(2016, 7, 9, 23, 0, 0, tzinfo=dt.UTC), - closed_incorrect=False, - created=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC), + session.add( + States( + entity_id="sensor.humidity", + state="76", + last_changed=in_run, + last_updated=in_run, ) - - self.session.add(run) - self.session.commit() - - before_run = datetime(2016, 7, 9, 8, 0, 0, tzinfo=dt.UTC) - in_run = datetime(2016, 7, 9, 13, 0, 0, tzinfo=dt.UTC) - in_run2 = datetime(2016, 7, 9, 15, 0, 0, tzinfo=dt.UTC) - in_run3 = datetime(2016, 7, 9, 18, 0, 0, tzinfo=dt.UTC) - after_run = datetime(2016, 7, 9, 23, 30, 0, tzinfo=dt.UTC) - - assert run.to_native() == run - assert run.entity_ids() == [] - - self.session.add( - States( - entity_id="sensor.temperature", - state="20", - last_changed=before_run, - last_updated=before_run, - ) - ) - self.session.add( - States( - entity_id="sensor.sound", - state="10", - last_changed=after_run, - last_updated=after_run, - ) + ) + session.add( + States( + entity_id="sensor.lux", + state="5", + last_changed=in_run3, + last_updated=in_run3, ) + ) - self.session.add( - States( - entity_id="sensor.humidity", - state="76", - last_changed=in_run, - last_updated=in_run, - ) - ) - self.session.add( - States( - entity_id="sensor.lux", - state="5", - last_changed=in_run3, - last_updated=in_run3, - ) - ) - - assert sorted(run.entity_ids()) == ["sensor.humidity", "sensor.lux"] - assert run.entity_ids(in_run2) == ["sensor.humidity"] + assert sorted(run.entity_ids()) == ["sensor.humidity", "sensor.lux"] + assert run.entity_ids(in_run2) == ["sensor.humidity"] def test_states_from_native_invalid_entity_id(): diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index a93e3537905..91a2299e4b6 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -1,7 +1,6 @@ """Test data purging.""" from datetime import datetime, timedelta import json -import unittest from homeassistant.components import recorder from homeassistant.components.recorder.const import DATA_INSTANCE @@ -13,226 +12,216 @@ from homeassistant.util import dt as dt_util from .common import wait_recording_done from tests.async_mock import patch -from tests.common import get_test_home_assistant, init_recorder_component -class TestRecorderPurge(unittest.TestCase): - """Base class for common recorder tests.""" +def test_purge_old_states(hass, hass_recorder): + """Test deleting old states.""" + hass = hass_recorder() + _add_test_states(hass) - def setUp(self): # pylint: disable=invalid-name - """Set up things to be run when tests are started.""" - self.hass = get_test_home_assistant() - init_recorder_component(self.hass) - self.hass.start() - self.addCleanup(self.tear_down_cleanup) + # make sure we start with 6 states + with session_scope(hass=hass) as session: + states = session.query(States) + assert states.count() == 6 - def tear_down_cleanup(self): - """Stop everything that was started.""" - self.hass.stop() + # run purge_old_data() + finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert states.count() == 4 - def _add_test_states(self): - """Add multiple states to the db for testing.""" - now = datetime.now() - five_days_ago = now - timedelta(days=5) - eleven_days_ago = now - timedelta(days=11) - attributes = {"test_attr": 5, "test_attr_10": "nice"} + finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert states.count() == 2 - self.hass.block_till_done() - self.hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(self.hass) + finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + assert finished + assert states.count() == 2 - with recorder.session_scope(hass=self.hass) as session: - for event_id in range(6): - if event_id < 2: - timestamp = eleven_days_ago - state = "autopurgeme" - elif event_id < 4: - timestamp = five_days_ago - state = "purgeme" - else: - timestamp = now - state = "dontpurgeme" - session.add( - States( - entity_id="test.recorder2", - domain="sensor", - state=state, - attributes=json.dumps(attributes), - last_changed=timestamp, - last_updated=timestamp, - created=timestamp, - event_id=event_id + 1000, - ) - ) +def test_purge_old_events(hass, hass_recorder): + """Test deleting old events.""" + hass = hass_recorder() + _add_test_events(hass) - def _add_test_events(self): - """Add a few events for testing.""" - now = datetime.now() - five_days_ago = now - timedelta(days=5) - eleven_days_ago = now - timedelta(days=11) - event_data = {"test_attr": 5, "test_attr_10": "nice"} + with session_scope(hass=hass) as session: + events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) + assert events.count() == 6 - self.hass.block_till_done() - self.hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(self.hass) + # run purge_old_data() + finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert events.count() == 4 - with recorder.session_scope(hass=self.hass) as session: - for event_id in range(6): - if event_id < 2: - timestamp = eleven_days_ago - event_type = "EVENT_TEST_AUTOPURGE" - elif event_id < 4: - timestamp = five_days_ago - event_type = "EVENT_TEST_PURGE" - else: - timestamp = now - event_type = "EVENT_TEST" + finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + assert not finished + assert events.count() == 2 - session.add( - Events( - event_type=event_type, - event_data=json.dumps(event_data), - origin="LOCAL", - created=timestamp, - time_fired=timestamp, - ) - ) + # we should only have 2 events left + finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False) + assert finished + assert events.count() == 2 - def _add_test_recorder_runs(self): - """Add a few recorder_runs for testing.""" - now = datetime.now() - five_days_ago = now - timedelta(days=5) - eleven_days_ago = now - timedelta(days=11) - self.hass.block_till_done() - self.hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(self.hass) +def test_purge_method(hass, hass_recorder): + """Test purge method.""" + hass = hass_recorder() + service_data = {"keep_days": 4} + _add_test_events(hass) + _add_test_states(hass) + _add_test_recorder_runs(hass) - with recorder.session_scope(hass=self.hass) as session: - for rec_id in range(6): - if rec_id < 2: - timestamp = eleven_days_ago - elif rec_id < 4: - timestamp = five_days_ago - else: - timestamp = now + # make sure we start with 6 states + with session_scope(hass=hass) as session: + states = session.query(States) + assert states.count() == 6 - session.add( - RecorderRuns( - start=timestamp, - created=dt_util.utcnow(), - end=timestamp + timedelta(days=1), - ) - ) + events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) + assert events.count() == 6 - def test_purge_old_states(self): - """Test deleting old states.""" - self._add_test_states() - # make sure we start with 6 states - with session_scope(hass=self.hass) as session: - states = session.query(States) - assert states.count() == 6 + recorder_runs = session.query(RecorderRuns) + assert recorder_runs.count() == 7 - # run purge_old_data() - finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) - assert not finished - assert states.count() == 4 + hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) - finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) - assert not finished - assert states.count() == 2 + # run purge method - no service data, use defaults + hass.services.call("recorder", "purge") + hass.block_till_done() - finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) - assert finished - assert states.count() == 2 + # Small wait for recorder thread + hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) - def test_purge_old_events(self): - """Test deleting old events.""" - self._add_test_events() + # only purged old events + assert states.count() == 4 + assert events.count() == 4 - with session_scope(hass=self.hass) as session: - events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) - assert events.count() == 6 + # run purge method - correct service data + hass.services.call("recorder", "purge", service_data=service_data) + hass.block_till_done() - # run purge_old_data() - finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) - assert not finished - assert events.count() == 4 + # Small wait for recorder thread + hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) - finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) - assert not finished - assert events.count() == 2 + # we should only have 2 states left after purging + assert states.count() == 2 - # we should only have 2 events left - finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False) - assert finished - assert events.count() == 2 + # now we should only have 2 events left + assert events.count() == 2 - def test_purge_method(self): - """Test purge method.""" - service_data = {"keep_days": 4} - self._add_test_events() - self._add_test_states() - self._add_test_recorder_runs() + # now we should only have 3 recorder runs left + assert recorder_runs.count() == 3 - # make sure we start with 6 states - with session_scope(hass=self.hass) as session: - states = session.query(States) - assert states.count() == 6 + assert not ("EVENT_TEST_PURGE" in (event.event_type for event in events.all())) - events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) - assert events.count() == 6 - - recorder_runs = session.query(RecorderRuns) - assert recorder_runs.count() == 7 - - self.hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(self.hass) - - # run purge method - no service data, use defaults - self.hass.services.call("recorder", "purge") - self.hass.block_till_done() - - # Small wait for recorder thread - self.hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(self.hass) - - # only purged old events - assert states.count() == 4 - assert events.count() == 4 - - # run purge method - correct service data - self.hass.services.call("recorder", "purge", service_data=service_data) - self.hass.block_till_done() - - # Small wait for recorder thread - self.hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(self.hass) - - # we should only have 2 states left after purging - assert states.count() == 2 - - # now we should only have 2 events left - assert events.count() == 2 - - # now we should only have 3 recorder runs left - assert recorder_runs.count() == 3 - - assert not ( - "EVENT_TEST_PURGE" in (event.event_type for event in events.all()) + # run purge method - correct service data, with repack + with patch("homeassistant.components.recorder.purge._LOGGER") as mock_logger: + service_data["repack"] = True + hass.services.call("recorder", "purge", service_data=service_data) + hass.block_till_done() + hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) + assert ( + mock_logger.debug.mock_calls[5][1][0] + == "Vacuuming SQL DB to free space" ) - # run purge method - correct service data, with repack - with patch( - "homeassistant.components.recorder.purge._LOGGER" - ) as mock_logger: - service_data["repack"] = True - self.hass.services.call("recorder", "purge", service_data=service_data) - self.hass.block_till_done() - self.hass.data[DATA_INSTANCE].block_till_done() - wait_recording_done(self.hass) - assert ( - mock_logger.debug.mock_calls[5][1][0] - == "Vacuuming SQL DB to free space" + +def _add_test_states(hass): + """Add multiple states to the db for testing.""" + now = datetime.now() + five_days_ago = now - timedelta(days=5) + eleven_days_ago = now - timedelta(days=11) + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + hass.block_till_done() + hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) + + with recorder.session_scope(hass=hass) as session: + for event_id in range(6): + if event_id < 2: + timestamp = eleven_days_ago + state = "autopurgeme" + elif event_id < 4: + timestamp = five_days_ago + state = "purgeme" + else: + timestamp = now + state = "dontpurgeme" + + session.add( + States( + entity_id="test.recorder2", + domain="sensor", + state=state, + attributes=json.dumps(attributes), + last_changed=timestamp, + last_updated=timestamp, + created=timestamp, + event_id=event_id + 1000, ) + ) + + +def _add_test_events(hass): + """Add a few events for testing.""" + now = datetime.now() + five_days_ago = now - timedelta(days=5) + eleven_days_ago = now - timedelta(days=11) + event_data = {"test_attr": 5, "test_attr_10": "nice"} + + hass.block_till_done() + hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) + + with recorder.session_scope(hass=hass) as session: + for event_id in range(6): + if event_id < 2: + timestamp = eleven_days_ago + event_type = "EVENT_TEST_AUTOPURGE" + elif event_id < 4: + timestamp = five_days_ago + event_type = "EVENT_TEST_PURGE" + else: + timestamp = now + event_type = "EVENT_TEST" + + session.add( + Events( + event_type=event_type, + event_data=json.dumps(event_data), + origin="LOCAL", + created=timestamp, + time_fired=timestamp, + ) + ) + + +def _add_test_recorder_runs(hass): + """Add a few recorder_runs for testing.""" + now = datetime.now() + five_days_ago = now - timedelta(days=5) + eleven_days_ago = now - timedelta(days=11) + + hass.block_till_done() + hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(hass) + + with recorder.session_scope(hass=hass) as session: + for rec_id in range(6): + if rec_id < 2: + timestamp = eleven_days_ago + elif rec_id < 4: + timestamp = five_days_ago + else: + timestamp = now + + session.add( + RecorderRuns( + start=timestamp, + created=dt_util.utcnow(), + end=timestamp + timedelta(days=1), + ) + )