Rework OAuth in Tesla Fleet (#123324)

* Rework Oauth

* Improve docstrings

* Update homeassistant/components/tesla_fleet/oauth.py

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>

* review feedback

* Add tests for user creds

---------

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
Brett Adams 2024-08-09 16:38:12 +10:00 committed by GitHub
parent 00c1a3fd4e
commit f8e1c2cfd4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 181 additions and 70 deletions

View file

@ -13,7 +13,6 @@ from tesla_fleet_api.exceptions import (
TeslaFleetError, TeslaFleetError,
) )
from homeassistant.components.application_credentials import ClientCredential
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN, Platform from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN, Platform
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@ -27,15 +26,15 @@ from homeassistant.helpers.config_entry_oauth2_flow import (
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.device_registry import DeviceInfo
from .application_credentials import TeslaOAuth2Implementation
from .config_flow import OAuth2FlowHandler from .config_flow import OAuth2FlowHandler
from .const import CLIENT_ID, DOMAIN, LOGGER, MODELS, NAME from .const import DOMAIN, LOGGER, MODELS
from .coordinator import ( from .coordinator import (
TeslaFleetEnergySiteInfoCoordinator, TeslaFleetEnergySiteInfoCoordinator,
TeslaFleetEnergySiteLiveCoordinator, TeslaFleetEnergySiteLiveCoordinator,
TeslaFleetVehicleDataCoordinator, TeslaFleetVehicleDataCoordinator,
) )
from .models import TeslaFleetData, TeslaFleetEnergyData, TeslaFleetVehicleData from .models import TeslaFleetData, TeslaFleetEnergyData, TeslaFleetVehicleData
from .oauth import TeslaSystemImplementation
PLATFORMS: Final = [Platform.BINARY_SENSOR, Platform.DEVICE_TRACKER, Platform.SENSOR] PLATFORMS: Final = [Platform.BINARY_SENSOR, Platform.DEVICE_TRACKER, Platform.SENSOR]
@ -56,7 +55,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslaFleetConfigEntry) -
OAuth2FlowHandler.async_register_implementation( OAuth2FlowHandler.async_register_implementation(
hass, hass,
TeslaOAuth2Implementation(hass, DOMAIN, ClientCredential(CLIENT_ID, "", NAME)), TeslaSystemImplementation(hass),
) )
implementation = await async_get_config_entry_implementation(hass, entry) implementation = await async_get_config_entry_implementation(hass, entry)

View file

