Refactor Netatmo integration (#29851)

* Refactor to use ids in data class

* Use station_id

* Refactor Netatmo to use oauth

* Remove old code

* Clean up

* Clean up

* Clean up

* Refactor binary sensor

* Add initial light implementation

* Add discovery

* Add set schedule service back in

* Add discovery via homekit

* More work on the light

* Fix set schedule service

* Clean up

* Remove unnecessary code

* Add support for multiple entities/accounts

* Fix MANUFACTURER typo

* Remove multiline inline if statement

* Only add tags when camera type is welcome

* Remove on/off as it's currently broken

* Fix camera turn_on/off

* Fix debug message

* Refactor some camera code

* Refactor camera methods

* Remove old code

* Rename method

* Update persons regularly

* Remove unused code

* Refactor method

* Fix isort

* Add english strings

* Catch NoDevice exception

* Fix unique id and only add sensors for tags if present

* Address comments

* Remove ToDo comment

* Add set_light_auto back in

* Add debug info

* Fix multiple camera issue

* Move camera light service to camera

* Only allow camera entities

* Make test pass

* Upgrade pyatmo module to 3.2.0

* Update requirements

* Remove list comprehension

* Remove guideline violating code

* Remove stale code

* Rename devices to entities

* Remove light platform

* Remove commented code

* Exclude files from coverage

* Remove unused code

* Fix unique id

* Address comments

* Fix comments

* Exclude sensor as well

* Add another test

* Use core interfaces
This commit is contained in:
cgtobi 2020-01-11 12:20:00 +01:00 committed by Martin Hjelmare
parent 5ffbf55170
commit e793ed9ab0
19 changed files with 762 additions and 927 deletions

View file

@ -455,8 +455,13 @@ omit =
homeassistant/components/nederlandse_spoorwegen/sensor.py homeassistant/components/nederlandse_spoorwegen/sensor.py
homeassistant/components/nello/lock.py homeassistant/components/nello/lock.py
homeassistant/components/nest/* homeassistant/components/nest/*
homeassistant/components/netatmo/* homeassistant/components/netatmo/__init__.py
homeassistant/components/netatmo_public/sensor.py homeassistant/components/netatmo/binary_sensor.py
homeassistant/components/netatmo/api.py
homeassistant/components/netatmo/camera.py
homeassistant/components/netatmo/climate.py
homeassistant/components/netatmo/const.py
homeassistant/components/netatmo/sensor.py
homeassistant/components/netdata/sensor.py homeassistant/components/netdata/sensor.py
homeassistant/components/netgear/device_tracker.py homeassistant/components/netgear/device_tracker.py
homeassistant/components/netgear_lte/* homeassistant/components/netgear_lte/*

View file

@ -222,6 +222,7 @@ homeassistant/components/neato/* @dshokouhi @Santobert
homeassistant/components/nello/* @pschmitt homeassistant/components/nello/* @pschmitt
homeassistant/components/ness_alarm/* @nickw444 homeassistant/components/ness_alarm/* @nickw444
homeassistant/components/nest/* @awarecan homeassistant/components/nest/* @awarecan
homeassistant/components/netatmo/* @cgtobi
homeassistant/components/netdata/* @fabaff homeassistant/components/netdata/* @fabaff
homeassistant/components/nextbus/* @vividboarder homeassistant/components/nextbus/* @vividboarder
homeassistant/components/nilu/* @hfurubotten homeassistant/components/nilu/* @hfurubotten

View file

@ -0,0 +1,18 @@
{
"config": {
"title": "Netatmo",
"step": {
"pick_implementation": {
"title": "Pick Authentication Method"
}
},
"abort": {
"already_setup": "You can only configure one Netatmo account.",
"authorize_url_timeout": "Timeout generating authorize url.",
"missing_configuration": "The Netatmo component is not configured. Please follow the documentation."
},
"create_entry": {
"default": "Successfully authenticated with Netatmo."
}
}
}

View file

@ -1,286 +1,86 @@
"""Support for the Netatmo devices.""" """The Netatmo integration."""
from datetime import timedelta import asyncio
import logging import logging
from urllib.error import HTTPError
import pyatmo
import voluptuous as vol import voluptuous as vol
from homeassistant.const import ( from homeassistant.config_entries import ConfigEntry
CONF_API_KEY, from homeassistant.const import CONF_CLIENT_ID, CONF_CLIENT_SECRET
CONF_DISCOVERY, from homeassistant.core import HomeAssistant
CONF_PASSWORD, from homeassistant.helpers import config_entry_oauth2_flow, config_validation as cv
CONF_URL,
CONF_USERNAME,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from .const import DATA_NETATMO_AUTH, DOMAIN from . import api, config_flow
from .const import AUTH, DATA_PERSONS, DOMAIN, OAUTH2_AUTHORIZE, OAUTH2_TOKEN
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DATA_PERSONS = "netatmo_persons"
DATA_WEBHOOK_URL = "netatmo_webhook_url"
CONF_SECRET_KEY = "secret_key"
CONF_WEBHOOKS = "webhooks"
SERVICE_ADDWEBHOOK = "addwebhook"
SERVICE_DROPWEBHOOK = "dropwebhook"
SERVICE_SETSCHEDULE = "set_schedule"
NETATMO_AUTH = None
NETATMO_WEBHOOK_URL = None
DEFAULT_PERSON = "Unknown"
DEFAULT_DISCOVERY = True
DEFAULT_WEBHOOKS = False
EVENT_PERSON = "person"
EVENT_MOVEMENT = "movement"
EVENT_HUMAN = "human"
EVENT_ANIMAL = "animal"
EVENT_VEHICLE = "vehicle"
EVENT_BUS_PERSON = "netatmo_person"
EVENT_BUS_MOVEMENT = "netatmo_movement"
EVENT_BUS_HUMAN = "netatmo_human"
EVENT_BUS_ANIMAL = "netatmo_animal"
EVENT_BUS_VEHICLE = "netatmo_vehicle"
EVENT_BUS_OTHER = "netatmo_other"
ATTR_ID = "id"
ATTR_PSEUDO = "pseudo"
ATTR_NAME = "name"
ATTR_EVENT_TYPE = "event_type"
ATTR_MESSAGE = "message"
ATTR_CAMERA_ID = "camera_id"
ATTR_HOME_NAME = "home_name"
ATTR_PERSONS = "persons"
ATTR_IS_KNOWN = "is_known"
ATTR_FACE_URL = "face_url"
ATTR_SNAPSHOT_URL = "snapshot_url"
ATTR_VIGNETTE_URL = "vignette_url"
ATTR_SCHEDULE = "schedule"
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
MIN_TIME_BETWEEN_EVENT_UPDATES = timedelta(seconds=5)
CONFIG_SCHEMA = vol.Schema( CONFIG_SCHEMA = vol.Schema(
{ {
DOMAIN: vol.Schema( DOMAIN: vol.Schema(
{ {
vol.Required(CONF_API_KEY): cv.string, vol.Required(CONF_CLIENT_ID): cv.string,
vol.Required(CONF_PASSWORD): cv.string, vol.Required(CONF_CLIENT_SECRET): cv.string,
vol.Required(CONF_SECRET_KEY): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_WEBHOOKS, default=DEFAULT_WEBHOOKS): cv.boolean,
vol.Optional(CONF_DISCOVERY, default=DEFAULT_DISCOVERY): cv.boolean,
} }
) )
}, },
extra=vol.ALLOW_EXTRA, extra=vol.ALLOW_EXTRA,
) )
SCHEMA_SERVICE_ADDWEBHOOK = vol.Schema({vol.Optional(CONF_URL): cv.string}) PLATFORMS = ["binary_sensor", "camera", "climate", "sensor"]
SCHEMA_SERVICE_DROPWEBHOOK = vol.Schema({})
SCHEMA_SERVICE_SETSCHEDULE = vol.Schema({vol.Required(ATTR_SCHEDULE): cv.string})
def setup(hass, config): async def async_setup(hass: HomeAssistant, config: dict):
"""Set up the Netatmo devices.""" """Set up the Netatmo component."""
hass.data[DOMAIN] = {}
hass.data[DOMAIN][DATA_PERSONS] = {}
hass.data[DATA_PERSONS] = {} if DOMAIN not in config:
try: return True
auth = pyatmo.ClientAuth(
config[DOMAIN][CONF_API_KEY],
config[DOMAIN][CONF_SECRET_KEY],
config[DOMAIN][CONF_USERNAME],
config[DOMAIN][CONF_PASSWORD],
"read_station read_camera access_camera "
"read_thermostat write_thermostat "
"read_presence access_presence read_homecoach",
)
except HTTPError:
_LOGGER.error("Unable to connect to Netatmo API")
return False
try: config_flow.NetatmoFlowHandler.async_register_implementation(
home_data = pyatmo.HomeData(auth) hass,
except pyatmo.NoDevice: config_entry_oauth2_flow.LocalOAuth2Implementation(
home_data = None hass,
_LOGGER.debug("No climate device. Disable %s service", SERVICE_SETSCHEDULE)
# Store config to be used during entry setup
hass.data[DATA_NETATMO_AUTH] = auth
if config[DOMAIN][CONF_DISCOVERY]:
for component in "camera", "sensor", "binary_sensor", "climate":
discovery.load_platform(hass, component, DOMAIN, {}, config)
if config[DOMAIN][CONF_WEBHOOKS]:
webhook_id = hass.components.webhook.async_generate_id()
hass.data[DATA_WEBHOOK_URL] = hass.components.webhook.async_generate_url(
webhook_id
)
hass.components.webhook.async_register(
DOMAIN, "Netatmo", webhook_id, handle_webhook
)
auth.addwebhook(hass.data[DATA_WEBHOOK_URL])
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, dropwebhook)
def _service_addwebhook(service):
"""Service to (re)add webhooks during runtime."""
url = service.data.get(CONF_URL)
if url is None:
url = hass.data[DATA_WEBHOOK_URL]
_LOGGER.info("Adding webhook for URL: %s", url)
auth.addwebhook(url)
hass.services.register(
DOMAIN,
SERVICE_ADDWEBHOOK,
_service_addwebhook,
schema=SCHEMA_SERVICE_ADDWEBHOOK,
)
def _service_dropwebhook(service):
"""Service to drop webhooks during runtime."""
_LOGGER.info("Dropping webhook")
auth.dropwebhook()
hass.services.register(
DOMAIN,
SERVICE_DROPWEBHOOK,
_service_dropwebhook,
schema=SCHEMA_SERVICE_DROPWEBHOOK,
)
def _service_setschedule(service):
"""Service to change current home schedule."""
schedule_name = service.data.get(ATTR_SCHEDULE)
home_data.switchHomeSchedule(schedule=schedule_name)
_LOGGER.info("Set home schedule to %s", schedule_name)
if home_data is not None:
hass.services.register(
DOMAIN, DOMAIN,
SERVICE_SETSCHEDULE, config[DOMAIN][CONF_CLIENT_ID],
_service_setschedule, config[DOMAIN][CONF_CLIENT_SECRET],
schema=SCHEMA_SERVICE_SETSCHEDULE, OAUTH2_AUTHORIZE,
OAUTH2_TOKEN,
),
)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up Netatmo from a config entry."""
implementation = await config_entry_oauth2_flow.async_get_config_entry_implementation(
hass, entry
)
hass.data[DOMAIN][entry.entry_id] = {
AUTH: api.ConfigEntryNetatmoAuth(hass, entry, implementation)
}
for component in PLATFORMS:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component)
) )
return True return True
def dropwebhook(hass): async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Drop the webhook subscription.""" """Unload a config entry."""
auth = hass.data[DATA_NETATMO_AUTH] unload_ok = all(
auth.dropwebhook() await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, component)
async def handle_webhook(hass, webhook_id, request): for component in PLATFORMS
"""Handle webhook callback.""" ]
try:
data = await request.json()
except ValueError:
return None
_LOGGER.debug("Got webhook data: %s", data)
published_data = {
ATTR_EVENT_TYPE: data.get(ATTR_EVENT_TYPE),
ATTR_HOME_NAME: data.get(ATTR_HOME_NAME),
ATTR_CAMERA_ID: data.get(ATTR_CAMERA_ID),
ATTR_MESSAGE: data.get(ATTR_MESSAGE),
}
if data.get(ATTR_EVENT_TYPE) == EVENT_PERSON:
for person in data[ATTR_PERSONS]:
published_data[ATTR_ID] = person.get(ATTR_ID)
published_data[ATTR_NAME] = hass.data[DATA_PERSONS].get(
published_data[ATTR_ID], DEFAULT_PERSON
)
published_data[ATTR_IS_KNOWN] = person.get(ATTR_IS_KNOWN)
published_data[ATTR_FACE_URL] = person.get(ATTR_FACE_URL)
hass.bus.async_fire(EVENT_BUS_PERSON, published_data)
elif data.get(ATTR_EVENT_TYPE) == EVENT_MOVEMENT:
published_data[ATTR_VIGNETTE_URL] = data.get(ATTR_VIGNETTE_URL)
published_data[ATTR_SNAPSHOT_URL] = data.get(ATTR_SNAPSHOT_URL)
hass.bus.async_fire(EVENT_BUS_MOVEMENT, published_data)
elif data.get(ATTR_EVENT_TYPE) == EVENT_HUMAN:
published_data[ATTR_VIGNETTE_URL] = data.get(ATTR_VIGNETTE_URL)
published_data[ATTR_SNAPSHOT_URL] = data.get(ATTR_SNAPSHOT_URL)
hass.bus.async_fire(EVENT_BUS_HUMAN, published_data)
elif data.get(ATTR_EVENT_TYPE) == EVENT_ANIMAL:
published_data[ATTR_VIGNETTE_URL] = data.get(ATTR_VIGNETTE_URL)
published_data[ATTR_SNAPSHOT_URL] = data.get(ATTR_SNAPSHOT_URL)
hass.bus.async_fire(EVENT_BUS_ANIMAL, published_data)
elif data.get(ATTR_EVENT_TYPE) == EVENT_VEHICLE:
hass.bus.async_fire(EVENT_BUS_VEHICLE, published_data)
published_data[ATTR_VIGNETTE_URL] = data.get(ATTR_VIGNETTE_URL)
published_data[ATTR_SNAPSHOT_URL] = data.get(ATTR_SNAPSHOT_URL)
else:
hass.bus.async_fire(EVENT_BUS_OTHER, data)
class CameraData:
"""Get the latest data from Netatmo."""
def __init__(self, hass, auth, home=None):
"""Initialize the data object."""
self._hass = hass
self.auth = auth
self.camera_data = None
self.camera_names = []
self.module_names = []
self.home = home
self.camera_type = None
def get_camera_names(self):
"""Return all camera available on the API as a list."""
self.camera_names = []
self.update()
if not self.home:
for home in self.camera_data.cameras:
for camera in self.camera_data.cameras[home].values():
self.camera_names.append(camera["name"])
else:
for camera in self.camera_data.cameras[self.home].values():
self.camera_names.append(camera["name"])
return self.camera_names
def get_module_names(self, camera_name):
"""Return all module available on the API as a list."""
self.module_names = []
self.update()
cam_id = self.camera_data.cameraByName(camera=camera_name, home=self.home)["id"]
for module in self.camera_data.modules.values():
if cam_id == module["cam_id"]:
self.module_names.append(module["name"])
return self.module_names
def get_camera_type(self, camera=None, home=None, cid=None):
"""Return camera type for a camera, cid has preference over camera."""
self.camera_type = self.camera_data.cameraType(
camera=camera, home=home, cid=cid
) )
return self.camera_type )
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
def get_persons(self): return unload_ok
"""Gather person data for webhooks."""
for person_id, person_data in self.camera_data.persons.items():
self._hass.data[DATA_PERSONS][person_id] = person_data.get(ATTR_PSEUDO)
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Call the Netatmo API to update the data."""
self.camera_data = pyatmo.CameraData(self.auth, size=100)
@Throttle(MIN_TIME_BETWEEN_EVENT_UPDATES)
def update_event(self):
"""Call the Netatmo API to update the events."""
self.camera_data.updateEvent(home=self.home, devicetype=self.camera_type)

View file

@ -0,0 +1,35 @@
"""API for Netatmo bound to HASS OAuth."""
from asyncio import run_coroutine_threadsafe
import logging
import pyatmo
from homeassistant import config_entries, core
from homeassistant.helpers import config_entry_oauth2_flow
_LOGGER = logging.getLogger(__name__)
class ConfigEntryNetatmoAuth(pyatmo.auth.NetatmOAuth2):
"""Provide Netatmo authentication tied to an OAuth2 based config entry."""
def __init__(
self,
hass: core.HomeAssistant,
config_entry: config_entries.ConfigEntry,
implementation: config_entry_oauth2_flow.AbstractOAuth2Implementation,
):
"""Initialize Netatmo Auth."""
self.hass = hass
self.session = config_entry_oauth2_flow.OAuth2Session(
hass, config_entry, implementation
)
super().__init__(token=self.session.token)
def refresh_tokens(self,) -> dict:
"""Refresh and return new Netatmo tokens using Home Assistant OAuth2 session."""
run_coroutine_threadsafe(
self.session.async_ensure_token_valid(), self.hass.loop
).result()
return self.session.token

View file

@ -1,15 +1,12 @@
"""Support for the Netatmo binary sensors.""" """Support for the Netatmo binary sensors."""
import logging import logging
from pyatmo import NoDevice import pyatmo
import voluptuous as vol
from homeassistant.components.binary_sensor import PLATFORM_SCHEMA, BinarySensorDevice from homeassistant.components.binary_sensor import BinarySensorDevice
from homeassistant.const import CONF_TIMEOUT
from homeassistant.helpers import config_validation as cv
from . import CameraData from .camera import CameraData
from .const import DATA_NETATMO_AUTH from .const import AUTH, DOMAIN, MANUFACTURER
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -27,6 +24,8 @@ PRESENCE_SENSOR_TYPES = {
} }
TAG_SENSOR_TYPES = {"Tag Vibration": "vibration", "Tag Open": "opening"} TAG_SENSOR_TYPES = {"Tag Vibration": "vibration", "Tag Open": "opening"}
SENSOR_TYPES = {"NACamera": WELCOME_SENSOR_TYPES, "NOC": PRESENCE_SENSOR_TYPES}
CONF_HOME = "home" CONF_HOME = "home"
CONF_CAMERAS = "cameras" CONF_CAMERAS = "cameras"
CONF_WELCOME_SENSORS = "welcome_sensors" CONF_WELCOME_SENSORS = "welcome_sensors"
@ -35,130 +34,80 @@ CONF_TAG_SENSORS = "tag_sensors"
DEFAULT_TIMEOUT = 90 DEFAULT_TIMEOUT = 90
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_CAMERAS, default=[]): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_HOME): cv.string,
vol.Optional(
CONF_PRESENCE_SENSORS, default=list(PRESENCE_SENSOR_TYPES)
): vol.All(cv.ensure_list, [vol.In(PRESENCE_SENSOR_TYPES)]),
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_WELCOME_SENSORS, default=list(WELCOME_SENSOR_TYPES)): vol.All(
cv.ensure_list, [vol.In(WELCOME_SENSOR_TYPES)]
),
}
)
async def async_setup_entry(hass, entry, async_add_entities):
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the access to Netatmo binary sensor.""" """Set up the access to Netatmo binary sensor."""
home = config.get(CONF_HOME) auth = hass.data[DOMAIN][entry.entry_id][AUTH]
timeout = config.get(CONF_TIMEOUT)
if timeout is None:
timeout = DEFAULT_TIMEOUT
module_name = None def get_entities():
"""Retrieve Netatmo entities."""
entities = []
auth = hass.data[DATA_NETATMO_AUTH] def get_camera_home_id(data, camera_id):
"""Return the home id for a given camera id."""
try: for home_id in data.camera_data.cameras:
data = CameraData(hass, auth, home) for camera in data.camera_data.cameras[home_id].values():
if not data.get_camera_names(): if camera["id"] == camera_id:
return home_id
return None return None
except NoDevice:
return None
welcome_sensors = config.get(CONF_WELCOME_SENSORS, WELCOME_SENSOR_TYPES) try:
presence_sensors = config.get(CONF_PRESENCE_SENSORS, PRESENCE_SENSOR_TYPES) data = CameraData(hass, auth)
tag_sensors = config.get(CONF_TAG_SENSORS, TAG_SENSOR_TYPES)
for camera_name in data.get_camera_names(): for camera in data.get_all_cameras():
camera_type = data.get_camera_type(camera=camera_name, home=home) home_id = get_camera_home_id(data, camera_id=camera["id"])
if camera_type == "NACamera":
if CONF_CAMERAS in config:
if (
config[CONF_CAMERAS] != []
and camera_name not in config[CONF_CAMERAS]
):
continue
for variable in welcome_sensors:
add_entities(
[
NetatmoBinarySensor(
data,
camera_name,
module_name,
home,
timeout,
camera_type,
variable,
)
],
True,
)
if camera_type == "NOC":
if CONF_CAMERAS in config:
if (
config[CONF_CAMERAS] != []
and camera_name not in config[CONF_CAMERAS]
):
continue
for variable in presence_sensors:
add_entities(
[
NetatmoBinarySensor(
data,
camera_name,
module_name,
home,
timeout,
camera_type,
variable,
)
],
True,
)
for module_name in data.get_module_names(camera_name): sensor_types = {}
for variable in tag_sensors: sensor_types.update(SENSOR_TYPES[camera["type"]])
camera_type = None
add_entities( # Tags are only supported with Netatmo Welcome indoor cameras
[ if camera["type"] == "NACamera" and data.get_modules(camera["id"]):
NetatmoBinarySensor( sensor_types.update(TAG_SENSOR_TYPES)
data,
camera_name, for sensor_name in sensor_types:
module_name, entities.append(
home, NetatmoBinarySensor(data, camera["id"], home_id, sensor_name)
timeout, )
camera_type, except pyatmo.NoDevice:
variable, _LOGGER.debug("No camera entities to add")
)
], return entities
True,
) async_add_entities(await hass.async_add_executor_job(get_entities), True)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the access to Netatmo binary sensor."""
pass
class NetatmoBinarySensor(BinarySensorDevice): class NetatmoBinarySensor(BinarySensorDevice):
"""Represent a single binary sensor in a Netatmo Camera device.""" """Represent a single binary sensor in a Netatmo Camera device."""
def __init__( def __init__(self, data, camera_id, home_id, sensor_type, module_id=None):
self, data, camera_name, module_name, home, timeout, camera_type, sensor
):
"""Set up for access to the Netatmo camera events.""" """Set up for access to the Netatmo camera events."""
self._data = data self._data = data
self._camera_name = camera_name self._camera_id = camera_id
self._module_name = module_name self._module_id = module_id
self._home = home self._sensor_type = sensor_type
self._timeout = timeout camera_info = data.camera_data.cameraById(cid=camera_id)
if home: self._camera_name = camera_info["name"]
self._name = f"{home} / {camera_name}" self._camera_type = camera_info["type"]
self._home_id = home_id
self._home_name = self._data.camera_data.getHomeName(home_id=home_id)
self._timeout = DEFAULT_TIMEOUT
if module_id:
self._module_name = data.camera_data.moduleById(mid=module_id)["name"]
self._name = (
f"{MANUFACTURER} {self._camera_name} {self._module_name} {sensor_type}"
)
self._unique_id = (
f"{self._camera_id}-{self._module_id}-"
f"{self._camera_type}-{sensor_type}"
)
else: else:
self._name = camera_name self._name = f"{MANUFACTURER} {self._camera_name} {sensor_type}"
if module_name: self._unique_id = f"{self._camera_id}-{self._camera_type}-{sensor_type}"
self._name += f" / {module_name}"
self._sensor_name = sensor
self._name += f" {sensor}"
self._cameratype = camera_type
self._state = None self._state = None
@property @property
@ -167,13 +116,19 @@ class NetatmoBinarySensor(BinarySensorDevice):
return self._name return self._name
@property @property
def device_class(self): def unique_id(self):
"""Return the class of this sensor, from DEVICE_CLASSES.""" """Return the unique ID for this sensor."""
if self._cameratype == "NACamera": return self._unique_id
return WELCOME_SENSOR_TYPES.get(self._sensor_name)
if self._cameratype == "NOC": @property
return PRESENCE_SENSOR_TYPES.get(self._sensor_name) def device_info(self):
return TAG_SENSOR_TYPES.get(self._sensor_name) """Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._camera_id)},
"name": self._camera_name,
"manufacturer": MANUFACTURER,
"model": self._camera_type,
}
@property @property
def is_on(self): def is_on(self):
@ -183,43 +138,43 @@ class NetatmoBinarySensor(BinarySensorDevice):
def update(self): def update(self):
"""Request an update from the Netatmo API.""" """Request an update from the Netatmo API."""
self._data.update() self._data.update()
self._data.update_event() self._data.update_event(camera_type=self._camera_type)
if self._cameratype == "NACamera": if self._camera_type == "NACamera":
if self._sensor_name == "Someone known": if self._sensor_type == "Someone known":
self._state = self._data.camera_data.someoneKnownSeen( self._state = self._data.camera_data.someone_known_seen(
self._home, self._camera_name, self._timeout cid=self._camera_id, exclude=self._timeout
) )
elif self._sensor_name == "Someone unknown": elif self._sensor_type == "Someone unknown":
self._state = self._data.camera_data.someoneUnknownSeen( self._state = self._data.camera_data.someone_unknown_seen(
self._home, self._camera_name, self._timeout cid=self._camera_id, exclude=self._timeout
) )
elif self._sensor_name == "Motion": elif self._sensor_type == "Motion":
self._state = self._data.camera_data.motionDetected( self._state = self._data.camera_data.motion_detected(
self._home, self._camera_name, self._timeout cid=self._camera_id, exclude=self._timeout
) )
elif self._cameratype == "NOC": elif self._camera_type == "NOC":
if self._sensor_name == "Outdoor motion": if self._sensor_type == "Outdoor motion":
self._state = self._data.camera_data.outdoormotionDetected( self._state = self._data.camera_data.outdoor_motion_detected(
self._home, self._camera_name, self._timeout cid=self._camera_id, offset=self._timeout
) )
elif self._sensor_name == "Outdoor human": elif self._sensor_type == "Outdoor human":
self._state = self._data.camera_data.humanDetected( self._state = self._data.camera_data.human_detected(
self._home, self._camera_name, self._timeout cid=self._camera_id, offset=self._timeout
) )
elif self._sensor_name == "Outdoor animal": elif self._sensor_type == "Outdoor animal":
self._state = self._data.camera_data.animalDetected( self._state = self._data.camera_data.animal_detected(
self._home, self._camera_name, self._timeout cid=self._camera_id, offset=self._timeout
) )
elif self._sensor_name == "Outdoor vehicle": elif self._sensor_type == "Outdoor vehicle":
self._state = self._data.camera_data.carDetected( self._state = self._data.camera_data.car_detected(
self._home, self._camera_name, self._timeout cid=self._camera_id, offset=self._timeout
) )
if self._sensor_name == "Tag Vibration": if self._sensor_type == "Tag Vibration":
self._state = self._data.camera_data.moduleMotionDetected( self._state = self._data.camera_data.module_motion_detected(
self._home, self._module_name, self._camera_name, self._timeout mid=self._module_id, cid=self._camera_id, exclude=self._timeout
) )
elif self._sensor_name == "Tag Open": elif self._sensor_type == "Tag Open":
self._state = self._data.camera_data.moduleOpened( self._state = self._data.camera_data.module_opened(
self._home, self._module_name, self._camera_name, self._timeout mid=self._module_id, cid=self._camera_id, exclude=self._timeout
) )

View file

@ -1,25 +1,28 @@
"""Support for the Netatmo cameras.""" """Support for the Netatmo cameras."""
import logging import logging
from pyatmo import NoDevice import pyatmo
import requests import requests
import voluptuous as vol import voluptuous as vol
from homeassistant.components.camera import ( from homeassistant.components.camera import (
CAMERA_SERVICE_SCHEMA, DOMAIN as CAMERA_DOMAIN,
PLATFORM_SCHEMA,
SUPPORT_STREAM, SUPPORT_STREAM,
Camera, Camera,
) )
from homeassistant.const import CONF_VERIFY_SSL, STATE_OFF, STATE_ON from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON
from homeassistant.helpers import config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import ( from homeassistant.util import Throttle
async_dispatcher_connect,
async_dispatcher_send,
)
from . import CameraData from .const import (
from .const import DATA_NETATMO_AUTH, DOMAIN ATTR_PSEUDO,
AUTH,
DATA_PERSONS,
DOMAIN,
MANUFACTURER,
MIN_TIME_BETWEEN_EVENT_UPDATES,
MIN_TIME_BETWEEN_UPDATES,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -31,96 +34,61 @@ DEFAULT_QUALITY = "high"
VALID_QUALITIES = ["high", "medium", "low", "poor"] VALID_QUALITIES = ["high", "medium", "low", "poor"]
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_VERIFY_SSL, default=True): cv.boolean,
vol.Optional(CONF_HOME): cv.string,
vol.Optional(CONF_CAMERAS, default=[]): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_QUALITY, default=DEFAULT_QUALITY): vol.All(
cv.string, vol.In(VALID_QUALITIES)
),
}
)
_BOOL_TO_STATE = {True: STATE_ON, False: STATE_OFF} _BOOL_TO_STATE = {True: STATE_ON, False: STATE_OFF}
SCHEMA_SERVICE_SETLIGHTAUTO = vol.Schema(
{vol.Optional(ATTR_ENTITY_ID): cv.entity_domain(CAMERA_DOMAIN)}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up access to Netatmo cameras."""
home = config.get(CONF_HOME)
verify_ssl = config.get(CONF_VERIFY_SSL, True)
quality = config.get(CONF_QUALITY, DEFAULT_QUALITY)
auth = hass.data[DATA_NETATMO_AUTH] async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the Netatmo camera platform."""
try: def get_entities():
data = CameraData(hass, auth, home) """Retrieve Netatmo entities."""
for camera_name in data.get_camera_names(): entities = []
camera_type = data.get_camera_type(camera=camera_name, home=home) try:
if CONF_CAMERAS in config: camera_data = CameraData(hass, hass.data[DOMAIN][entry.entry_id][AUTH])
if ( for camera in camera_data.get_all_cameras():
config[CONF_CAMERAS] != [] _LOGGER.debug("Setting up camera %s %s", camera["id"], camera["name"])
and camera_name not in config[CONF_CAMERAS] entities.append(
):
continue
add_entities(
[
NetatmoCamera( NetatmoCamera(
data, camera_name, home, camera_type, verify_ssl, quality camera_data, camera["id"], camera["type"], True, DEFAULT_QUALITY
) )
] )
) camera_data.update_persons()
data.get_persons() except pyatmo.NoDevice:
except NoDevice: _LOGGER.debug("No cameras found")
return None return entities
async def async_service_handler(call): async_add_entities(await hass.async_add_executor_job(get_entities), True)
"""Handle service call."""
_LOGGER.debug(
"Service handler invoked with service=%s and data=%s",
call.service,
call.data,
)
service = call.service
entity_id = call.data["entity_id"][0]
async_dispatcher_send(hass, f"{service}_{entity_id}")
hass.services.async_register(
DOMAIN, "set_light_auto", async_service_handler, CAMERA_SERVICE_SCHEMA async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
) """Set up the Netatmo camera platform."""
hass.services.async_register( return
DOMAIN, "set_light_on", async_service_handler, CAMERA_SERVICE_SCHEMA
)
hass.services.async_register(
DOMAIN, "set_light_off", async_service_handler, CAMERA_SERVICE_SCHEMA
)
class NetatmoCamera(Camera): class NetatmoCamera(Camera):
"""Representation of the images published from a Netatmo camera.""" """Representation of a Netatmo camera."""
def __init__(self, data, camera_name, home, camera_type, verify_ssl, quality): def __init__(self, data, camera_id, camera_type, verify_ssl, quality):
"""Set up for access to the Netatmo camera images.""" """Set up for access to the Netatmo camera images."""
super().__init__() super().__init__()
self._data = data self._data = data
self._camera_name = camera_name self._camera_id = camera_id
self._home = home self._camera_name = self._data.camera_data.get_camera(cid=camera_id).get("name")
if home: self._name = f"{MANUFACTURER} {self._camera_name}"
self._name = f"{home} / {camera_name}" self._camera_type = camera_type
else: self._unique_id = f"{self._camera_id}-{self._camera_type}"
self._name = camera_name
self._cameratype = camera_type
self._verify_ssl = verify_ssl self._verify_ssl = verify_ssl
self._quality = quality self._quality = quality
# URLs. # URLs
self._vpnurl = None self._vpnurl = None
self._localurl = None self._localurl = None
# Identifier # Monitoring status
self._id = None
# Monitoring status.
self._status = None self._status = None
# SD Card status # SD Card status
@ -132,12 +100,6 @@ class NetatmoCamera(Camera):
# Is local # Is local
self._is_local = None self._is_local = None
# VPN URL
self._vpn_url = None
# Light mode status
self._light_mode_status = None
def camera_image(self): def camera_image(self):
"""Return a still image response from the camera.""" """Return a still image response from the camera."""
try: try:
@ -152,23 +114,21 @@ class NetatmoCamera(Camera):
verify=self._verify_ssl, verify=self._verify_ssl,
) )
else: else:
_LOGGER.error("Welcome VPN URL is None") _LOGGER.error("Welcome/Presence VPN URL is None")
self._data.update() self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.cameraUrls( (self._vpnurl, self._localurl) = self._data.camera_data.camera_urls(
camera=self._camera_name cid=self._camera_id
) )
return None return None
except requests.exceptions.RequestException as error: except requests.exceptions.RequestException as error:
_LOGGER.error("Welcome URL changed: %s", error) _LOGGER.info("Welcome/Presence URL changed: %s", error)
self._data.update() self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.cameraUrls( (self._vpnurl, self._localurl) = self._data.camera_data.camera_urls(
camera=self._camera_name cid=self._camera_id
) )
return None return None
return response.content return response.content
# Entity property overrides
@property @property
def should_poll(self) -> bool: def should_poll(self) -> bool:
"""Return True if entity has to be polled for state. """Return True if entity has to be polled for state.
@ -182,24 +142,26 @@ class NetatmoCamera(Camera):
"""Return the name of this Netatmo camera device.""" """Return the name of this Netatmo camera device."""
return self._name return self._name
@property
def device_info(self):
"""Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._camera_id)},
"name": self._camera_name,
"manufacturer": MANUFACTURER,
"model": self._camera_type,
}
@property @property
def device_state_attributes(self): def device_state_attributes(self):
"""Return the Netatmo-specific camera state attributes.""" """Return the Netatmo-specific camera state attributes."""
_LOGGER.debug("Getting new attributes from camera netatmo '%s'", self._name)
attr = {} attr = {}
attr["id"] = self._id attr["id"] = self._camera_id
attr["status"] = self._status attr["status"] = self._status
attr["sd_status"] = self._sd_status attr["sd_status"] = self._sd_status
attr["alim_status"] = self._alim_status attr["alim_status"] = self._alim_status
attr["is_local"] = self._is_local attr["is_local"] = self._is_local
attr["vpn_url"] = self._vpn_url attr["vpn_url"] = self._vpnurl
if self.model == "Presence":
attr["light_mode_status"] = self._light_mode_status
_LOGGER.debug("Attributes of '%s' = %s", self._name, attr)
return attr return attr
@ -221,7 +183,7 @@ class NetatmoCamera(Camera):
@property @property
def brand(self): def brand(self):
"""Return the camera brand.""" """Return the camera brand."""
return "Netatmo" return MANUFACTURER
@property @property
def motion_detection_enabled(self): def motion_detection_enabled(self):
@ -243,173 +205,84 @@ class NetatmoCamera(Camera):
@property @property
def model(self): def model(self):
"""Return the camera model.""" """Return the camera model."""
if self._cameratype == "NOC": if self._camera_type == "NOC":
return "Presence" return "Presence"
if self._cameratype == "NACamera": if self._camera_type == "NACamera":
return "Welcome" return "Welcome"
return None return None
# Other Entity method overrides @property
def unique_id(self):
async def async_added_to_hass(self): """Return the unique ID for this sensor."""
"""Subscribe to signals and add camera to list.""" return self._unique_id
_LOGGER.debug("Registering services for entity_id=%s", self.entity_id)
async_dispatcher_connect(
self.hass, f"set_light_auto_{self.entity_id}", self.set_light_auto
)
async_dispatcher_connect(
self.hass, f"set_light_on_{self.entity_id}", self.set_light_on
)
async_dispatcher_connect(
self.hass, f"set_light_off_{self.entity_id}", self.set_light_off
)
def update(self): def update(self):
"""Update entity status.""" """Update entity status."""
_LOGGER.debug("Updating camera netatmo '%s'", self._name) # Refresh camera data
# Refresh camera data.
self._data.update() self._data.update()
# URLs. camera = self._data.camera_data.get_camera(cid=self._camera_id)
self._vpnurl, self._localurl = self._data.camera_data.cameraUrls(
camera=self._camera_name # URLs
self._vpnurl, self._localurl = self._data.camera_data.camera_urls(
cid=self._camera_id
) )
# Identifier # Monitoring status
self._id = self._data.camera_data.cameraByName( self._status = camera.get("status")
camera=self._camera_name, home=self._home
)["id"]
# Monitoring status.
self._status = self._data.camera_data.cameraByName(
camera=self._camera_name, home=self._home
)["status"]
_LOGGER.debug("Status of '%s' = %s", self._name, self._status)
# SD Card status # SD Card status
self._sd_status = self._data.camera_data.cameraByName( self._sd_status = camera.get("sd_status")
camera=self._camera_name, home=self._home
)["sd_status"]
# Power status # Power status
self._alim_status = self._data.camera_data.cameraByName( self._alim_status = camera.get("alim_status")
camera=self._camera_name, home=self._home
)["alim_status"]
# Is local # Is local
self._is_local = self._data.camera_data.cameraByName( self._is_local = camera.get("is_local")
camera=self._camera_name, home=self._home
)["is_local"]
# VPN URL
self._vpn_url = self._data.camera_data.cameraByName(
camera=self._camera_name, home=self._home
)["vpn_url"]
self.is_streaming = self._alim_status == "on" self.is_streaming = self._alim_status == "on"
if self.model == "Presence":
# Light mode status
self._light_mode_status = self._data.camera_data.cameraByName(
camera=self._camera_name, home=self._home
)["light_mode_status"]
# Camera method overrides class CameraData:
"""Get the latest data from Netatmo."""
def enable_motion_detection(self): def __init__(self, hass, auth):
"""Enable motion detection in the camera.""" """Initialize the data object."""
_LOGGER.debug("Enable motion detection of the camera '%s'", self._name) self._hass = hass
self._enable_motion_detection(True) self.auth = auth
self.camera_data = None
def disable_motion_detection(self): def get_all_cameras(self):
"""Disable motion detection in camera.""" """Return all camera available on the API as a list."""
_LOGGER.debug("Disable motion detection of the camera '%s'", self._name) self.update()
self._enable_motion_detection(False) cameras = []
for camera in self.camera_data.cameras.values():
cameras.extend(camera.values())
return cameras
def _enable_motion_detection(self, enable): def get_modules(self, camera_id):
"""Enable or disable motion detection.""" """Return all modules for a given camera."""
try: return self.camera_data.get_camera(camera_id).get("modules", [])
if self._localurl:
requests.get( def get_camera_type(self, camera_id):
f"{self._localurl}/command/changestatus?status={_BOOL_TO_STATE.get(enable)}", """Return camera type for a camera, cid has preference over camera."""
timeout=10, return self.camera_data.cameraType(cid=camera_id)
)
elif self._vpnurl: def update_persons(self):
requests.get( """Gather person data for webhooks."""
f"{self._vpnurl}/command/changestatus?status={_BOOL_TO_STATE.get(enable)}", for person_id, person_data in self.camera_data.persons.items():
timeout=10, self._hass.data[DOMAIN][DATA_PERSONS][person_id] = person_data.get(
verify=self._verify_ssl, ATTR_PSEUDO
)
else:
_LOGGER.error("Welcome/Presence VPN URL is None")
self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.cameraUrls(
camera=self._camera_name
)
return None
except requests.exceptions.RequestException as error:
_LOGGER.error("Welcome/Presence URL changed: %s", error)
self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.cameraUrls(
camera=self._camera_name
) )
return None
else:
self.async_schedule_update_ha_state(True)
# Netatmo Presence specific camera method. @Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Call the Netatmo API to update the data."""
self.camera_data = pyatmo.CameraData(self.auth, size=100)
self.update_persons()
def set_light_auto(self): @Throttle(MIN_TIME_BETWEEN_EVENT_UPDATES)
"""Set flood light in automatic mode.""" def update_event(self, camera_type):
_LOGGER.debug( """Call the Netatmo API to update the events."""
"Set the flood light in automatic mode for the camera '%s'", self._name self.camera_data.updateEvent(devicetype=camera_type)
)
self._set_light_mode("auto")
def set_light_on(self):
"""Set flood light on."""
_LOGGER.debug("Set the flood light on for the camera '%s'", self._name)
self._set_light_mode("on")
def set_light_off(self):
"""Set flood light off."""
_LOGGER.debug("Set the flood light off for the camera '%s'", self._name)
self._set_light_mode("off")
def _set_light_mode(self, mode):
"""Set light mode ('auto', 'on', 'off')."""
if self.model == "Presence":
try:
config = f'{{"mode":"{mode}"}}'
if self._localurl:
requests.get(
f"{self._localurl}/command/floodlight_set_config?config={config}",
timeout=10,
)
elif self._vpnurl:
requests.get(
f"{self._vpnurl}/command/floodlight_set_config?config={config}",
timeout=10,
verify=self._verify_ssl,
)
else:
_LOGGER.error("Presence VPN URL is None")
self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.cameraUrls(
camera=self._camera_name
)
return None
except requests.exceptions.RequestException as error:
_LOGGER.error("Presence URL changed: %s", error)
self._data.update()
(self._vpnurl, self._localurl) = self._data.camera_data.cameraUrls(
camera=self._camera_name
)
return None
else:
self.async_schedule_update_ha_state(True)
else:
_LOGGER.error("Unsupported camera model for light mode")

View file

@ -7,7 +7,7 @@ import pyatmo
import requests import requests
import voluptuous as vol import voluptuous as vol
from homeassistant.components.climate import PLATFORM_SCHEMA, ClimateDevice from homeassistant.components.climate import ClimateDevice
from homeassistant.components.climate.const import ( from homeassistant.components.climate.const import (
CURRENT_HVAC_HEAT, CURRENT_HVAC_HEAT,
CURRENT_HVAC_IDLE, CURRENT_HVAC_IDLE,
@ -23,15 +23,21 @@ from homeassistant.components.climate.const import (
from homeassistant.const import ( from homeassistant.const import (
ATTR_BATTERY_LEVEL, ATTR_BATTERY_LEVEL,
ATTR_TEMPERATURE, ATTR_TEMPERATURE,
CONF_NAME,
PRECISION_HALVES, PRECISION_HALVES,
STATE_OFF, STATE_OFF,
TEMP_CELSIUS, TEMP_CELSIUS,
) )
import homeassistant.helpers.config_validation as cv from homeassistant.helpers import config_validation as cv
from homeassistant.util import Throttle from homeassistant.util import Throttle
from .const import DATA_NETATMO_AUTH from .const import (
ATTR_HOME_NAME,
ATTR_SCHEDULE_NAME,
AUTH,
DOMAIN,
MANUFACTURER,
SERVICE_SETSCHEDULE,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -85,63 +91,67 @@ CONF_ROOMS = "rooms"
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=300) MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=300)
HOME_CONFIG_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_ROOMS, default=[]): vol.All(cv.ensure_list, [cv.string]),
}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{vol.Optional(CONF_HOMES): vol.All(cv.ensure_list, [HOME_CONFIG_SCHEMA])}
)
DEFAULT_MAX_TEMP = 30 DEFAULT_MAX_TEMP = 30
NA_THERM = "NATherm1" NA_THERM = "NATherm1"
NA_VALVE = "NRV" NA_VALVE = "NRV"
SCHEMA_SERVICE_SETSCHEDULE = vol.Schema(
{
vol.Required(ATTR_SCHEDULE_NAME): cv.string,
vol.Required(ATTR_HOME_NAME): cv.string,
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the NetAtmo Thermostat."""
homes_conf = config.get(CONF_HOMES)
auth = hass.data[DATA_NETATMO_AUTH] async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the Netatmo energy platform."""
auth = hass.data[DOMAIN][entry.entry_id][AUTH]
home_data = HomeData(auth) home_data = HomeData(auth)
try:
home_data.setup()
except pyatmo.NoDevice:
return
home_ids = [] def get_entities():
rooms = {} """Retrieve Netatmo entities."""
if homes_conf is not None: entities = []
for home_conf in homes_conf:
home = home_conf[CONF_NAME]
home_id = home_data.homedata.gethomeId(home)
if home_conf[CONF_ROOMS] != []:
rooms[home_id] = home_conf[CONF_ROOMS]
home_ids.append(home_id)
else:
home_ids = home_data.get_home_ids()
devices = []
for home_id in home_ids:
_LOGGER.debug("Setting up %s ...", home_id)
try: try:
room_data = ThermostatData(auth, home_id) home_data.setup()
except pyatmo.NoDevice: except pyatmo.NoDevice:
continue return
for room_id in room_data.get_room_ids(): home_ids = home_data.get_all_home_ids()
room_name = room_data.homedata.rooms[home_id][room_id]["name"]
_LOGGER.debug("Setting up %s (%s) ...", room_name, room_id) for home_id in home_ids:
if home_id in rooms and room_name not in rooms[home_id]: _LOGGER.debug("Setting up home %s ...", home_id)
_LOGGER.debug("Excluding %s ...", room_name) try:
room_data = ThermostatData(auth, home_id)
except pyatmo.NoDevice:
continue continue
_LOGGER.debug("Adding devices for room %s (%s) ...", room_name, room_id) for room_id in room_data.get_room_ids():
devices.append(NetatmoThermostat(room_data, room_id)) room_name = room_data.homedata.rooms[home_id][room_id]["name"]
add_entities(devices, True) _LOGGER.debug("Setting up room %s (%s) ...", room_name, room_id)
entities.append(NetatmoThermostat(room_data, room_id))
return entities
async_add_entities(await hass.async_add_executor_job(get_entities), True)
def _service_setschedule(service):
"""Service to change current home schedule."""
home_name = service.data.get(ATTR_HOME_NAME)
schedule_name = service.data.get(ATTR_SCHEDULE_NAME)
home_data.homedata.switchHomeSchedule(schedule=schedule_name, home=home_name)
_LOGGER.info("Set home (%s) schedule to %s", home_name, schedule_name)
if home_data.homedata is not None:
hass.services.async_register(
DOMAIN,
SERVICE_SETSCHEDULE,
_service_setschedule,
schema=SCHEMA_SERVICE_SETSCHEDULE,
)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Netatmo energy sensors."""
return
class NetatmoThermostat(ClimateDevice): class NetatmoThermostat(ClimateDevice):
@ -153,7 +163,7 @@ class NetatmoThermostat(ClimateDevice):
self._state = None self._state = None
self._room_id = room_id self._room_id = room_id
self._room_name = self._data.homedata.rooms[self._data.home_id][room_id]["name"] self._room_name = self._data.homedata.rooms[self._data.home_id][room_id]["name"]
self._name = f"netatmo_{self._room_name}" self._name = f"{MANUFACTURER} {self._room_name}"
self._current_temperature = None self._current_temperature = None
self._target_temperature = None self._target_temperature = None
self._preset = None self._preset = None
@ -168,6 +178,23 @@ class NetatmoThermostat(ClimateDevice):
if self._module_type == NA_THERM: if self._module_type == NA_THERM:
self._operation_list.append(HVAC_MODE_OFF) self._operation_list.append(HVAC_MODE_OFF)
self._unique_id = f"{self._room_id}-{self._module_type}"
@property
def device_info(self):
"""Return the device info for the thermostat/valve."""
return {
"identifiers": {(DOMAIN, self._room_id)},
"name": self._room_name,
"manufacturer": MANUFACTURER,
"model": self._module_type,
}
@property
def unique_id(self):
"""Return a unique ID."""
return self._unique_id
@property @property
def supported_features(self): def supported_features(self):
"""Return the list of supported features.""" """Return the list of supported features."""
@ -330,7 +357,7 @@ class NetatmoThermostat(ClimateDevice):
except KeyError as err: except KeyError as err:
_LOGGER.error( _LOGGER.error(
"The thermostat in room %s seems to be out of reach. (%s)", "The thermostat in room %s seems to be out of reach. (%s)",
self._room_id, self._room_name,
err, err,
) )
self._away = self._hvac_mode == HVAC_MAP_NETATMO[STATE_NETATMO_AWAY] self._away = self._hvac_mode == HVAC_MAP_NETATMO[STATE_NETATMO_AWAY]
@ -350,7 +377,7 @@ class HomeData:
self.home = home self.home = home
self.home_id = None self.home_id = None
def get_home_ids(self): def get_all_home_ids(self):
"""Get all the home ids returned by NetAtmo API.""" """Get all the home ids returned by NetAtmo API."""
if self.homedata is None: if self.homedata is None:
return [] return []
@ -426,8 +453,6 @@ class ThermostatData:
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
_LOGGER.warning("Timed out when connecting to Netatmo server") _LOGGER.warning("Timed out when connecting to Netatmo server")
return return
_LOGGER.debug("Following is the debugging output for homestatus:")
_LOGGER.debug(self.homestatus.rawData)
for room in self.homestatus.rooms: for room in self.homestatus.rooms:
try: try:
roomstatus = {} roomstatus = {}

View file

@ -0,0 +1,56 @@
"""Config flow for Netatmo."""
import logging
from homeassistant import config_entries
from homeassistant.helpers import config_entry_oauth2_flow
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class NetatmoFlowHandler(
config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
):
"""Config flow to handle Netatmo OAuth2 authentication."""
DOMAIN = DOMAIN
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
@property
def logger(self) -> logging.Logger:
"""Return logger."""
return logging.getLogger(__name__)
@property
def extra_authorize_data(self) -> dict:
"""Extra data that needs to be appended to the authorize url."""
return {
"scope": (
" ".join(
[
"read_station",
"read_camera",
"access_camera",
"write_camera",
"read_presence",
"access_presence",
"read_homecoach",
"read_smokedetector",
"read_thermostat",
"write_thermostat",
]
)
)
}
async def async_step_user(self, user_input=None):
"""Handle a flow start."""
if self.hass.config_entries.async_entries(DOMAIN):
return self.async_abort(reason="already_setup")
return await super().async_step_user(user_input)
async def async_step_homekit(self, homekit_info):
"""Handle HomeKit discovery."""
return await self.async_step_user()

View file

@ -1,5 +1,57 @@
"""Constants used by the Netatmo component.""" """Constants used by the Netatmo component."""
DOMAIN = "netatmo" from datetime import timedelta
DATA_NETATMO = "netatmo" API = "api"
DATA_NETATMO_AUTH = "netatmo_auth"
DOMAIN = "netatmo"
MANUFACTURER = "Netatmo"
AUTH = "netatmo_auth"
CONF_PUBLIC = "public_sensor_config"
CAMERA_DATA = "netatmo_camera"
HOME_DATA = "netatmo_home_data"
OAUTH2_AUTHORIZE = "https://api.netatmo.com/oauth2/authorize"
OAUTH2_TOKEN = "https://api.netatmo.com/oauth2/token"
DATA_PERSONS = "netatmo_persons"
NETATMO_WEBHOOK_URL = None
DEFAULT_PERSON = "Unknown"
DEFAULT_DISCOVERY = True
DEFAULT_WEBHOOKS = False
EVENT_PERSON = "person"
EVENT_MOVEMENT = "movement"
EVENT_HUMAN = "human"
EVENT_ANIMAL = "animal"
EVENT_VEHICLE = "vehicle"
EVENT_BUS_PERSON = "netatmo_person"
EVENT_BUS_MOVEMENT = "netatmo_movement"
EVENT_BUS_HUMAN = "netatmo_human"
EVENT_BUS_ANIMAL = "netatmo_animal"
EVENT_BUS_VEHICLE = "netatmo_vehicle"
EVENT_BUS_OTHER = "netatmo_other"
ATTR_ID = "id"
ATTR_PSEUDO = "pseudo"
ATTR_NAME = "name"
ATTR_EVENT_TYPE = "event_type"
ATTR_MESSAGE = "message"
ATTR_CAMERA_ID = "camera_id"
ATTR_HOME_ID = "home_id"
ATTR_HOME_NAME = "home_name"
ATTR_PERSONS = "persons"
ATTR_IS_KNOWN = "is_known"
ATTR_FACE_URL = "face_url"
ATTR_SNAPSHOT_URL = "snapshot_url"
ATTR_VIGNETTE_URL = "vignette_url"
ATTR_SCHEDULE_ID = "schedule_id"
ATTR_SCHEDULE_NAME = "schedule_name"
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
MIN_TIME_BETWEEN_EVENT_UPDATES = timedelta(seconds=5)
SERVICE_SETSCHEDULE = "set_schedule"

View file

@ -2,7 +2,21 @@
"domain": "netatmo", "domain": "netatmo",
"name": "Netatmo", "name": "Netatmo",
"documentation": "https://www.home-assistant.io/integrations/netatmo", "documentation": "https://www.home-assistant.io/integrations/netatmo",
"requirements": ["pyatmo==3.1.0"], "requirements": [
"dependencies": ["webhook"], "pyatmo==3.2.0"
"codeowners": [] ],
} "dependencies": [
"webhook"
],
"codeowners": [
"@cgtobi"
],
"config_flow": true,
"homekit": {
"models": [
"Netatmo Relay",
"Presence",
"Welcome"
]
}
}

View file

@ -1,29 +1,20 @@
"""Support for the Netatmo Weather Service.""" """Support for the Netatmo Weather Service."""
from datetime import timedelta from datetime import timedelta
import logging import logging
import threading
from time import time from time import time
import pyatmo import pyatmo
import requests
import urllib3
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import ( from homeassistant.const import (
CONF_MODE,
CONF_NAME,
DEVICE_CLASS_BATTERY, DEVICE_CLASS_BATTERY,
DEVICE_CLASS_HUMIDITY, DEVICE_CLASS_HUMIDITY,
DEVICE_CLASS_TEMPERATURE, DEVICE_CLASS_TEMPERATURE,
TEMP_CELSIUS, TEMP_CELSIUS,
) )
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import call_later
from homeassistant.util import Throttle from homeassistant.util import Throttle
from .const import DATA_NETATMO_AUTH, DOMAIN from .const import AUTH, DOMAIN, MANUFACTURER
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -38,13 +29,11 @@ CONF_LON_SW = "lon_sw"
DEFAULT_MODE = "avg" DEFAULT_MODE = "avg"
MODE_TYPES = {"max", "avg"} MODE_TYPES = {"max", "avg"}
DEFAULT_NAME_PUBLIC = "Netatmo Public Data"
# This is the Netatmo data upload interval in seconds # This is the Netatmo data upload interval in seconds
NETATMO_UPDATE_INTERVAL = 600 NETATMO_UPDATE_INTERVAL = 600
# NetAtmo Public Data is uploaded to server every 10 minutes # NetAtmo Public Data is uploaded to server every 10 minutes
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=600) MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=NETATMO_UPDATE_INTERVAL)
SUPPORTED_PUBLIC_SENSOR_TYPES = [ SUPPORTED_PUBLIC_SENSOR_TYPES = [
"temperature", "temperature",
@ -90,26 +79,6 @@ SENSOR_TYPES = {
"health_idx": ["Health", "", "mdi:cloud", None], "health_idx": ["Health", "", "mdi:cloud", None],
} }
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_STATION): cv.string,
vol.Optional(CONF_MODULES): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_AREAS): vol.All(
cv.ensure_list,
[
{
vol.Required(CONF_LAT_NE): cv.latitude,
vol.Required(CONF_LAT_SW): cv.latitude,
vol.Required(CONF_LON_NE): cv.longitude,
vol.Required(CONF_LON_SW): cv.longitude,
vol.Optional(CONF_MODE, default=DEFAULT_MODE): vol.In(MODE_TYPES),
vol.Optional(CONF_NAME, default=DEFAULT_NAME_PUBLIC): cv.string,
}
],
),
}
)
MODULE_TYPE_OUTDOOR = "NAModule1" MODULE_TYPE_OUTDOOR = "NAModule1"
MODULE_TYPE_WIND = "NAModule2" MODULE_TYPE_WIND = "NAModule2"
MODULE_TYPE_RAIN = "NAModule3" MODULE_TYPE_RAIN = "NAModule3"
@ -122,75 +91,47 @@ NETATMO_DEVICE_TYPES = {
} }
def setup_platform(hass, config, add_entities, discovery_info=None): async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the available Netatmo weather sensors.""" """Set up the Netatmo weather and homecoach platform."""
dev = [] auth = hass.data[DOMAIN][entry.entry_id][AUTH]
auth = hass.data[DATA_NETATMO_AUTH]
if config.get(CONF_AREAS) is not None: def find_entities(data):
for area in config[CONF_AREAS]: """Find all entities."""
data = NetatmoPublicData( all_module_infos = data.get_module_infos()
auth, entities = []
lat_ne=area[CONF_LAT_NE], for module in all_module_infos.values():
lon_ne=area[CONF_LON_NE], _LOGGER.debug("Adding module %s %s", module["module_name"], module["id"])
lat_sw=area[CONF_LAT_SW], for condition in data.station_data.monitoredConditions(
lon_sw=area[CONF_LON_SW], moduleId=module["id"]
) ):
for sensor_type in SUPPORTED_PUBLIC_SENSOR_TYPES: entities.append(NetatmoSensor(data, module, condition.lower()))
dev.append( return entities
NetatmoPublicSensor(
area[CONF_NAME], data, sensor_type, area[CONF_MODE]
)
)
else:
def find_devices(data): def get_entities():
"""Find all devices.""" """Retrieve Netatmo entities."""
all_module_infos = data.get_module_infos() entities = []
all_module_names = [e["module_name"] for e in all_module_infos.values()]
module_names = config.get(CONF_MODULES, all_module_names)
entities = []
for module_name in module_names:
if module_name not in all_module_names:
_LOGGER.info("Module %s not found", module_name)
for module in all_module_infos.values():
if module["module_name"] not in module_names:
continue
_LOGGER.debug(
"Adding module %s %s", module["module_name"], module["id"]
)
for condition in data.station_data.monitoredConditions(
moduleId=module["id"]
):
entities.append(NetatmoSensor(data, module, condition.lower()))
return entities
def _retry(_data):
try:
entities = find_devices(_data)
except requests.exceptions.Timeout:
return call_later(
hass, NETATMO_UPDATE_INTERVAL, lambda _: _retry(_data)
)
if entities:
add_entities(entities, True)
for data_class in [pyatmo.WeatherStationData, pyatmo.HomeCoachData]: for data_class in [pyatmo.WeatherStationData, pyatmo.HomeCoachData]:
try: try:
data = NetatmoData(auth, data_class, config.get(CONF_STATION)) dc_data = data_class(auth)
_LOGGER.debug("%s detected!", NETATMO_DEVICE_TYPES[data_class.__name__])
data = NetatmoData(auth, dc_data)
except pyatmo.NoDevice: except pyatmo.NoDevice:
_LOGGER.info( _LOGGER.debug(
"No %s devices found", NETATMO_DEVICE_TYPES[data_class.__name__] "No %s entities found", NETATMO_DEVICE_TYPES[data_class.__name__]
) )
continue continue
try: entities.extend(find_entities(data))
dev.extend(find_devices(data))
except requests.exceptions.Timeout:
call_later(hass, NETATMO_UPDATE_INTERVAL, lambda _: _retry(data))
if dev: return entities
add_entities(dev, True)
async_add_entities(await hass.async_add_executor_job(get_entities), True)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Netatmo weather and homecoach platform."""
return
class NetatmoSensor(Entity): class NetatmoSensor(Entity):
@ -212,7 +153,7 @@ class NetatmoSensor(Entity):
f"{module_info['station_name']} {module_info['module_name']}" f"{module_info['station_name']} {module_info['module_name']}"
) )
self._name = f"{DOMAIN} {self.module_name} {SENSOR_TYPES[sensor_type][0]}" self._name = f"{MANUFACTURER} {self.module_name} {SENSOR_TYPES[sensor_type][0]}"
self.type = sensor_type self.type = sensor_type
self._state = None self._state = None
self._device_class = SENSOR_TYPES[self.type][3] self._device_class = SENSOR_TYPES[self.type][3]
@ -237,6 +178,16 @@ class NetatmoSensor(Entity):
"""Return the device class of the sensor.""" """Return the device class of the sensor."""
return self._device_class return self._device_class
@property
def device_info(self):
"""Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._module_id)},
"name": self.module_name,
"manufacturer": MANUFACTURER,
"model": self._module_type,
}
@property @property
def state(self): def state(self):
"""Return the state of the device.""" """Return the state of the device."""
@ -258,14 +209,15 @@ class NetatmoSensor(Entity):
if self.netatmo_data.data is None: if self.netatmo_data.data is None:
if self._state is None: if self._state is None:
return return
_LOGGER.warning("No data found for %s", self.module_name) _LOGGER.warning("No data from update")
self._state = None self._state = None
return return
data = self.netatmo_data.data.get(self._module_id) data = self.netatmo_data.data.get(self._module_id)
if data is None: if data is None:
_LOGGER.warning("No data found for %s", self.module_name) _LOGGER.info("No data found for %s (%s)", self.module_name, self._module_id)
_LOGGER.error("data: %s", self.netatmo_data.data)
self._state = None self._state = None
return return
@ -420,7 +372,7 @@ class NetatmoSensor(Entity):
elif data["health_idx"] == 4: elif data["health_idx"] == 4:
self._state = "Unhealthy" self._state = "Unhealthy"
except KeyError: except KeyError:
_LOGGER.error("No %s data found for %s", self.type, self.module_name) _LOGGER.info("No %s data found for %s", self.type, self.module_name)
self._state = None self._state = None
return return
@ -433,7 +385,7 @@ class NetatmoPublicSensor(Entity):
self.netatmo_data = data self.netatmo_data = data
self.type = sensor_type self.type = sensor_type
self._mode = mode self._mode = mode
self._name = "{} {}".format(area_name, SENSOR_TYPES[self.type][0]) self._name = f"{MANUFACTURER} {area_name} {SENSOR_TYPES[self.type][0]}"
self._area_name = area_name self._area_name = area_name
self._state = None self._state = None
self._device_class = SENSOR_TYPES[self.type][3] self._device_class = SENSOR_TYPES[self.type][3]
@ -455,6 +407,16 @@ class NetatmoPublicSensor(Entity):
"""Return the device class of the sensor.""" """Return the device class of the sensor."""
return self._device_class return self._device_class
@property
def device_info(self):
"""Return the device info for the sensor."""
return {
"identifiers": {(DOMAIN, self._area_name)},
"name": self._area_name,
"manufacturer": MANUFACTURER,
"model": "public",
}
@property @property
def state(self): def state(self):
"""Return the state of the device.""" """Return the state of the device."""
@ -470,7 +432,7 @@ class NetatmoPublicSensor(Entity):
self.netatmo_data.update() self.netatmo_data.update()
if self.netatmo_data.data is None: if self.netatmo_data.data is None:
_LOGGER.warning("No data found for %s", self._name) _LOGGER.info("No data found for %s", self._name)
self._state = None self._state = None
return return
@ -522,14 +484,21 @@ class NetatmoPublicData:
@Throttle(MIN_TIME_BETWEEN_UPDATES) @Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self): def update(self):
"""Request an update from the Netatmo API.""" """Request an update from the Netatmo API."""
data = pyatmo.PublicData( try:
self.auth, data = pyatmo.PublicData(
LAT_NE=self.lat_ne, self.auth,
LON_NE=self.lon_ne, LAT_NE=self.lat_ne,
LAT_SW=self.lat_sw, LON_NE=self.lon_ne,
LON_SW=self.lon_sw, LAT_SW=self.lat_sw,
filtering=True, LON_SW=self.lon_sw,
) filtering=True,
)
except pyatmo.NoDevice:
data = None
if not data:
_LOGGER.debug("No data received when updating public station data")
return
if data.CountStationInArea() == 0: if data.CountStationInArea() == 0:
_LOGGER.warning("No Stations available in this area.") _LOGGER.warning("No Stations available in this area.")
@ -541,83 +510,24 @@ class NetatmoPublicData:
class NetatmoData: class NetatmoData:
"""Get the latest data from Netatmo.""" """Get the latest data from Netatmo."""
def __init__(self, auth, data_class, station): def __init__(self, auth, station_data):
"""Initialize the data object.""" """Initialize the data object."""
self.auth = auth
self.data_class = data_class
self.data = {} self.data = {}
self.station_data = self.data_class(self.auth) self.station_data = station_data
self.station = station
self.station_id = None
if station:
station_data = self.station_data.stationByName(self.station)
if station_data:
self.station_id = station_data.get("_id")
self._next_update = time() self._next_update = time()
self._update_in_progress = threading.Lock() self.auth = auth
def get_module_infos(self): def get_module_infos(self):
"""Return all modules available on the API as a dict.""" """Return all modules available on the API as a dict."""
if self.station_id is not None:
return self.station_data.getModules(station_id=self.station_id)
return self.station_data.getModules() return self.station_data.getModules()
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self): def update(self):
"""Call the Netatmo API to update the data. """Call the Netatmo API to update the data."""
self.station_data = self.station_data.__class__(self.auth)
This method is not throttled by the builtin Throttle decorator data = self.station_data.lastData(exclude=3600, byId=True)
but with a custom logic, which takes into account the time if not data:
of the last update from the cloud. _LOGGER.debug("No data received when updating station data")
"""
if time() < self._next_update or not self._update_in_progress.acquire(False):
return return
try: self.data = data
try:
self.station_data = self.data_class(self.auth)
_LOGGER.debug("%s detected!", str(self.data_class.__name__))
except pyatmo.NoDevice:
_LOGGER.warning(
"No Weather or HomeCoach devices found for %s", str(self.station)
)
return
except (requests.exceptions.Timeout, urllib3.exceptions.ReadTimeoutError):
_LOGGER.warning("Timed out when connecting to Netatmo server.")
return
data = self.station_data.lastData(
station=self.station_id, exclude=3600, byId=True
)
if not data:
self._next_update = time() + NETATMO_UPDATE_INTERVAL
return
self.data = data
newinterval = 0
try:
for module in self.data:
if "When" in self.data[module]:
newinterval = self.data[module]["When"]
break
except TypeError:
_LOGGER.debug("No %s modules found", self.data_class.__name__)
if newinterval:
# Try and estimate when fresh data will be available
newinterval += NETATMO_UPDATE_INTERVAL - time()
if newinterval > NETATMO_UPDATE_INTERVAL - 30:
newinterval = NETATMO_UPDATE_INTERVAL
else:
if newinterval < NETATMO_UPDATE_INTERVAL / 2:
# Never hammer the Netatmo API more than
# twice per update interval
newinterval = NETATMO_UPDATE_INTERVAL / 2
_LOGGER.info(
"Netatmo refresh interval reset to %d seconds", newinterval
)
else:
# Last update time not found, fall back to default value
newinterval = NETATMO_UPDATE_INTERVAL
self._next_update = time() + newinterval
finally:
self._update_in_progress.release()

View file

@ -1,37 +1,10 @@
addwebhook: # Describes the format for available Netatmo services
description: Add webhook during runtime (e.g. if it has been banned).
fields:
url:
description: URL for which to add the webhook.
example: https://yourdomain.com:443/api/webhook/webhook_id
dropwebhook:
description: Drop active webhooks.
set_light_auto:
description: Set the camera (Presence only) light in automatic mode.
fields:
entity_id:
description: Entity id.
example: 'camera.living_room'
set_light_on:
description: Set the camera (Netatmo Presence only) light on.
fields:
entity_id:
description: Entity id.
example: 'camera.living_room'
set_light_off:
description: Set the camera (Netatmo Presence only) light off.
fields:
entity_id:
description: Entity id.
example: 'camera.living_room'
set_schedule: set_schedule:
description: Set the home heating schedule description: Set the heating schedule.
fields: fields:
schedule: schedule_name:
description: Schedule name description: Schedule name.
example: Standard example: Standard
home_name:
description: Home name.
example: MyHome

View file

@ -0,0 +1,18 @@
{
"config": {
"title": "Netatmo",
"step": {
"pick_implementation": {
"title": "Pick Authentication Method"
}
},
"abort": {
"already_setup": "You can only configure one Netatmo account.",
"authorize_url_timeout": "Timeout generating authorize url.",
"missing_configuration": "The Netatmo component is not configured. Please follow the documentation."
},
"create_entry": {
"default": "Successfully authenticated with Netatmo."
}
}
}

View file

@ -57,6 +57,7 @@ FLOWS = [
"mqtt", "mqtt",
"neato", "neato",
"nest", "nest",
"netatmo",
"notion", "notion",
"opentherm_gw", "opentherm_gw",
"openuv", "openuv",

View file

@ -32,6 +32,9 @@ ZEROCONF = {
HOMEKIT = { HOMEKIT = {
"BSB002": "hue", "BSB002": "hue",
"LIFX": "lifx", "LIFX": "lifx",
"Netatmo Relay": "netatmo",
"Presence": "netatmo",
"TRADFRI": "tradfri", "TRADFRI": "tradfri",
"Welcome": "netatmo",
"Wemo": "wemo" "Wemo": "wemo"
} }

View file

@ -1140,7 +1140,7 @@ pyalmond==0.0.2
pyarlo==0.2.3 pyarlo==0.2.3
# homeassistant.components.netatmo # homeassistant.components.netatmo
pyatmo==3.1.0 pyatmo==3.2.0
# homeassistant.components.atome # homeassistant.components.atome
pyatome==0.1.1 pyatome==0.1.1

View file

@ -398,6 +398,9 @@ pyalmond==0.0.2
# homeassistant.components.arlo # homeassistant.components.arlo
pyarlo==0.2.3 pyarlo==0.2.3
# homeassistant.components.netatmo
pyatmo==3.2.0
# homeassistant.components.blackbird # homeassistant.components.blackbird
pyblackbird==0.5 pyblackbird==0.5

View file

@ -0,0 +1,93 @@
"""Test the Netatmo config flow."""
from homeassistant import config_entries, data_entry_flow, setup
from homeassistant.components.netatmo import config_flow
from homeassistant.components.netatmo.const import (
DOMAIN,
OAUTH2_AUTHORIZE,
OAUTH2_TOKEN,
)
from homeassistant.helpers import config_entry_oauth2_flow
from tests.common import MockConfigEntry
CLIENT_ID = "1234"
CLIENT_SECRET = "5678"
async def test_abort_if_existing_entry(hass):
"""Check flow abort when an entry already exist."""
MockConfigEntry(domain=DOMAIN).add_to_hass(hass)
flow = config_flow.NetatmoFlowHandler()
flow.hass = hass
result = await hass.config_entries.flow.async_init(
"netatmo", context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_setup"
result = await hass.config_entries.flow.async_init(
"netatmo",
context={"source": "homekit"},
data={"host": "0.0.0.0", "properties": {"id": "aa:bb:cc:dd:ee:ff"}},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_setup"
async def test_full_flow(hass, aiohttp_client, aioclient_mock):
"""Check full flow."""
assert await setup.async_setup_component(
hass,
"netatmo",
{
"netatmo": {"client_id": CLIENT_ID, "client_secret": CLIENT_SECRET},
"http": {"base_url": "https://example.com"},
},
)
result = await hass.config_entries.flow.async_init(
"netatmo", context={"source": config_entries.SOURCE_USER}
)
state = config_entry_oauth2_flow._encode_jwt(hass, {"flow_id": result["flow_id"]})
scope = "+".join(
[
"read_station",
"read_camera",
"access_camera",
"write_camera",
"read_presence",
"access_presence",
"read_homecoach",
"read_smokedetector",
"read_thermostat",
"write_thermostat",
]
)
assert result["url"] == (
f"{OAUTH2_AUTHORIZE}?response_type=code&client_id={CLIENT_ID}"
"&redirect_uri=https://example.com/auth/external/callback"
f"&state={state}&scope={scope}"
)
client = await aiohttp_client(hass.http.app)
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.headers["content-type"] == "text/html; charset=utf-8"
aioclient_mock.post(
OAUTH2_TOKEN,
json={
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
"expires_in": 60,
},
)
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert len(hass.config_entries.async_entries(DOMAIN)) == 1