Add data streaming to Teslemetry (#127559)

This commit is contained in:
Brett Adams 2024-10-25 19:50:37 +10:00 committed by GitHub
parent fa7be597d2
commit da9749ecce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 124 additions and 18 deletions

View file

@ -1,6 +1,7 @@
"""Teslemetry integration.""" """Teslemetry integration."""
import asyncio import asyncio
from collections.abc import Callable
from typing import Final from typing import Final
from tesla_fleet_api import EnergySpecific, Teslemetry, VehicleSpecific from tesla_fleet_api import EnergySpecific, Teslemetry, VehicleSpecific
@ -10,6 +11,7 @@ from tesla_fleet_api.exceptions import (
SubscriptionRequired, SubscriptionRequired,
TeslaFleetError, TeslaFleetError,
) )
from teslemetry_stream import TeslemetryStream
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN, Platform from homeassistant.const import CONF_ACCESS_TOKEN, Platform
@ -28,6 +30,7 @@ from .coordinator import (
TeslemetryEnergySiteLiveCoordinator, TeslemetryEnergySiteLiveCoordinator,
TeslemetryVehicleDataCoordinator, TeslemetryVehicleDataCoordinator,
) )
from .helpers import flatten
from .models import TeslemetryData, TeslemetryEnergyData, TeslemetryVehicleData from .models import TeslemetryData, TeslemetryEnergyData, TeslemetryVehicleData
from .services import async_register_services from .services import async_register_services
@ -69,8 +72,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
access_token=access_token, access_token=access_token,
) )
try: try:
scopes = (await teslemetry.metadata())["scopes"] calls = await asyncio.gather(
products = (await teslemetry.products())["response"] teslemetry.metadata(),
teslemetry.products(),
)
except InvalidToken as e: except InvalidToken as e:
raise ConfigEntryAuthFailed from e raise ConfigEntryAuthFailed from e
except SubscriptionRequired as e: except SubscriptionRequired as e:
@ -78,11 +83,24 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
except TeslaFleetError as e: except TeslaFleetError as e:
raise ConfigEntryNotReady from e raise ConfigEntryNotReady from e
scopes = calls[0]["scopes"]
region = calls[0]["region"]
products = calls[1]["response"]
device_registry = dr.async_get(hass) device_registry = dr.async_get(hass)
# Create array of classes # Create array of classes
vehicles: list[TeslemetryVehicleData] = [] vehicles: list[TeslemetryVehicleData] = []
energysites: list[TeslemetryEnergyData] = [] energysites: list[TeslemetryEnergyData] = []
# Create the stream
stream = TeslemetryStream(
session,
access_token,
server=f"{region.lower()}.teslemetry.com",
parse_timestamp=True,
)
for product in products: for product in products:
if "vin" in product and Scope.VEHICLE_DEVICE_DATA in scopes: if "vin" in product and Scope.VEHICLE_DEVICE_DATA in scopes:
# Remove the protobuff 'cached_data' that we do not use to save memory # Remove the protobuff 'cached_data' that we do not use to save memory
@ -99,12 +117,19 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
serial_number=vin, serial_number=vin,
) )
remove_listener = stream.async_add_listener(
create_handle_vehicle_stream(vin, coordinator),
{"vin": vin},
)
vehicles.append( vehicles.append(
TeslemetryVehicleData( TeslemetryVehicleData(
api=api, api=api,
coordinator=coordinator, coordinator=coordinator,
stream=stream,
vin=vin, vin=vin,
device=device, device=device,
remove_listener=remove_listener,
) )
) )
@ -214,3 +239,20 @@ async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
config_entry, unique_id=metadata["uid"], version=1, minor_version=2 config_entry, unique_id=metadata["uid"], version=1, minor_version=2
) )
return True return True
def create_handle_vehicle_stream(vin: str, coordinator) -> Callable[[dict], None]:
"""Create a handle vehicle stream function."""
def handle_vehicle_stream(data: dict) -> None:
"""Handle vehicle data from the stream."""
if "vehicle_data" in data:
LOGGER.debug("Streaming received vehicle data from %s", vin)
coordinator.updated_once = True
coordinator.async_set_updated_data(flatten(data["vehicle_data"]))
elif "state" in data:
LOGGER.debug("Streaming received state from %s", vin)
coordinator.data["state"] = data["state"]
coordinator.async_set_updated_data(coordinator.data)
return handle_vehicle_stream

View file