@ -1,72 +1,18 @@
"""Application Credentials platform the Tesla Fleet integration.""" """Application Credentials platform the Tesla Fleet integration."""
import base64 from homeassistant.components.application_credentials import ClientCredential
import hashlib
import secrets
from typing import Any
from homeassistant.components.application_credentials import (
AuthImplementation,
AuthorizationServer,
ClientCredential,
)
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_entry_oauth2_flow from homeassistant.helpers import config_entry_oauth2_flow
from .const import AUTHORIZE_URL, DOMAIN, SCOPES, TOKEN_URL from .oauth import TeslaUserImplementation
AUTH_SERVER = AuthorizationServer(AUTHORIZE_URL, TOKEN_URL)
async def async_get_auth_implementation( async def async_get_auth_implementation(
hass: HomeAssistant, auth_domain: str, credential: ClientCredential hass: HomeAssistant, auth_domain: str, credential: ClientCredential
) -> config_entry_oauth2_flow.AbstractOAuth2Implementation: ) -> config_entry_oauth2_flow.AbstractOAuth2Implementation:
"""Return auth implementation.""" """Return auth implementation."""
return TeslaOAuth2Implementation( return TeslaUserImplementation(
hass, hass,
DOMAIN, auth_domain,
credential, credential,
) )
class TeslaOAuth2Implementation(AuthImplementation):
"""Tesla Fleet API Open Source Oauth2 implementation."""
def __init__(
self, hass: HomeAssistant, domain: str, credential: ClientCredential
) -> None:
"""Initialize local auth implementation."""
self.hass = hass
self._domain = domain
# Setup PKCE
self.code_verifier = secrets.token_urlsafe(32)
hashed_verifier = hashlib.sha256(self.code_verifier.encode()).digest()
self.code_challenge = (
base64.urlsafe_b64encode(hashed_verifier).decode().replace("=", "")
)
super().__init__(
hass,
domain,
credential,
AUTH_SERVER,
)
@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data that needs to be appended to the authorize url."""
return {
"scope": " ".join(SCOPES),
"code_challenge": self.code_challenge, # PKCE
}
async def async_resolve_external_data(self, external_data: Any) -> dict:
"""Resolve the authorization code to tokens."""
return await self._token_request(
{
"grant_type": "authorization_code",
"code": external_data["code"],
"redirect_uri": external_data["state"]["redirect_uri"],
"code_verifier": self.code_verifier, # PKCE
}
)

View file

@ -8,12 +8,11 @@ from typing import Any
import jwt import jwt
from homeassistant.components.application_credentials import ClientCredential
from homeassistant.config_entries import ConfigEntry, ConfigFlowResult from homeassistant.config_entries import ConfigEntry, ConfigFlowResult
from homeassistant.helpers import config_entry_oauth2_flow from homeassistant.helpers import config_entry_oauth2_flow
from .application_credentials import TeslaOAuth2Implementation from .const import DOMAIN, LOGGER
from .const import CLIENT_ID, DOMAIN, LOGGER, NAME from .oauth import TeslaSystemImplementation
class OAuth2FlowHandler( class OAuth2FlowHandler(
@ -35,9 +34,7 @@ class OAuth2FlowHandler(
"""Handle a flow start.""" """Handle a flow start."""
self.async_register_implementation( self.async_register_implementation(
self.hass, self.hass,
TeslaOAuth2Implementation( TeslaSystemImplementation(self.hass),
self.hass, DOMAIN, ClientCredential(CLIENT_ID, "", NAME)
),
) )
return await super().async_step_user() return await super().async_step_user()

View file

@ -13,7 +13,6 @@ CONF_REFRESH_TOKEN = "refresh_token"
LOGGER = logging.getLogger(__package__) LOGGER = logging.getLogger(__package__)
NAME = "Home Assistant"
CLIENT_ID = "71b813eb-4a2e-483a-b831-4dec5cb9bf0d" CLIENT_ID = "71b813eb-4a2e-483a-b831-4dec5cb9bf0d"
AUTHORIZE_URL = "https://auth.tesla.com/oauth2/v3/authorize" AUTHORIZE_URL = "https://auth.tesla.com/oauth2/v3/authorize"
TOKEN_URL = "https://auth.tesla.com/oauth2/v3/token" TOKEN_URL = "https://auth.tesla.com/oauth2/v3/token"

View file

@ -0,0 +1,86 @@
"""Provide oauth implementations for the Tesla Fleet integration."""
import base64
import hashlib
import secrets
from typing import Any
from homeassistant.components.application_credentials import (
AuthImplementation,
AuthorizationServer,
ClientCredential,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_entry_oauth2_flow
from .const import AUTHORIZE_URL, CLIENT_ID, DOMAIN, SCOPES, TOKEN_URL
class TeslaSystemImplementation(config_entry_oauth2_flow.LocalOAuth2Implementation):
"""Tesla Fleet API open source Oauth2 implementation."""
code_verifier: str
code_challenge: str
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize open source Oauth2 implementation."""
# Setup PKCE
self.code_verifier = secrets.token_urlsafe(32)
hashed_verifier = hashlib.sha256(self.code_verifier.encode()).digest()
self.code_challenge = (
base64.urlsafe_b64encode(hashed_verifier).decode().replace("=", "")
)
super().__init__(
hass,
DOMAIN,
CLIENT_ID,
"",
AUTHORIZE_URL,
TOKEN_URL,
)
@property
def name(self) -> str:
"""Name of the implementation."""
return "Built-in open source client ID"
@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data that needs to be appended to the authorize url."""
return {
"scope": " ".join(SCOPES),
"code_challenge": self.code_challenge, # PKCE
}
async def async_resolve_external_data(self, external_data: Any) -> dict:
"""Resolve the authorization code to tokens."""
return await self._token_request(
{
"grant_type": "authorization_code",
"code": external_data["code"],
"redirect_uri": external_data["state"]["redirect_uri"],
"code_verifier": self.code_verifier, # PKCE
}
)
class TeslaUserImplementation(AuthImplementation):
"""Tesla Fleet API user Oauth2 implementation."""
def __init__(
self, hass: HomeAssistant, auth_domain: str, credential: ClientCredential
) -> None:
"""Initialize user Oauth2 implementation."""
super().__init__(
hass,
auth_domain,
credential,
AuthorizationServer(AUTHORIZE_URL, TOKEN_URL),
)
@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data that needs to be appended to the authorize url."""
return {"scope": " ".join(SCOPES)}

