Use GraphQL for GitHub integration (#66928)

This commit is contained in:
Joakim Sørensen 2022-02-20 11:59:11 +01:00 committed by GitHub
parent 4ca339c5b1
commit 9f57ce504b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 201 additions and 1144 deletions

View file

@ -1,44 +1,92 @@
"""Custom data update coordinators for the GitHub integration."""
"""Custom data update coordinator for the GitHub integration."""
from __future__ import annotations
from typing import Literal, TypedDict
from typing import Any
from aiogithubapi import (
GitHubAPI,
GitHubCommitModel,
GitHubConnectionException,
GitHubException,
GitHubNotModifiedException,
GitHubRatelimitException,
GitHubReleaseModel,
GitHubRepositoryModel,
GitHubResponseModel,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, T
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DEFAULT_UPDATE_INTERVAL, DOMAIN, LOGGER, IssuesPulls
from .const import DEFAULT_UPDATE_INTERVAL, DOMAIN, LOGGER
CoordinatorKeyType = Literal["information", "release", "issue", "commit"]
GRAPHQL_REPOSITORY_QUERY = """
query ($owner: String!, $repository: String!) {
rateLimit {
cost
remaining
}
repository(owner: $owner, name: $repository) {
default_branch_ref: defaultBranchRef {
commit: target {
... on Commit {
message: messageHeadline
url
sha: oid
}
}
}
stargazers_count: stargazerCount
forks_count: forkCount
full_name: nameWithOwner
id: databaseId
watchers(first: 1) {
total: totalCount
}
issue: issues(
first: 1
states: OPEN
orderBy: {field: CREATED_AT, direction: DESC}
) {
total: totalCount
issues: nodes {
title
url
number
}
}
pull_request: pullRequests(
first: 1
states: OPEN
orderBy: {field: CREATED_AT, direction: DESC}
) {
total: totalCount
pull_requests: nodes {
title
url
number
}
}
release: latestRelease {
name
url
tag: tagName
}
}
}
"""
class GitHubBaseDataUpdateCoordinator(DataUpdateCoordinator[T]):
"""Base class for GitHub data update coordinators."""
class GitHubDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Data update coordinator for the GitHub integration."""
def __init__(
self,
hass: HomeAssistant,
entry: ConfigEntry,
client: GitHubAPI,
repository: str,
) -> None:
"""Initialize GitHub data update coordinator base class."""
self.config_entry = entry
self.repository = repository
self._client = client
self._last_response: GitHubResponseModel[T] | None = None
self._last_response: GitHubResponseModel[dict[str, Any]] | None = None
self.data = {}
super().__init__(
hass,
@ -47,30 +95,14 @@ class GitHubBaseDataUpdateCoordinator(DataUpdateCoordinator[T]):
update_interval=DEFAULT_UPDATE_INTERVAL,
)
@property
def _etag(self) -> str:
"""Return the ETag of the last response."""
return self._last_response.etag if self._last_response is not None else None
async def fetch_data(self) -> GitHubResponseModel[T]:
"""Fetch data from GitHub API."""
@staticmethod
def _parse_response(response: GitHubResponseModel[T]) -> T:
"""Parse the response from GitHub API."""
return response.data
async def _async_update_data(self) -> T:
async def _async_update_data(self) -> GitHubResponseModel[dict[str, Any]]:
"""Update data."""
owner, repository = self.repository.split("/")
try:
response = await self.fetch_data()
except GitHubNotModifiedException:
LOGGER.debug(
"Content for %s with %s not modified",
self.repository,
self.__class__.__name__,
response = await self._client.graphql(
query=GRAPHQL_REPOSITORY_QUERY,
variables={"owner": owner, "repository": repository},
)
# Return the last known data if the request result was not modified
return self.data
except (GitHubConnectionException, GitHubRatelimitException) as exception:
# These are expected and we dont log anything extra
raise UpdateFailed(exception) from exception
@ -80,133 +112,4 @@ class GitHubBaseDataUpdateCoordinator(DataUpdateCoordinator[T]):
raise UpdateFailed(exception) from exception
else:
self._last_response = response
return self._parse_response(response)
class RepositoryInformationDataUpdateCoordinator(
GitHubBaseDataUpdateCoordinator[GitHubRepositoryModel]
):
"""Data update coordinator for repository information."""
async def fetch_data(self) -> GitHubResponseModel[GitHubRepositoryModel]:
"""Get the latest data from GitHub."""
return await self._client.repos.get(self.repository, **{"etag": self._etag})
class RepositoryReleaseDataUpdateCoordinator(
GitHubBaseDataUpdateCoordinator[GitHubReleaseModel]
):
"""Data update coordinator for repository release."""
@staticmethod
def _parse_response(
response: GitHubResponseModel[GitHubReleaseModel | None],
) -> GitHubReleaseModel | None:
"""Parse the response from GitHub API."""
if not response.data:
return None
for release in response.data:
if not release.prerelease and not release.draft:
return release
# Fall back to the latest release if no non-prerelease release is found
return response.data[0]
async def fetch_data(self) -> GitHubReleaseModel | None:
"""Get the latest data from GitHub."""
return await self._client.repos.releases.list(
self.repository, **{"etag": self._etag}
)
class RepositoryIssueDataUpdateCoordinator(
GitHubBaseDataUpdateCoordinator[IssuesPulls]
):
"""Data update coordinator for repository issues."""
_issue_etag: str | None = None
_pull_etag: str | None = None
@staticmethod
def _parse_response(response: IssuesPulls) -> IssuesPulls:
"""Parse the response from GitHub API."""
return response
async def fetch_data(self) -> IssuesPulls:
"""Get the latest data from GitHub."""
pulls_count = 0
pull_last = None
issues_count = 0
issue_last = None
try:
pull_response = await self._client.repos.pulls.list(
self.repository,
**{"params": {"per_page": 1}, "etag": self._pull_etag},
)
except GitHubNotModifiedException:
# Return the last known data if the request result was not modified
pulls_count = self.data.pulls_count
pull_last = self.data.pull_last
else:
self._pull_etag = pull_response.etag
pulls_count = pull_response.last_page_number or len(pull_response.data)
pull_last = pull_response.data[0] if pull_response.data else None
try:
issue_response = await self._client.repos.issues.list(
self.repository,
**{"params": {"per_page": 1}, "etag": self._issue_etag},
)
except GitHubNotModifiedException:
# Return the last known data if the request result was not modified
issues_count = self.data.issues_count
issue_last = self.data.issue_last
else:
self._issue_etag = issue_response.etag
issues_count = (
issue_response.last_page_number or len(issue_response.data)
) - pulls_count
issue_last = issue_response.data[0] if issue_response.data else None
if issue_last is not None and issue_last.pull_request:
issue_response = await self._client.repos.issues.list(self.repository)
for issue in issue_response.data:
if not issue.pull_request:
issue_last = issue
break
return IssuesPulls(
issues_count=issues_count,
issue_last=issue_last,
pulls_count=pulls_count,
pull_last=pull_last,
)
class RepositoryCommitDataUpdateCoordinator(
GitHubBaseDataUpdateCoordinator[GitHubCommitModel]
):
"""Data update coordinator for repository commit."""
@staticmethod
def _parse_response(
response: GitHubResponseModel[GitHubCommitModel | None],
) -> GitHubCommitModel | None:
"""Parse the response from GitHub API."""
return response.data[0] if response.data else None
async def fetch_data(self) -> GitHubCommitModel | None:
"""Get the latest data from GitHub."""
return await self._client.repos.list_commits(
self.repository, **{"params": {"per_page": 1}, "etag": self._etag}
)
class DataUpdateCoordinators(TypedDict):
"""Custom data update coordinators for the GitHub integration."""
information: RepositoryInformationDataUpdateCoordinator
release: RepositoryReleaseDataUpdateCoordinator
issue: RepositoryIssueDataUpdateCoordinator
commit: RepositoryCommitDataUpdateCoordinator
return response.data["data"]["repository"]