@ -18,6 +18,7 @@ from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import ENERGY_HISTORY_FIELDS, LOGGER, TeslemetryState from .const import ENERGY_HISTORY_FIELDS, LOGGER, TeslemetryState
from .helpers import flatten
VEHICLE_INTERVAL = timedelta(seconds=30) VEHICLE_INTERVAL = timedelta(seconds=30)
VEHICLE_WAIT = timedelta(minutes=15) VEHICLE_WAIT = timedelta(minutes=15)
@ -35,19 +36,6 @@ ENDPOINTS = [
] ]
def flatten(data: dict[str, Any], parent: str | None = None) -> dict[str, Any]:
"""Flatten the data structure."""
result = {}
for key, value in data.items():
if parent:
key = f"{parent}_{key}"
if isinstance(value, dict):
result.update(flatten(value, key))
else:
result[key] = value
return result
class TeslemetryVehicleDataCoordinator(DataUpdateCoordinator[dict[str, Any]]): class TeslemetryVehicleDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Class to manage fetching data from the Teslemetry API.""" """Class to manage fetching data from the Teslemetry API."""

View file

@ -10,6 +10,19 @@ from homeassistant.exceptions import HomeAssistantError
from .const import DOMAIN, LOGGER, TeslemetryState from .const import DOMAIN, LOGGER, TeslemetryState
def flatten(data: dict[str, Any], parent: str | None = None) -> dict[str, Any]:
"""Flatten the data structure."""
result = {}
for key, value in data.items():
if parent:
key = f"{parent}_{key}"
if isinstance(value, dict):
result.update(flatten(value, key))
else:
result[key] = value
return result
async def wake_up_vehicle(vehicle) -> None: async def wake_up_vehicle(vehicle) -> None:
"""Wake up a vehicle.""" """Wake up a vehicle."""
async with vehicle.wakelock: async with vehicle.wakelock:

View file

@ -7,5 +7,5 @@
"iot_class": "cloud_polling", "iot_class": "cloud_polling",
"loggers": ["tesla-fleet-api"], "loggers": ["tesla-fleet-api"],
"quality_scale": "platinum", "quality_scale": "platinum",
"requirements": ["tesla-fleet-api==0.8.4"] "requirements": ["tesla-fleet-api==0.8.4", "teslemetry-stream==0.4.2"]
} }

View file

@ -3,10 +3,12 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from tesla_fleet_api import EnergySpecific, VehicleSpecific from tesla_fleet_api import EnergySpecific, VehicleSpecific
from tesla_fleet_api.const import Scope from tesla_fleet_api.const import Scope
from teslemetry_stream import TeslemetryStream
from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.device_registry import DeviceInfo
@ -33,9 +35,11 @@ class TeslemetryVehicleData:
api: VehicleSpecific api: VehicleSpecific
coordinator: TeslemetryVehicleDataCoordinator coordinator: TeslemetryVehicleDataCoordinator
stream: TeslemetryStream
vin: str vin: str
wakelock = asyncio.Lock() wakelock = asyncio.Lock()
device: DeviceInfo device: DeviceInfo
remove_listener: Callable
@dataclass @dataclass

View file

@ -2800,6 +2800,9 @@ tesla-powerwall==0.5.2
# homeassistant.components.tesla_wall_connector # homeassistant.components.tesla_wall_connector
tesla-wall-connector==1.0.2 tesla-wall-connector==1.0.2
# homeassistant.components.teslemetry
teslemetry-stream==0.4.2
# homeassistant.components.tessie # homeassistant.components.tessie
tessie-api==0.1.1 tessie-api==0.1.1

View file

@ -2228,6 +2228,9 @@ tesla-powerwall==0.5.2
# homeassistant.components.tesla_wall_connector # homeassistant.components.tesla_wall_connector
tesla-wall-connector==1.0.2 tesla-wall-connector==1.0.2
# homeassistant.components.teslemetry
teslemetry-stream==0.4.2
# homeassistant.components.tessie # homeassistant.components.tessie
tessie-api==0.1.1 tessie-api==0.1.1

View file

@ -1,4 +1,4 @@
"""Fixtures for Tessie.""" """Fixtures for Teslemetry."""
from __future__ import annotations from __future__ import annotations
@ -106,3 +106,12 @@ def mock_energy_history():
return_value=ENERGY_HISTORY, return_value=ENERGY_HISTORY,
) as mock_live_status: ) as mock_live_status:
yield mock_live_status yield mock_live_status
@pytest.fixture(autouse=True)
def mock_listen():
"""Mock Teslemetry Stream listen method."""
with patch(
"homeassistant.components.teslemetry.TeslemetryStream.listen",
) as mock_listen:
yield mock_listen

View file

@ -18,7 +18,7 @@ from homeassistant.components.teslemetry.coordinator import (
) )
from homeassistant.components.teslemetry.models import TeslemetryData from homeassistant.components.teslemetry.models import TeslemetryData
from homeassistant.config_entries import ConfigEntryState from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import Platform from homeassistant.const import STATE_OFF, STATE_ON, Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr from homeassistant.helpers import device_registry as dr
@ -214,3 +214,47 @@ async def test_energy_history_refresh_error(
mock_energy_history.side_effect = side_effect mock_energy_history.side_effect = side_effect
entry = await setup_platform(hass) entry = await setup_platform(hass)
assert entry.state is state assert entry.state is state
async def test_vehicle_stream(
hass: HomeAssistant,
mock_listen: AsyncMock,
snapshot: SnapshotAssertion,
) -> None:
"""Test vehicle stream events."""
entry = await setup_platform(hass, [Platform.BINARY_SENSOR])
mock_listen.assert_called_once()
state = hass.states.get("binary_sensor.test_status")
assert state.state == STATE_ON
state = hass.states.get("binary_sensor.test_user_present")
assert state.state == STATE_OFF
runtime_data: TeslemetryData = entry.runtime_data
for listener, _ in runtime_data.vehicles[0].stream._listeners.values():
listener(
{
"vin": VEHICLE_DATA_ALT["response"]["vin"],
"vehicle_data": VEHICLE_DATA_ALT["response"],
"createdAt": "2024-10-04T10:45:17.537Z",
}
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test_user_present")
assert state.state == STATE_ON
for listener, _ in runtime_data.vehicles[0].stream._listeners.values():
listener(
{
"vin": VEHICLE_DATA_ALT["response"]["vin"],
"state": "offline",
"createdAt": "2024-10-04T10:45:17.537Z",
}
)
await hass.async_block_till_done()
state = hass.states.get("binary_sensor.test_status")
assert state.state == STATE_OFF