Add support for EventBridge to aws integration (#77573)

* Added EventBridge support to aws integration

* Added type hints for all aws notification services + Added unit tests for EventBridge AWS integration

* Increase line coverage for unit tests for aws integration.
This commit is contained in:
Parham Ghazanfari 2022-10-25 07:21:25 -04:00 committed by GitHub
parent aea0067e49
commit dbfca8def8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 169 additions and 5 deletions

View file

@ -3,6 +3,7 @@ import asyncio
import base64
import json
import logging
from typing import Any
from aiobotocore.session import AioSession
@ -105,6 +106,9 @@ async def async_get_service(hass, config, discovery_info=None):
if service == "sqs":
return AWSSQS(session, aws_config)
if service == "events":
return AWSEventBridge(session, aws_config)
# should not reach here since service was checked in schema
return None
@ -128,7 +132,7 @@ class AWSLambda(AWSNotify):
super().__init__(session, aws_config)
self.context = context
async def async_send_message(self, message="", **kwargs):
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
"""Send notification to specified LAMBDA ARN."""
if not kwargs.get(ATTR_TARGET):
_LOGGER.error("At least one target is required")
@ -161,7 +165,7 @@ class AWSSNS(AWSNotify):
service = "sns"
async def async_send_message(self, message="", **kwargs):
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
"""Send notification to specified SNS ARN."""
if not kwargs.get(ATTR_TARGET):
_LOGGER.error("At least one target is required")
@ -199,7 +203,7 @@ class AWSSQS(AWSNotify):
service = "sqs"
async def async_send_message(self, message="", **kwargs):
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
"""Send notification to specified SQS ARN."""
if not kwargs.get(ATTR_TARGET):
_LOGGER.error("At least one target is required")
@ -231,3 +235,52 @@ class AWSSQS(AWSNotify):
if tasks:
await asyncio.gather(*tasks)
class AWSEventBridge(AWSNotify):
"""Implement the notification service for the AWS EventBridge service."""
service = "events"
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
"""Send notification to specified EventBus."""
cleaned_kwargs = {k: v for k, v in kwargs.items() if v is not None}
data = cleaned_kwargs.get(ATTR_DATA, {})
detail = (
json.dumps(data["detail"])
if "detail" in data
else json.dumps({"message": message})
)
async with self.session.create_client(
self.service, **self.aws_config
) as client:
tasks = []
entries = []
for target in kwargs.get(ATTR_TARGET, [None]):
entry = {
"Source": data.get("source", "homeassistant"),
"Resources": data.get("resources", []),
"Detail": detail,
"DetailType": data.get("detail_type", ""),
}
if target:
entry["EventBusName"] = target
entries.append(entry)
for i in range(0, len(entries), 10):
tasks.append(
client.put_events(Entries=entries[i : min(i + 10, len(entries))])
)
if tasks:
results = await asyncio.gather(*tasks)
for result in results:
for entry in result["Entries"]:
if len(entry.get("EventId", "")) == 0:
_LOGGER.error(
"Failed to send event: ErrorCode=%s ErrorMessage=%s",
entry["ErrorCode"],
entry["ErrorMessage"],
)