Refactor frame.get_integration_frame (#101322)

This commit is contained in:
Erik Montnemery 2023-10-03 19:21:27 +02:00 committed by GitHub
parent 956098ae3a
commit ab2de18f8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 163 additions and 60 deletions

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
import functools
import logging
from traceback import FrameSummary, extract_stack
@ -18,9 +19,17 @@ _REPORTED_INTEGRATIONS: set[str] = set()
_CallableT = TypeVar("_CallableT", bound=Callable)
def get_integration_frame(
exclude_integrations: set | None = None,
) -> tuple[FrameSummary, str, str]:
@dataclass
class IntegrationFrame:
"""Integration frame container."""
custom_integration: bool
filename: str
frame: FrameSummary
integration: str
def get_integration_frame(exclude_integrations: set | None = None) -> IntegrationFrame:
"""Return the frame, integration and integration path of the current stack frame."""
found_frame = None
if not exclude_integrations:
@ -46,7 +55,12 @@ def get_integration_frame(
if found_frame is None:
raise MissingIntegrationFrame
return found_frame, integration, path
return IntegrationFrame(
path == "custom_components/",
found_frame.filename[index:],
found_frame,
integration,
)
class MissingIntegrationFrame(HomeAssistantError):
@ -74,28 +88,26 @@ def report(
_LOGGER.warning(msg, stack_info=True)
return
report_integration(what, integration_frame, level)
_report_integration(what, integration_frame, level)
def report_integration(
def _report_integration(
what: str,
integration_frame: tuple[FrameSummary, str, str],
integration_frame: IntegrationFrame,
level: int = logging.WARNING,
) -> None:
"""Report incorrect usage in an integration.
Async friendly.
"""
found_frame, integration, path = integration_frame
found_frame = integration_frame.frame
# Keep track of integrations already reported to prevent flooding
key = f"{found_frame.filename}:{found_frame.lineno}"
if key in _REPORTED_INTEGRATIONS:
return
_REPORTED_INTEGRATIONS.add(key)
index = found_frame.filename.index(path)
if path == "custom_components/":
if integration_frame.custom_integration:
extra = " to the custom integration author"
else:
extra = ""
@ -108,8 +120,8 @@ def report_integration(
),
what,
extra,
integration,
found_frame.filename[index:],
integration_frame.integration,
integration_frame.filename,
found_frame.lineno,
(found_frame.line or "?").strip(),
)