hass-core/homeassistant/components/teslemetry/config_flow.py
Brett Adams 909cdc2e5c
Add Teslemetry Integration (#108147)
* Copy Paste Find Replace

* Small progress

* wip

* more wip

* Add SSE listen and close

* More rework

* Fix coordinator

* Get working

* Bump to 0.1.3

* Push to 0.1.4

* Lots of fixes

* Remove stream

* Add wakeup

* Improve set temp

* Be consistent with self

* Increase polling until streaming

* Work in progress

* Move to single climate

* bump to 0.2.0

* Update entity

* Data handling

* fixes

* WIP tests

* Tests

* Delete other tests

* Update comment

* Fix init

* Update homeassistant/components/teslemetry/entity.py

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>

* Add Codeowner

* Update coverage

* requirements

* Add failure for subscription required

* Add VIN to model

* Add wake

* Add context manager

* Rename to wake_up_if_asleep

* Remove context from coverage

* change lock to context

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>

* Improving Logger

* Add url to subscription error

* Errors cannot markdown

* Fix logger

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>

* rename logger

* Fix error logging

* Apply suggestions from code review

---------

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2024-01-25 12:54:47 +01:00

63 lines
2.1 KiB
Python

"""Config Flow for Teslemetry integration."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from aiohttp import ClientConnectionError
from tesla_fleet_api import Teslemetry
from tesla_fleet_api.exceptions import InvalidToken, PaymentRequired, TeslaFleetError
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN, LOGGER
TESLEMETRY_SCHEMA = vol.Schema({vol.Required(CONF_ACCESS_TOKEN): str})
DESCRIPTION_PLACEHOLDERS = {
"short_url": "teslemetry.com/console",
"url": "[teslemetry.com/console](https://teslemetry.com/console)",
}
class TeslemetryConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Config Teslemetry API connection."""
VERSION = 1
async def async_step_user(
self, user_input: Mapping[str, Any] | None = None
) -> FlowResult:
"""Get configuration from the user."""
errors: dict[str, str] = {}
if user_input:
teslemetry = Teslemetry(
session=async_get_clientsession(self.hass),
access_token=user_input[CONF_ACCESS_TOKEN],
)
try:
await teslemetry.test()
except InvalidToken:
errors[CONF_ACCESS_TOKEN] = "invalid_access_token"
except PaymentRequired:
errors["base"] = "subscription_required"
except ClientConnectionError:
errors["base"] = "cannot_connect"
except TeslaFleetError as e:
LOGGER.exception(str(e))
errors["base"] = "unknown"
else:
return self.async_create_entry(
title="Teslemetry",
data=user_input,
)
return self.async_show_form(
step_id="user",
data_schema=TESLEMETRY_SCHEMA,
description_placeholders=DESCRIPTION_PLACEHOLDERS,
errors=errors,
)