From 28af0b4092d325e1b5ae200cc138184b85a330e6 Mon Sep 17 00:00:00 2001 From: Thomas Dietrich Date: Mon, 20 Dec 2021 14:53:51 +0100 Subject: [PATCH] Statistics component typing (#60997) * Implement optional manually defined uniqueid * Fix test case via mocked environment * Add typing to statistics component * Fix minor inconsistency * Fix linter issues * Execute hassfest * Fix stricter mypy warnings * Fix maxsplit warning * Make binary value range explicit check * Add basic typing to statistics tests * Add empty config testcase * Minor improvements * Improve after comments * Remove unnecessary test case * Fix changed type * Remove dict.get default --- .strict-typing | 1 + homeassistant/components/statistics/sensor.py | 265 +++++++++--------- mypy.ini | 11 + tests/components/statistics/test_sensor.py | 106 ++++--- 4 files changed, 214 insertions(+), 169 deletions(-) diff --git a/.strict-typing b/.strict-typing index 3eae62f4b4a..2558506c1c2 100644 --- a/.strict-typing +++ b/.strict-typing @@ -121,6 +121,7 @@ homeassistant.components.slack.* homeassistant.components.sonos.media_player homeassistant.components.ssdp.* homeassistant.components.stookalert.* +homeassistant.components.statistics.* homeassistant.components.stream.* homeassistant.components.sun.* homeassistant.components.surepetcare.* diff --git a/homeassistant/components/statistics/sensor.py b/homeassistant/components/statistics/sensor.py index e2608c888aa..3ec1c6fc512 100644 --- a/homeassistant/components/statistics/sensor.py +++ b/homeassistant/components/statistics/sensor.py @@ -1,11 +1,17 @@ """Support for statistics for sensor values.""" +from __future__ import annotations + from collections import deque +from collections.abc import Callable import contextlib +from datetime import datetime, timedelta import logging import statistics +from typing import Any, Literal, cast import voluptuous as vol +from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN from homeassistant.components.recorder.models import States from homeassistant.components.recorder.util import execute, session_scope from homeassistant.components.sensor import ( @@ -21,14 +27,23 @@ from homeassistant.const import ( STATE_UNAVAILABLE, STATE_UNKNOWN, ) -from homeassistant.core import callback +from homeassistant.core import ( + CALLBACK_TYPE, + Event, + HomeAssistant, + State, + callback, + split_entity_id, +) from homeassistant.helpers import config_validation as cv +from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.event import ( async_track_point_in_utc_time, async_track_state_change_event, ) from homeassistant.helpers.reload import async_setup_reload_service from homeassistant.helpers.start import async_at_start +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType from homeassistant.util import dt as dt_util from . import DOMAIN, PLATFORMS @@ -100,13 +115,13 @@ DEFAULT_QUANTILE_METHOD = "exclusive" ICON = "mdi:calculator" -def valid_binary_characteristic_configuration(config): +def valid_binary_characteristic_configuration(config: dict[str, Any]) -> dict[str, Any]: """Validate that the characteristic selected is valid for the source sensor type, throw if it isn't.""" - if config.get(CONF_ENTITY_ID).split(".")[0] == "binary_sensor": + if split_entity_id(str(config.get(CONF_ENTITY_ID)))[0] == BINARY_SENSOR_DOMAIN: if config.get(CONF_STATE_CHARACTERISTIC) not in STATS_BINARY_SUPPORT: raise ValueError( "The configured characteristic '" - + config.get(CONF_STATE_CHARACTERISTIC) + + str(config.get(CONF_STATE_CHARACTERISTIC)) + "' is not supported for a binary source sensor." ) return config @@ -162,28 +177,32 @@ PLATFORM_SCHEMA = vol.All( ) -async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): +async def async_setup_platform( + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType | None = None, +) -> None: """Set up the Statistics sensor.""" await async_setup_reload_service(hass, DOMAIN, PLATFORMS) async_add_entities( - [ + new_entities=[ StatisticsSensor( - source_entity_id=config.get(CONF_ENTITY_ID), - name=config.get(CONF_NAME), + source_entity_id=config[CONF_ENTITY_ID], + name=config[CONF_NAME], unique_id=config.get(CONF_UNIQUE_ID), - state_characteristic=config.get(CONF_STATE_CHARACTERISTIC), - samples_max_buffer_size=config.get(CONF_SAMPLES_MAX_BUFFER_SIZE), + state_characteristic=config[CONF_STATE_CHARACTERISTIC], + samples_max_buffer_size=config[CONF_SAMPLES_MAX_BUFFER_SIZE], samples_max_age=config.get(CONF_MAX_AGE), - precision=config.get(CONF_PRECISION), - quantile_intervals=config.get(CONF_QUANTILE_INTERVALS), - quantile_method=config.get(CONF_QUANTILE_METHOD), + precision=config[CONF_PRECISION], + quantile_intervals=config[CONF_QUANTILE_INTERVALS], + quantile_method=config[CONF_QUANTILE_METHOD], ) ], - True, + update_before_add=True, ) - return True class StatisticsSensor(SensorEntity): @@ -191,41 +210,46 @@ class StatisticsSensor(SensorEntity): def __init__( self, - source_entity_id, - name, - unique_id, - state_characteristic, - samples_max_buffer_size, - samples_max_age, - precision, - quantile_intervals, - quantile_method, - ): + source_entity_id: str, + name: str, + unique_id: str | None, + state_characteristic: str, + samples_max_buffer_size: int, + samples_max_age: timedelta | None, + precision: int, + quantile_intervals: int, + quantile_method: str, + ) -> None: """Initialize the Statistics sensor.""" - self._source_entity_id = source_entity_id - self.is_binary = self._source_entity_id.split(".")[0] == "binary_sensor" - self._name = name - self._unique_id = unique_id - self._state_characteristic = state_characteristic + self._attr_icon: str = ICON + self._attr_name: str = name + self._attr_should_poll: bool = False + self._attr_unique_id: str | None = unique_id + self._source_entity_id: str = source_entity_id + self.is_binary: bool = ( + split_entity_id(self._source_entity_id)[0] == BINARY_SENSOR_DOMAIN + ) + self._state_characteristic: str = state_characteristic if self._state_characteristic == STAT_DEFAULT: self._state_characteristic = STAT_COUNT if self.is_binary else STAT_MEAN _LOGGER.warning(DEPRECATION_WARNING, self._state_characteristic, name) - self._samples_max_buffer_size = samples_max_buffer_size - self._samples_max_age = samples_max_age - self._precision = precision - self._quantile_intervals = quantile_intervals - self._quantile_method = quantile_method - self._value = None - self._unit_of_measurement = None - self._available = False - self.states = deque(maxlen=self._samples_max_buffer_size) - self.ages = deque(maxlen=self._samples_max_buffer_size) - self.attributes = { + self._samples_max_buffer_size: int = samples_max_buffer_size + self._samples_max_age: timedelta | None = samples_max_age + self._precision: int = precision + self._quantile_intervals: int = quantile_intervals + self._quantile_method: str = quantile_method + self._value: StateType | datetime = None + self._unit_of_measurement: str | None = None + self._available: bool = False + self.states: deque[float | bool] = deque(maxlen=self._samples_max_buffer_size) + self.ages: deque[datetime] = deque(maxlen=self._samples_max_buffer_size) + self.attributes: dict[str, StateType] = { STAT_AGE_COVERAGE_RATIO: None, STAT_BUFFER_USAGE_RATIO: None, STAT_SOURCE_VALUE_VALID: None, } + self._state_characteristic_fn: Callable[[], StateType | datetime] if self.is_binary: self._state_characteristic_fn = getattr( self, f"_stat_binary_{self._state_characteristic}" @@ -235,20 +259,20 @@ class StatisticsSensor(SensorEntity): self, f"_stat_{self._state_characteristic}" ) - self._update_listener = None + self._update_listener: CALLBACK_TYPE | None = None - async def async_added_to_hass(self): + async def async_added_to_hass(self) -> None: """Register callbacks.""" @callback - def async_stats_sensor_state_listener(event): + def async_stats_sensor_state_listener(event: Event) -> None: """Handle the sensor state changes.""" if (new_state := event.data.get("new_state")) is None: return self._add_state_to_queue(new_state) self.async_schedule_update_ha_state(True) - async def async_stats_sensor_startup(_): + async def async_stats_sensor_startup(_: HomeAssistant) -> None: """Add listener and get recorded state.""" _LOGGER.debug("Startup for %s", self.entity_id) @@ -265,7 +289,7 @@ class StatisticsSensor(SensorEntity): async_at_start(self.hass, async_stats_sensor_startup) - def _add_state_to_queue(self, new_state): + def _add_state_to_queue(self, new_state: State) -> None: """Add the state to the queue.""" self._available = new_state.state != STATE_UNAVAILABLE if new_state.state == STATE_UNAVAILABLE: @@ -277,7 +301,8 @@ class StatisticsSensor(SensorEntity): try: if self.is_binary: - self.states.append(new_state.state) + assert new_state.state in ("on", "off") + self.states.append(new_state.state == "on") else: self.states.append(float(new_state.state)) self.ages.append(new_state.last_updated) @@ -293,8 +318,9 @@ class StatisticsSensor(SensorEntity): self._unit_of_measurement = self._derive_unit_of_measurement(new_state) - def _derive_unit_of_measurement(self, new_state): - base_unit = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) + def _derive_unit_of_measurement(self, new_state: State) -> str | None: + base_unit: str | None = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) + unit: str | None if self.is_binary and self._state_characteristic in ( STAT_AVERAGE_STEP, STAT_AVERAGE_TIMELESS, @@ -336,66 +362,46 @@ class StatisticsSensor(SensorEntity): return unit @property - def name(self): - """Return the name of the sensor.""" - return self._name - - @property - def unique_id(self): - """Return the unique id of the sensor.""" - return self._unique_id - - @property - def state_class(self): + def state_class(self) -> Literal[SensorStateClass.MEASUREMENT] | None: """Return the state class of this entity.""" if self._state_characteristic in STATS_NOT_A_NUMBER: return None return SensorStateClass.MEASUREMENT @property - def native_value(self): + def native_value(self) -> StateType | datetime: """Return the state of the sensor.""" return self._value @property - def native_unit_of_measurement(self): + def native_unit_of_measurement(self) -> str | None: """Return the unit the value is expressed in.""" return self._unit_of_measurement @property - def available(self): + def available(self) -> bool: """Return the availability of the sensor linked to the source sensor.""" return self._available @property - def should_poll(self): - """No polling needed.""" - return False - - @property - def extra_state_attributes(self): + def extra_state_attributes(self) -> dict[str, StateType] | None: """Return the state attributes of the sensor.""" return { key: value for key, value in self.attributes.items() if value is not None } - @property - def icon(self): - """Return the icon to use in the frontend, if any.""" - return ICON - - def _purge_old(self): - """Remove states which are older than self._samples_max_age.""" + def _purge_old_states(self, max_age: timedelta) -> None: + """Remove states which are older than a given age.""" now = dt_util.utcnow() _LOGGER.debug( "%s: purging records older then %s(%s)", self.entity_id, - dt_util.as_local(now - self._samples_max_age), + dt_util.as_local(now - max_age), self._samples_max_age, ) - while self.ages and (now - self.ages[0]) > self._samples_max_age: + while self.ages and (now - self.ages[0]) > max_age: _LOGGER.debug( "%s: purging record with datetime %s(%s)", self.entity_id, @@ -405,7 +411,7 @@ class StatisticsSensor(SensorEntity): self.ages.popleft() self.states.popleft() - def _next_to_purge_timestamp(self): + def _next_to_purge_timestamp(self) -> datetime | None: """Find the timestamp when the next purge would occur.""" if self.ages and self._samples_max_age: # Take the oldest entry from the ages list and add the configured max_age. @@ -414,11 +420,11 @@ class StatisticsSensor(SensorEntity): return self.ages[0] + self._samples_max_age return None - async def async_update(self): + async def async_update(self) -> None: """Get the latest data and updates the states.""" _LOGGER.debug("%s: updating statistics", self.entity_id) if self._samples_max_age is not None: - self._purge_old() + self._purge_old_states(self._samples_max_age) self._update_attributes() self._update_value() @@ -434,7 +440,7 @@ class StatisticsSensor(SensorEntity): self._update_listener = None @callback - def _scheduled_update(now): + def _scheduled_update(now: datetime) -> None: """Timer callback for sensor update.""" _LOGGER.debug("%s: executing scheduled update", self.entity_id) self.async_schedule_update_ha_state(True) @@ -444,7 +450,7 @@ class StatisticsSensor(SensorEntity): self.hass, _scheduled_update, next_to_purge_timestamp ) - async def _initialize_from_database(self): + async def _initialize_from_database(self) -> None: """Initialize the list of states from the database. The query will get the list of states in DESCENDING order so that we @@ -478,14 +484,15 @@ class StatisticsSensor(SensorEntity): ) states = execute(query, to_native=True, validate_entity_ids=False) - for state in reversed(states): - self._add_state_to_queue(state) + if states: + for state in reversed(states): + self._add_state_to_queue(state) self.async_schedule_update_ha_state(True) _LOGGER.debug("%s: initializing from database completed", self.entity_id) - def _update_attributes(self): + def _update_attributes(self) -> None: """Calculate and update the various attributes.""" self.attributes[STAT_BUFFER_USAGE_RATIO] = round( len(self.states) / self._samples_max_buffer_size, 2 @@ -500,7 +507,7 @@ class StatisticsSensor(SensorEntity): else: self.attributes[STAT_AGE_COVERAGE_RATIO] = None - def _update_value(self): + def _update_value(self) -> None: """Front to call the right statistical characteristics functions. One of the _stat_*() functions is represented by self._state_characteristic_fn(). @@ -510,16 +517,16 @@ class StatisticsSensor(SensorEntity): if self._state_characteristic not in STATS_NOT_A_NUMBER: with contextlib.suppress(TypeError): - value = round(value, self._precision) + value = round(cast(float, value), self._precision) if self._precision == 0: value = int(value) self._value = value # Statistics for numeric sensor - def _stat_average_linear(self): + def _stat_average_linear(self) -> StateType: if len(self.states) >= 2: - area = 0 + area: float = 0 for i in range(1, len(self.states)): area += ( 0.5 @@ -530,9 +537,9 @@ class StatisticsSensor(SensorEntity): return area / age_range_seconds return None - def _stat_average_step(self): + def _stat_average_step(self) -> StateType: if len(self.states) >= 2: - area = 0 + area: float = 0 for i in range(1, len(self.states)): area += ( self.states[i - 1] @@ -542,65 +549,65 @@ class StatisticsSensor(SensorEntity): return area / age_range_seconds return None - def _stat_average_timeless(self): + def _stat_average_timeless(self) -> StateType: return self._stat_mean() - def _stat_change(self): + def _stat_change(self) -> StateType: if len(self.states) > 0: return self.states[-1] - self.states[0] return None - def _stat_change_sample(self): + def _stat_change_sample(self) -> StateType: if len(self.states) > 1: return (self.states[-1] - self.states[0]) / (len(self.states) - 1) return None - def _stat_change_second(self): + def _stat_change_second(self) -> StateType: if len(self.states) > 1: age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds() if age_range_seconds > 0: return (self.states[-1] - self.states[0]) / age_range_seconds return None - def _stat_count(self): + def _stat_count(self) -> StateType: return len(self.states) - def _stat_datetime_newest(self): + def _stat_datetime_newest(self) -> datetime | None: if len(self.states) > 0: return self.ages[-1] return None - def _stat_datetime_oldest(self): + def _stat_datetime_oldest(self) -> datetime | None: if len(self.states) > 0: return self.ages[0] return None - def _stat_distance_95_percent_of_values(self): + def _stat_distance_95_percent_of_values(self) -> StateType: if len(self.states) >= 2: - return 2 * 1.96 * self._stat_standard_deviation() + return 2 * 1.96 * cast(float, self._stat_standard_deviation()) return None - def _stat_distance_99_percent_of_values(self): + def _stat_distance_99_percent_of_values(self) -> StateType: if len(self.states) >= 2: - return 2 * 2.58 * self._stat_standard_deviation() + return 2 * 2.58 * cast(float, self._stat_standard_deviation()) return None - def _stat_distance_absolute(self): + def _stat_distance_absolute(self) -> StateType: if len(self.states) > 0: return max(self.states) - min(self.states) return None - def _stat_mean(self): + def _stat_mean(self) -> StateType: if len(self.states) > 0: return statistics.mean(self.states) return None - def _stat_median(self): + def _stat_median(self) -> StateType: if len(self.states) > 0: return statistics.median(self.states) return None - def _stat_noisiness(self): + def _stat_noisiness(self) -> StateType: if len(self.states) >= 2: diff_sum = sum( abs(j - i) for i, j in zip(list(self.states), list(self.states)[1:]) @@ -608,62 +615,64 @@ class StatisticsSensor(SensorEntity): return diff_sum / (len(self.states) - 1) return None - def _stat_quantiles(self): + def _stat_quantiles(self) -> StateType: if len(self.states) > self._quantile_intervals: - return [ - round(quantile, self._precision) - for quantile in statistics.quantiles( - self.states, - n=self._quantile_intervals, - method=self._quantile_method, - ) - ] + return str( + [ + round(quantile, self._precision) + for quantile in statistics.quantiles( + self.states, + n=self._quantile_intervals, + method=self._quantile_method, + ) + ] + ) return None - def _stat_standard_deviation(self): + def _stat_standard_deviation(self) -> StateType: if len(self.states) >= 2: return statistics.stdev(self.states) return None - def _stat_total(self): + def _stat_total(self) -> StateType: if len(self.states) > 0: return sum(self.states) return None - def _stat_value_max(self): + def _stat_value_max(self) -> StateType: if len(self.states) > 0: return max(self.states) return None - def _stat_value_min(self): + def _stat_value_min(self) -> StateType: if len(self.states) > 0: return min(self.states) return None - def _stat_variance(self): + def _stat_variance(self) -> StateType: if len(self.states) >= 2: return statistics.variance(self.states) return None # Statistics for binary sensor - def _stat_binary_average_step(self): + def _stat_binary_average_step(self) -> StateType: if len(self.states) >= 2: - on_seconds = 0 + on_seconds: float = 0 for i in range(1, len(self.states)): - if self.states[i - 1] == "on": + if self.states[i - 1] is True: on_seconds += (self.ages[i] - self.ages[i - 1]).total_seconds() age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds() return 100 / age_range_seconds * on_seconds return None - def _stat_binary_average_timeless(self): + def _stat_binary_average_timeless(self) -> StateType: return self._stat_binary_mean() - def _stat_binary_count(self): + def _stat_binary_count(self) -> StateType: return len(self.states) - def _stat_binary_mean(self): + def _stat_binary_mean(self) -> StateType: if len(self.states) > 0: - return 100.0 / len(self.states) * self.states.count("on") + return 100.0 / len(self.states) * self.states.count(True) return None diff --git a/mypy.ini b/mypy.ini index b4c6c802d04..d092972f2b0 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1342,6 +1342,17 @@ no_implicit_optional = true warn_return_any = true warn_unreachable = true +[mypy-homeassistant.components.statistics.*] +check_untyped_defs = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +no_implicit_optional = true +warn_return_any = true +warn_unreachable = true + [mypy-homeassistant.components.stream.*] check_untyped_defs = true disallow_incomplete_defs = true diff --git a/tests/components/statistics/test_sensor.py b/tests/components/statistics/test_sensor.py index 51ab31e6ed5..05420a859c6 100644 --- a/tests/components/statistics/test_sensor.py +++ b/tests/components/statistics/test_sensor.py @@ -14,6 +14,7 @@ from homeassistant.const import ( STATE_UNKNOWN, TEMP_CELSIUS, ) +from homeassistant.core import HomeAssistant from homeassistant.helpers import entity_registry as er from homeassistant.setup import async_setup_component from homeassistant.util import dt as dt_util @@ -29,7 +30,7 @@ VALUES_BINARY = ["on", "off", "on", "off", "on", "off", "on", "off", "on"] VALUES_NUMERIC = [17, 20, 15.2, 5, 3.8, 9.2, 6.7, 14, 6] -async def test_unique_id(hass): +async def test_unique_id(hass: HomeAssistant): """Test configuration defined unique_id.""" assert await async_setup_component( hass, @@ -54,7 +55,7 @@ async def test_unique_id(hass): assert entity_id == "sensor.test" -async def test_sensor_defaults_numeric(hass): +async def test_sensor_defaults_numeric(hass: HomeAssistant): """Test the general behavior of the sensor, with numeric source sensor.""" assert await async_setup_component( hass, @@ -74,12 +75,13 @@ async def test_sensor_defaults_numeric(hass): for value in VALUES_NUMERIC: hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() state = hass.states.get("sensor.test") + assert state is not None assert state.state == str(round(sum(VALUES_NUMERIC) / len(VALUES_NUMERIC), 2)) assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT @@ -96,16 +98,18 @@ async def test_sensor_defaults_numeric(hass): ) await hass.async_block_till_done() new_state = hass.states.get("sensor.test") + assert new_state is not None assert new_state.state == STATE_UNAVAILABLE assert new_state.attributes.get("source_value_valid") is None hass.states.async_set( "sensor.test_monitored", - 0, + "0", {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() new_state = hass.states.get("sensor.test") new_mean = round(sum(VALUES_NUMERIC) / (len(VALUES_NUMERIC) + 1), 2) + assert new_state is not None assert new_state.state == str(new_mean) assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS assert new_state.attributes.get("buffer_usage_ratio") == round(10 / 20, 2) @@ -116,6 +120,7 @@ async def test_sensor_defaults_numeric(hass): hass.states.async_set("sensor.test_monitored", "beer", {}) await hass.async_block_till_done() new_state = hass.states.get("sensor.test") + assert new_state is not None assert new_state.state == str(new_mean) assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS assert new_state.attributes.get("source_value_valid") is False @@ -125,6 +130,7 @@ async def test_sensor_defaults_numeric(hass): hass.states.async_set("sensor.test_monitored", STATE_UNKNOWN, {}) await hass.async_block_till_done() new_state = hass.states.get("sensor.test") + assert new_state is not None assert new_state.state == str(new_mean) assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS assert new_state.attributes.get("source_value_valid") is False @@ -134,12 +140,13 @@ async def test_sensor_defaults_numeric(hass): hass.states.async_remove("sensor.test_monitored") await hass.async_block_till_done() new_state = hass.states.get("sensor.test") + assert new_state is not None assert new_state.state == str(new_mean) assert new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS assert new_state.attributes.get("source_value_valid") is False -async def test_sensor_defaults_binary(hass): +async def test_sensor_defaults_binary(hass: HomeAssistant): """Test the general behavior of the sensor, with binary source sensor.""" assert await async_setup_component( hass, @@ -165,6 +172,7 @@ async def test_sensor_defaults_binary(hass): await hass.async_block_till_done() state = hass.states.get("sensor.test") + assert state is not None assert state.state == str(len(VALUES_BINARY)) assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT @@ -173,7 +181,7 @@ async def test_sensor_defaults_binary(hass): assert "age_coverage_ratio" not in state.attributes -async def test_sensor_source_with_force_update(hass): +async def test_sensor_source_with_force_update(hass: HomeAssistant): """Test the behavior of the sensor when the source sensor force-updates with same value.""" repeating_values = [18, 0, 0, 0, 0, 0, 0, 0, 9] assert await async_setup_component( @@ -201,12 +209,12 @@ async def test_sensor_source_with_force_update(hass): for value in repeating_values: hass.states.async_set( "sensor.test_monitored_normal", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) hass.states.async_set( "sensor.test_monitored_force", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, force_update=True, ) @@ -214,13 +222,14 @@ async def test_sensor_source_with_force_update(hass): state_normal = hass.states.get("sensor.test_normal") state_force = hass.states.get("sensor.test_force") + assert state_normal and state_force assert state_normal.state == str(round(sum(repeating_values) / 3, 2)) assert state_force.state == str(round(sum(repeating_values) / 9, 2)) assert state_normal.attributes.get("buffer_usage_ratio") == round(3 / 20, 2) assert state_force.attributes.get("buffer_usage_ratio") == round(9 / 20, 2) -async def test_sampling_size_non_default(hass): +async def test_sampling_size_non_default(hass: HomeAssistant): """Test rotation.""" assert await async_setup_component( hass, @@ -242,18 +251,19 @@ async def test_sampling_size_non_default(hass): for value in VALUES_NUMERIC: hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() state = hass.states.get("sensor.test") new_mean = round(sum(VALUES_NUMERIC[-5:]) / len(VALUES_NUMERIC[-5:]), 2) + assert state is not None assert state.state == str(new_mean) assert state.attributes.get("buffer_usage_ratio") == round(5 / 5, 2) -async def test_sampling_size_1(hass): +async def test_sampling_size_1(hass: HomeAssistant): """Test validity of stats requiring only one sample.""" assert await async_setup_component( hass, @@ -275,18 +285,19 @@ async def test_sampling_size_1(hass): for value in VALUES_NUMERIC[-3:]: # just the last 3 will do hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() state = hass.states.get("sensor.test") new_mean = float(VALUES_NUMERIC[-1]) + assert state is not None assert state.state == str(new_mean) assert state.attributes.get("buffer_usage_ratio") == round(1 / 1, 2) -async def test_age_limit_expiry(hass): +async def test_age_limit_expiry(hass: HomeAssistant): """Test that values are removed after certain age.""" now = dt_util.utcnow() mock_data = { @@ -321,7 +332,7 @@ async def test_age_limit_expiry(hass): async_fire_time_changed(hass, mock_data["return_time"]) hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() @@ -330,6 +341,7 @@ async def test_age_limit_expiry(hass): state = hass.states.get("sensor.test") new_mean = round(sum(VALUES_NUMERIC[-5:]) / len(VALUES_NUMERIC[-5:]), 2) + assert state is not None assert state.state == str(new_mean) assert state.attributes.get("buffer_usage_ratio") == round(5 / 20, 2) assert state.attributes.get("age_coverage_ratio") == 1.0 @@ -342,6 +354,7 @@ async def test_age_limit_expiry(hass): state = hass.states.get("sensor.test") new_mean = round(sum(VALUES_NUMERIC[-2:]) / len(VALUES_NUMERIC[-2:]), 2) + assert state is not None assert state.state == str(new_mean) assert state.attributes.get("buffer_usage_ratio") == round(2 / 20, 2) assert state.attributes.get("age_coverage_ratio") == 1 / 4 @@ -354,6 +367,7 @@ async def test_age_limit_expiry(hass): state = hass.states.get("sensor.test") new_mean = float(VALUES_NUMERIC[-1]) + assert state is not None assert state.state == str(new_mean) assert state.attributes.get("buffer_usage_ratio") == round(1 / 20, 2) assert state.attributes.get("age_coverage_ratio") == 0 @@ -365,12 +379,13 @@ async def test_age_limit_expiry(hass): await hass.async_block_till_done() state = hass.states.get("sensor.test") + assert state is not None assert state.state == STATE_UNKNOWN assert state.attributes.get("buffer_usage_ratio") == round(0 / 20, 2) assert state.attributes.get("age_coverage_ratio") is None -async def test_precision(hass): +async def test_precision(hass: HomeAssistant): """Test correct result with precision set.""" assert await async_setup_component( hass, @@ -399,19 +414,21 @@ async def test_precision(hass): for value in VALUES_NUMERIC: hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() mean = sum(VALUES_NUMERIC) / len(VALUES_NUMERIC) state = hass.states.get("sensor.test_precision_0") + assert state is not None assert state.state == str(int(round(mean, 0))) state = hass.states.get("sensor.test_precision_3") + assert state is not None assert state.state == str(round(mean, 3)) -async def test_state_class(hass): +async def test_state_class(hass: HomeAssistant): """Test state class, which depends on the characteristic configured.""" assert await async_setup_component( hass, @@ -438,18 +455,20 @@ async def test_state_class(hass): for value in VALUES_NUMERIC: hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() state = hass.states.get("sensor.test_normal") + assert state is not None assert state.attributes.get(ATTR_STATE_CLASS) is SensorStateClass.MEASUREMENT state = hass.states.get("sensor.test_nan") + assert state is not None assert state.attributes.get(ATTR_STATE_CLASS) is None -async def test_unitless_source_sensor(hass): +async def test_unitless_source_sensor(hass: HomeAssistant): """Statistics for a unitless source sensor should never have a unit.""" assert await async_setup_component( hass, @@ -490,31 +509,31 @@ async def test_unitless_source_sensor(hass): ) await hass.async_block_till_done() - for value in VALUES_NUMERIC: + for value_numeric in VALUES_NUMERIC: hass.states.async_set( "sensor.test_monitored_unitless", - value, + str(value_numeric), ) - for value in VALUES_BINARY: + for value_binary in VALUES_BINARY: hass.states.async_set( "binary_sensor.test_monitored_unitless", - value, + str(value_binary), ) await hass.async_block_till_done() state = hass.states.get("sensor.test_unitless_1") - assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None + assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None state = hass.states.get("sensor.test_unitless_2") - assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None + assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None state = hass.states.get("sensor.test_unitless_3") - assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None + assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None state = hass.states.get("sensor.test_unitless_4") - assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None + assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) is None state = hass.states.get("sensor.test_unitless_5") - assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "%" + assert state and state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == "%" -async def test_state_characteristics(hass): +async def test_state_characteristics(hass: HomeAssistant): """Test configured state characteristic for value and unit.""" now = dt_util.utcnow() start_datetime = datetime(now.year + 1, 8, 2, 12, 23, 42, tzinfo=dt_util.UTC) @@ -772,12 +791,12 @@ async def test_state_characteristics(hass): async_fire_time_changed(hass, mock_data["return_time"]) hass.states.async_set( "sensor.test_monitored", - VALUES_NUMERIC[i], + str(VALUES_NUMERIC[i]), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) hass.states.async_set( "binary_sensor.test_monitored", - VALUES_BINARY[i], + str(VALUES_BINARY[i]), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() @@ -786,6 +805,7 @@ async def test_state_characteristics(hass): state = hass.states.get( f"sensor.test_{characteristic['source_sensor_domain']}_{characteristic['name']}" ) + assert state is not None assert state.state == str(characteristic["value_9"]), ( f"value mismatch for characteristic " f"'{characteristic['source_sensor_domain']}/{characteristic['name']}' " @@ -806,6 +826,7 @@ async def test_state_characteristics(hass): state = hass.states.get( f"sensor.test_{characteristic['source_sensor_domain']}_{characteristic['name']}" ) + assert state is not None assert state.state == str(characteristic["value_1"]), ( f"value mismatch for characteristic " f"'{characteristic['source_sensor_domain']}/{characteristic['name']}' " @@ -823,6 +844,7 @@ async def test_state_characteristics(hass): state = hass.states.get( f"sensor.test_{characteristic['source_sensor_domain']}_{characteristic['name']}" ) + assert state is not None assert state.state == str(characteristic["value_0"]), ( f"value mismatch for characteristic " f"'{characteristic['source_sensor_domain']}/{characteristic['name']}' " @@ -831,7 +853,7 @@ async def test_state_characteristics(hass): ) -async def test_invalid_state_characteristic(hass): +async def test_invalid_state_characteristic(hass: HomeAssistant): """Test the detection of wrong state_characteristics selected.""" assert await async_setup_component( hass, @@ -857,7 +879,7 @@ async def test_invalid_state_characteristic(hass): hass.states.async_set( "sensor.test_monitored", - VALUES_NUMERIC[0], + str(VALUES_NUMERIC[0]), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() @@ -868,7 +890,7 @@ async def test_invalid_state_characteristic(hass): assert state is None -async def test_initialize_from_database(hass): +async def test_initialize_from_database(hass: HomeAssistant): """Test initializing the statistics from the recorder database.""" # enable and pre-fill the recorder await async_init_recorder_component(hass) @@ -878,7 +900,7 @@ async def test_initialize_from_database(hass): for value in VALUES_NUMERIC: hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() @@ -903,11 +925,12 @@ async def test_initialize_from_database(hass): await hass.async_block_till_done() state = hass.states.get("sensor.test") + assert state is not None assert state.state == str(round(sum(VALUES_NUMERIC) / len(VALUES_NUMERIC), 2)) assert state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == TEMP_CELSIUS -async def test_initialize_from_database_with_maxage(hass): +async def test_initialize_from_database_with_maxage(hass: HomeAssistant): """Test initializing the statistics from the database.""" now = dt_util.utcnow() mock_data = { @@ -919,7 +942,7 @@ async def test_initialize_from_database_with_maxage(hass): # Testing correct retrieval from recorder, thus we do not # want purging to occur within the class itself. - def mock_purge(self): + def mock_purge(self, *args): return # enable and pre-fill the recorder @@ -929,11 +952,11 @@ async def test_initialize_from_database_with_maxage(hass): with patch( "homeassistant.components.statistics.sensor.dt_util.utcnow", new=mock_now - ), patch.object(StatisticsSensor, "_purge_old", mock_purge): + ), patch.object(StatisticsSensor, "_purge_old_states", mock_purge): for value in VALUES_NUMERIC: hass.states.async_set( "sensor.test_monitored", - value, + str(value), {ATTR_UNIT_OF_MEASUREMENT: TEMP_CELSIUS}, ) await hass.async_block_till_done() @@ -959,6 +982,7 @@ async def test_initialize_from_database_with_maxage(hass): await hass.async_block_till_done() state = hass.states.get("sensor.test") + assert state is not None assert state.attributes.get("age_coverage_ratio") == round(2 / 3, 2) # The max_age timestamp should be 1 hour before what we have right # now in mock_data['return_time']. @@ -967,7 +991,7 @@ async def test_initialize_from_database_with_maxage(hass): ) + timedelta(hours=1) -async def test_reload(hass): +async def test_reload(hass: HomeAssistant): """Verify we can reload statistics sensors.""" await async_init_recorder_component(hass) @@ -988,7 +1012,7 @@ async def test_reload(hass): ) await hass.async_block_till_done() - hass.states.async_set("sensor.test_monitored", 12345) + hass.states.async_set("sensor.test_monitored", "0") await hass.async_block_till_done() assert len(hass.states.async_all()) == 2