Alexa typing part 6 (state_report) (#97920)

state_report
This commit is contained in:
Jan Bouwhuis 2023-08-08 15:46:54 +02:00 committed by GitHub
parent 8b56e28838
commit 2a48159b69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 54 deletions

View file

@ -5,7 +5,8 @@ import asyncio
from http import HTTPStatus
import json
import logging
from typing import TYPE_CHECKING, cast
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, cast
from uuid import uuid4
import aiohttp
@ -46,17 +47,22 @@ DEFAULT_TIMEOUT = 10
class AlexaDirective:
"""An incoming Alexa directive."""
def __init__(self, request):
entity: State
entity_id: str | None
endpoint: AlexaEntity
instance: str | None
def __init__(self, request: dict[str, Any]) -> None:
"""Initialize a directive."""
self._directive = request[API_DIRECTIVE]
self.namespace = self._directive[API_HEADER]["namespace"]
self.name = self._directive[API_HEADER]["name"]
self.payload = self._directive[API_PAYLOAD]
self.has_endpoint = API_ENDPOINT in self._directive
self._directive: dict[str, Any] = request[API_DIRECTIVE]
self.namespace: str = self._directive[API_HEADER]["namespace"]
self.name: str = self._directive[API_HEADER]["name"]
self.payload: dict[str, Any] = self._directive[API_PAYLOAD]
self.has_endpoint: bool = API_ENDPOINT in self._directive
self.instance = None
self.entity_id = None
self.entity = self.entity_id = self.endpoint = self.instance = None
def load_entity(self, hass, config):
def load_entity(self, hass: HomeAssistant, config: AbstractConfig) -> None:
"""Set attributes related to the entity for this request.
Sets these attributes when self.has_endpoint is True:
@ -71,18 +77,24 @@ class AlexaDirective:
Will raise AlexaInvalidEndpointError if the endpoint in the request is
malformed or nonexistent.
"""
_endpoint_id = self._directive[API_ENDPOINT]["endpointId"]
_endpoint_id: str = self._directive[API_ENDPOINT]["endpointId"]
self.entity_id = _endpoint_id.replace("#", ".")
self.entity = hass.states.get(self.entity_id)
if not self.entity or not config.should_expose(self.entity_id):
entity: State | None = hass.states.get(self.entity_id)
if not entity or not config.should_expose(self.entity_id):
raise AlexaInvalidEndpointError(_endpoint_id)
self.entity = entity
self.endpoint = ENTITY_ADAPTERS[self.entity.domain](hass, config, self.entity)
if "instance" in self._directive[API_HEADER]:
self.instance = self._directive[API_HEADER]["instance"]
def response(self, name="Response", namespace="Alexa", payload=None):
def response(
self,
name: str = "Response",
namespace: str = "Alexa",
payload: dict[str, Any] | None = None,
) -> AlexaResponse:
"""Create an API formatted response.
Async friendly.
@ -100,11 +112,11 @@ class AlexaDirective:
def error(
self,
namespace="Alexa",
error_type="INTERNAL_ERROR",
error_message="",
payload=None,
):
namespace: str = "Alexa",
error_type: str = "INTERNAL_ERROR",
error_message: str = "",
payload: dict[str, Any] | None = None,
) -> AlexaResponse:
"""Create a API formatted error response.
Async friendly.
@ -127,10 +139,12 @@ class AlexaDirective:
class AlexaResponse:
"""Class to hold a response."""
def __init__(self, name, namespace, payload=None):
def __init__(
self, name: str, namespace: str, payload: dict[str, Any] | None = None
) -> None:
"""Initialize the response."""
payload = payload or {}
self._response = {
self._response: dict[str, Any] = {
API_EVENT: {
API_HEADER: {
"namespace": namespace,
@ -143,16 +157,16 @@ class AlexaResponse:
}
@property
def name(self):
def name(self) -> str:
"""Return the name of this response."""
return self._response[API_EVENT][API_HEADER]["name"]
@property
def namespace(self):
def namespace(self) -> str:
"""Return the namespace of this response."""
return self._response[API_EVENT][API_HEADER]["namespace"]
def set_correlation_token(self, token):
def set_correlation_token(self, token: str) -> None:
"""Set the correlationToken.
This should normally mirror the value from a request, and is set by
@ -160,7 +174,9 @@ class AlexaResponse:
"""
self._response[API_EVENT][API_HEADER]["correlationToken"] = token
def set_endpoint_full(self, bearer_token, endpoint_id, cookie=None):
def set_endpoint_full(
self, bearer_token: str | None, endpoint_id: str | None
) -> None:
"""Set the endpoint dictionary.
This is used to send proactive messages to Alexa.
@ -172,10 +188,7 @@ class AlexaResponse:
if endpoint_id is not None:
self._response[API_EVENT][API_ENDPOINT]["endpointId"] = endpoint_id
if cookie is not None:
self._response[API_EVENT][API_ENDPOINT]["cookie"] = cookie
def set_endpoint(self, endpoint):
def set_endpoint(self, endpoint: dict[str, Any]) -> None:
"""Set the endpoint.
This should normally mirror the value from a request, and is set by
@ -187,7 +200,7 @@ class AlexaResponse:
context = self._response.setdefault(API_CONTEXT, {})
return context.setdefault("properties", [])
def add_context_property(self, prop):
def add_context_property(self, prop: dict[str, Any]) -> None:
"""Add a property to the response context.
The Alexa response includes a list of properties which provides
@ -204,7 +217,7 @@ class AlexaResponse:
"""
self._properties().append(prop)
def merge_context_properties(self, endpoint):
def merge_context_properties(self, endpoint: AlexaEntity) -> None:
"""Add all properties from given endpoint if not already set.
Handlers should be using .add_context_property().
@ -216,12 +229,14 @@ class AlexaResponse:
if (prop["namespace"], prop["name"]) not in already_set:
self.add_context_property(prop)
def serialize(self):
def serialize(self) -> dict[str, Any]:
"""Return response as a JSON-able data structure."""
return self._response
async def async_enable_proactive_mode(hass, smart_home_config):
async def async_enable_proactive_mode(
hass: HomeAssistant, smart_home_config: AbstractConfig
):
"""Enable the proactive mode.
Proactive mode makes this component report state changes to Alexa.
@ -233,12 +248,12 @@ async def async_enable_proactive_mode(hass, smart_home_config):
def extra_significant_check(
hass: HomeAssistant,
old_state: str,
old_attrs: dict,
old_extra_arg: dict,
old_attrs: dict[Any, Any] | MappingProxyType[Any, Any],
old_extra_arg: Any,
new_state: str,
new_attrs: dict,
new_extra_arg: dict,
):
new_attrs: dict[str, Any] | MappingProxyType[Any, Any],
new_extra_arg: Any,
) -> bool:
"""Check if the serialized data has changed."""
return old_extra_arg is not None and old_extra_arg != new_extra_arg
@ -248,7 +263,7 @@ async def async_enable_proactive_mode(hass, smart_home_config):
changed_entity: str,
old_state: State | None,
new_state: State | None,
):
) -> None:
if not hass.is_running:
return
@ -307,8 +322,13 @@ async def async_enable_proactive_mode(hass, smart_home_config):
async def async_send_changereport_message(
hass, config, alexa_entity, alexa_properties, *, invalidate_access_token=True
):
hass: HomeAssistant,
config: AbstractConfig,
alexa_entity: AlexaEntity,
alexa_properties: list[dict[str, Any]],
*,
invalidate_access_token: bool = True,
) -> None:
"""Send a ChangeReport message for an Alexa entity.
https://developer.amazon.com/docs/smarthome/state-reporting-for-a-smart-home-skill.html#report-state-with-changereport-events
@ -322,11 +342,11 @@ async def async_send_changereport_message(
)
return
headers = {"Authorization": f"Bearer {token}"}
headers: dict[str, Any] = {"Authorization": f"Bearer {token}"}
endpoint = alexa_entity.alexa_id()
payload = {
payload: dict[str, Any] = {
API_CHANGE: {
"cause": {"type": Cause.APP_INTERACTION},
"properties": alexa_properties,
@ -339,6 +359,7 @@ async def async_send_changereport_message(
message_serialized = message.serialize()
session = async_get_clientsession(hass)
assert config.endpoint is not None
try:
async with async_timeout.timeout(DEFAULT_TIMEOUT):
response = await session.post(
@ -393,9 +414,9 @@ async def async_send_add_or_update_message(
"""
token = await config.async_get_access_token()
headers = {"Authorization": f"Bearer {token}"}
headers: dict[str, Any] = {"Authorization": f"Bearer {token}"}
endpoints = []
endpoints: list[dict[str, Any]] = []
for entity_id in entity_ids:
if (domain := entity_id.split(".", 1)[0]) not in ENTITY_ADAPTERS:
@ -407,7 +428,10 @@ async def async_send_add_or_update_message(
alexa_entity = ENTITY_ADAPTERS[domain](hass, config, state)
endpoints.append(alexa_entity.serialize_discovery())
payload = {"endpoints": endpoints, "scope": {"type": "BearerToken", "token": token}}
payload: dict[str, Any] = {
"endpoints": endpoints,
"scope": {"type": "BearerToken", "token": token},
}
message = AlexaResponse(
name="AddOrUpdateReport", namespace="Alexa.Discovery", payload=payload
@ -431,9 +455,9 @@ async def async_send_delete_message(
"""
token = await config.async_get_access_token()
headers = {"Authorization": f"Bearer {token}"}
headers: dict[str, Any] = {"Authorization": f"Bearer {token}"}
endpoints = []
endpoints: list[dict[str, Any]] = []
for entity_id in entity_ids:
domain = entity_id.split(".", 1)[0]
@ -443,7 +467,10 @@ async def async_send_delete_message(
endpoints.append({"endpointId": generate_alexa_id(entity_id)})
payload = {"endpoints": endpoints, "scope": {"type": "BearerToken", "token": token}}
payload: dict[str, Any] = {
"endpoints": endpoints,
"scope": {"type": "BearerToken", "token": token},
}
message = AlexaResponse(
name="DeleteReport", namespace="Alexa.Discovery", payload=payload
@ -458,14 +485,16 @@ async def async_send_delete_message(
)
async def async_send_doorbell_event_message(hass, config, alexa_entity):
async def async_send_doorbell_event_message(
hass: HomeAssistant, config: AbstractConfig, alexa_entity: AlexaEntity
) -> None:
"""Send a DoorbellPress event message for an Alexa entity.
https://developer.amazon.com/en-US/docs/alexa/device-apis/alexa-doorbelleventsource.html
"""
token = await config.async_get_access_token()
headers = {"Authorization": f"Bearer {token}"}
headers: dict[str, Any] = {"Authorization": f"Bearer {token}"}
endpoint = alexa_entity.alexa_id()
@ -483,6 +512,7 @@ async def async_send_doorbell_event_message(hass, config, alexa_entity):
message_serialized = message.serialize()
session = async_get_clientsession(hass)
assert config.endpoint is not None
try:
async with async_timeout.timeout(DEFAULT_TIMEOUT):
response = await session.post(

View file

@ -12,6 +12,7 @@ from typing import TYPE_CHECKING, Any
import aiohttp
import async_timeout
from hass_nabucasa import Cloud, cloud_api
from yarl import URL
from homeassistant.components import persistent_notification
from homeassistant.components.alexa import (
@ -149,7 +150,7 @@ class CloudAlexaConfig(alexa_config.AbstractConfig):
self._token_valid: datetime | None = None
self._cur_entity_prefs = async_get_assistant_settings(hass, CLOUD_ALEXA)
self._alexa_sync_unsub: Callable[[], None] | None = None
self._endpoint: Any = None
self._endpoint: str | URL | None = None
@property
def enabled(self) -> bool:
@ -175,7 +176,7 @@ class CloudAlexaConfig(alexa_config.AbstractConfig):
)
@property
def endpoint(self) -> Any | None:
def endpoint(self) -> str | URL | None:
"""Endpoint for report state."""
if self._endpoint is None:
raise ValueError("No endpoint available. Fetch access token first")
@ -309,7 +310,7 @@ class CloudAlexaConfig(alexa_config.AbstractConfig):
"""Invalidate access token."""
self._token_valid = None
async def async_get_access_token(self) -> Any:
async def async_get_access_token(self) -> str | None:
"""Get an access token."""
if self._token_valid is not None and self._token_valid > utcnow():
return self._token