From da9749ecce13232eaa6b0a71b0b9da102af40995 Mon Sep 17 00:00:00 2001 From: Brett Adams Date: Fri, 25 Oct 2024 19:50:37 +1000 Subject: [PATCH] Add data streaming to Teslemetry (#127559) --- .../components/teslemetry/__init__.py | 46 ++++++++++++++++++- .../components/teslemetry/coordinator.py | 14 +----- .../components/teslemetry/helpers.py | 13 ++++++ .../components/teslemetry/manifest.json | 2 +- homeassistant/components/teslemetry/models.py | 4 ++ requirements_all.txt | 3 ++ requirements_test_all.txt | 3 ++ tests/components/teslemetry/conftest.py | 11 ++++- tests/components/teslemetry/test_init.py | 46 ++++++++++++++++++- 9 files changed, 124 insertions(+), 18 deletions(-) diff --git a/homeassistant/components/teslemetry/__init__.py b/homeassistant/components/teslemetry/__init__.py index ab2e4c04734..b884f9bbc5c 100644 --- a/homeassistant/components/teslemetry/__init__.py +++ b/homeassistant/components/teslemetry/__init__.py @@ -1,6 +1,7 @@ """Teslemetry integration.""" import asyncio +from collections.abc import Callable from typing import Final from tesla_fleet_api import EnergySpecific, Teslemetry, VehicleSpecific @@ -10,6 +11,7 @@ from tesla_fleet_api.exceptions import ( SubscriptionRequired, TeslaFleetError, ) +from teslemetry_stream import TeslemetryStream from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_ACCESS_TOKEN, Platform @@ -28,6 +30,7 @@ from .coordinator import ( TeslemetryEnergySiteLiveCoordinator, TeslemetryVehicleDataCoordinator, ) +from .helpers import flatten from .models import TeslemetryData, TeslemetryEnergyData, TeslemetryVehicleData from .services import async_register_services @@ -69,8 +72,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) - access_token=access_token, ) try: - scopes = (await teslemetry.metadata())["scopes"] - products = (await teslemetry.products())["response"] + calls = await asyncio.gather( + teslemetry.metadata(), + teslemetry.products(), + ) except InvalidToken as e: raise ConfigEntryAuthFailed from e except SubscriptionRequired as e: @@ -78,11 +83,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) - except TeslaFleetError as e: raise ConfigEntryNotReady from e + scopes = calls[0]["scopes"] + region = calls[0]["region"] + products = calls[1]["response"] + device_registry = dr.async_get(hass) # Create array of classes vehicles: list[TeslemetryVehicleData] = [] energysites: list[TeslemetryEnergyData] = [] + + # Create the stream + stream = TeslemetryStream( + session, + access_token, + server=f"{region.lower()}.teslemetry.com", + parse_timestamp=True, + ) + for product in products: if "vin" in product and Scope.VEHICLE_DEVICE_DATA in scopes: # Remove the protobuff 'cached_data' that we do not use to save memory @@ -99,12 +117,19 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) - serial_number=vin, ) + remove_listener = stream.async_add_listener( + create_handle_vehicle_stream(vin, coordinator), + {"vin": vin}, + ) + vehicles.append( TeslemetryVehicleData( api=api, coordinator=coordinator, + stream=stream, vin=vin, device=device, + remove_listener=remove_listener, ) ) @@ -214,3 +239,20 @@ async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> config_entry, unique_id=metadata["uid"], version=1, minor_version=2 ) return True + + +def create_handle_vehicle_stream(vin: str, coordinator) -> Callable[[dict], None]: + """Create a handle vehicle stream function.""" + + def handle_vehicle_stream(data: dict) -> None: + """Handle vehicle data from the stream.""" + if "vehicle_data" in data: + LOGGER.debug("Streaming received vehicle data from %s", vin) + coordinator.updated_once = True + coordinator.async_set_updated_data(flatten(data["vehicle_data"])) + elif "state" in data: + LOGGER.debug("Streaming received state from %s", vin) + coordinator.data["state"] = data["state"] + coordinator.async_set_updated_data(coordinator.data) + + return handle_vehicle_stream diff --git a/homeassistant/components/teslemetry/coordinator.py b/homeassistant/components/teslemetry/coordinator.py index 4612408e14d..f37d0613de9 100644 --- a/homeassistant/components/teslemetry/coordinator.py +++ b/homeassistant/components/teslemetry/coordinator.py @@ -18,6 +18,7 @@ from homeassistant.exceptions import ConfigEntryAuthFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from .const import ENERGY_HISTORY_FIELDS, LOGGER, TeslemetryState +from .helpers import flatten VEHICLE_INTERVAL = timedelta(seconds=30) VEHICLE_WAIT = timedelta(minutes=15) @@ -35,19 +36,6 @@ ENDPOINTS = [ ] -def flatten(data: dict[str, Any], parent: str | None = None) -> dict[str, Any]: - """Flatten the data structure.""" - result = {} - for key, value in data.items(): - if parent: - key = f"{parent}_{key}" - if isinstance(value, dict): - result.update(flatten(value, key)) - else: - result[key] = value - return result - - class TeslemetryVehicleDataCoordinator(DataUpdateCoordinator[dict[str, Any]]): """Class to manage fetching data from the Teslemetry API.""" diff --git a/homeassistant/components/teslemetry/helpers.py b/homeassistant/components/teslemetry/helpers.py index 4e086008333..30601feccbc 100644 --- a/homeassistant/components/teslemetry/helpers.py +++ b/homeassistant/components/teslemetry/helpers.py @@ -10,6 +10,19 @@ from homeassistant.exceptions import HomeAssistantError from .const import DOMAIN, LOGGER, TeslemetryState +def flatten(data: dict[str, Any], parent: str | None = None) -> dict[str, Any]: + """Flatten the data structure.""" + result = {} + for key, value in data.items(): + if parent: + key = f"{parent}_{key}" + if isinstance(value, dict): + result.update(flatten(value, key)) + else: + result[key] = value + return result + + async def wake_up_vehicle(vehicle) -> None: """Wake up a vehicle.""" async with vehicle.wakelock: diff --git a/homeassistant/components/teslemetry/manifest.json b/homeassistant/components/teslemetry/manifest.json index 4c05b8f8bae..6b667094d62 100644 --- a/homeassistant/components/teslemetry/manifest.json +++ b/homeassistant/components/teslemetry/manifest.json @@ -7,5 +7,5 @@ "iot_class": "cloud_polling", "loggers": ["tesla-fleet-api"], "quality_scale": "platinum", - "requirements": ["tesla-fleet-api==0.8.4"] + "requirements": ["tesla-fleet-api==0.8.4", "teslemetry-stream==0.4.2"] } diff --git a/homeassistant/components/teslemetry/models.py b/homeassistant/components/teslemetry/models.py index a6d549b8937..7f8bd37425a 100644 --- a/homeassistant/components/teslemetry/models.py +++ b/homeassistant/components/teslemetry/models.py @@ -3,10 +3,12 @@ from __future__ import annotations import asyncio +from collections.abc import Callable from dataclasses import dataclass from tesla_fleet_api import EnergySpecific, VehicleSpecific from tesla_fleet_api.const import Scope +from teslemetry_stream import TeslemetryStream from homeassistant.helpers.device_registry import DeviceInfo @@ -33,9 +35,11 @@ class TeslemetryVehicleData: api: VehicleSpecific coordinator: TeslemetryVehicleDataCoordinator + stream: TeslemetryStream vin: str wakelock = asyncio.Lock() device: DeviceInfo + remove_listener: Callable @dataclass diff --git a/requirements_all.txt b/requirements_all.txt index 1d4dc0476a5..e8e4fc17103 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -2800,6 +2800,9 @@ tesla-powerwall==0.5.2 # homeassistant.components.tesla_wall_connector tesla-wall-connector==1.0.2 +# homeassistant.components.teslemetry +teslemetry-stream==0.4.2 + # homeassistant.components.tessie tessie-api==0.1.1 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index e6bba8af2e3..496cf5345be 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -2228,6 +2228,9 @@ tesla-powerwall==0.5.2 # homeassistant.components.tesla_wall_connector tesla-wall-connector==1.0.2 +# homeassistant.components.teslemetry +teslemetry-stream==0.4.2 + # homeassistant.components.tessie tessie-api==0.1.1 diff --git a/tests/components/teslemetry/conftest.py b/tests/components/teslemetry/conftest.py index d50986bdb43..256428aa703 100644 --- a/tests/components/teslemetry/conftest.py +++ b/tests/components/teslemetry/conftest.py @@ -1,4 +1,4 @@ -"""Fixtures for Tessie.""" +"""Fixtures for Teslemetry.""" from __future__ import annotations @@ -106,3 +106,12 @@ def mock_energy_history(): return_value=ENERGY_HISTORY, ) as mock_live_status: yield mock_live_status + + +@pytest.fixture(autouse=True) +def mock_listen(): + """Mock Teslemetry Stream listen method.""" + with patch( + "homeassistant.components.teslemetry.TeslemetryStream.listen", + ) as mock_listen: + yield mock_listen diff --git a/tests/components/teslemetry/test_init.py b/tests/components/teslemetry/test_init.py index a7afff9e341..2a33e1def66 100644 --- a/tests/components/teslemetry/test_init.py +++ b/tests/components/teslemetry/test_init.py @@ -18,7 +18,7 @@ from homeassistant.components.teslemetry.coordinator import ( ) from homeassistant.components.teslemetry.models import TeslemetryData from homeassistant.config_entries import ConfigEntryState -from homeassistant.const import Platform +from homeassistant.const import STATE_OFF, STATE_ON, Platform from homeassistant.core import HomeAssistant from homeassistant.helpers import device_registry as dr @@ -214,3 +214,47 @@ async def test_energy_history_refresh_error( mock_energy_history.side_effect = side_effect entry = await setup_platform(hass) assert entry.state is state + + +async def test_vehicle_stream( + hass: HomeAssistant, + mock_listen: AsyncMock, + snapshot: SnapshotAssertion, +) -> None: + """Test vehicle stream events.""" + + entry = await setup_platform(hass, [Platform.BINARY_SENSOR]) + mock_listen.assert_called_once() + + state = hass.states.get("binary_sensor.test_status") + assert state.state == STATE_ON + + state = hass.states.get("binary_sensor.test_user_present") + assert state.state == STATE_OFF + + runtime_data: TeslemetryData = entry.runtime_data + for listener, _ in runtime_data.vehicles[0].stream._listeners.values(): + listener( + { + "vin": VEHICLE_DATA_ALT["response"]["vin"], + "vehicle_data": VEHICLE_DATA_ALT["response"], + "createdAt": "2024-10-04T10:45:17.537Z", + } + ) + await hass.async_block_till_done() + + state = hass.states.get("binary_sensor.test_user_present") + assert state.state == STATE_ON + + for listener, _ in runtime_data.vehicles[0].stream._listeners.values(): + listener( + { + "vin": VEHICLE_DATA_ALT["response"]["vin"], + "state": "offline", + "createdAt": "2024-10-04T10:45:17.537Z", + } + ) + await hass.async_block_till_done() + + state = hass.states.get("binary_sensor.test_status") + assert state.state == STATE_OFF