View file

@ -5,6 +5,10 @@ from urllib.parse import parse_qs, urlparse
import pytest import pytest
from homeassistant.components.application_credentials import (
ClientCredential,
async_import_client_credential,
)
from homeassistant.components.tesla_fleet.const import ( from homeassistant.components.tesla_fleet.const import (
AUTHORIZE_URL, AUTHORIZE_URL,
CLIENT_ID, CLIENT_ID,
@ -16,6 +20,7 @@ from homeassistant.config_entries import SOURCE_REAUTH, SOURCE_USER
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import config_entry_oauth2_flow from homeassistant.helpers import config_entry_oauth2_flow
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker from tests.test_util.aiohttp import AiohttpClientMocker
@ -26,7 +31,7 @@ UNIQUE_ID = "uid"
@pytest.fixture @pytest.fixture
async def access_token(hass: HomeAssistant) -> dict[str, str | list[str]]: async def access_token(hass: HomeAssistant) -> str:
"""Return a valid access token.""" """Return a valid access token."""
return config_entry_oauth2_flow._encode_jwt( return config_entry_oauth2_flow._encode_jwt(
hass, hass,
@ -111,6 +116,85 @@ async def test_full_flow(
assert result["result"].data["token"]["refresh_token"] == "mock-refresh-token" assert result["result"].data["token"]["refresh_token"] == "mock-refresh-token"
@pytest.mark.usefixtures("current_request_with_host")
async def test_full_flow_user_cred(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
access_token,
) -> None:
"""Check full flow."""
# Create user application credential
assert await async_setup_component(hass, "application_credentials", {})
await async_import_client_credential(
hass,
DOMAIN,
ClientCredential("user_client_id", "user_client_secret"),
"user_cred",
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"implementation": "user_cred"}
)
assert result["type"] is FlowResultType.EXTERNAL_STEP
state = config_entry_oauth2_flow._encode_jwt(
hass,
{
"flow_id": result["flow_id"],
"redirect_uri": REDIRECT,
},
)
assert result["url"].startswith(AUTHORIZE_URL)
parsed_url = urlparse(result["url"])
parsed_query = parse_qs(parsed_url.query)
assert parsed_query["response_type"][0] == "code"
assert parsed_query["client_id"][0] == "user_client_id"
assert parsed_query["redirect_uri"][0] == REDIRECT
assert parsed_query["state"][0] == state
assert parsed_query["scope"][0] == " ".join(SCOPES)
assert "code_challenge" not in parsed_query # Ensure not a PKCE flow
client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.headers["content-type"] == "text/html; charset=utf-8"
aioclient_mock.clear_requests()
aioclient_mock.post(
TOKEN_URL,
json={
"refresh_token": "mock-refresh-token",
"access_token": access_token,
"type": "Bearer",
"expires_in": 60,
},
)
with patch(
"homeassistant.components.tesla_fleet.async_setup_entry", return_value=True
) as mock_setup:
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert len(hass.config_entries.async_entries(DOMAIN)) == 1
assert len(mock_setup.mock_calls) == 1
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == UNIQUE_ID
assert "result" in result
assert result["result"].unique_id == UNIQUE_ID
assert "token" in result["result"].data
assert result["result"].data["token"]["access_token"] == access_token
assert result["result"].data["token"]["refresh_token"] == "mock-refresh-token"
@pytest.mark.usefixtures("current_request_with_host") @pytest.mark.usefixtures("current_request_with_host")
async def test_reauthentication( async def test_reauthentication(
hass: HomeAssistant, hass: HomeAssistant,