hass-core/homeassistant/components/device_tracker/setup.py
Paulus Schoutsen e6d7f6ed71
Config entry device tracker (#24040)
* Move zone helpers to zone root

* Add config entry support to device tracker

* Convert Geofency

* Convert GPSLogger

* Track unsub per entry

* Convert locative

* Migrate OwnTracks

* Lint

* location -> latitude, longitude props

* Lint

* lint

* Fix test
2019-05-25 13:34:53 -07:00

184 lines
6 KiB
Python

"""Device tracker helpers."""
import asyncio
from typing import Dict, Any, Callable, Optional
from types import ModuleType
import attr
from homeassistant.core import callback
from homeassistant.setup import async_prepare_setup_platform
from homeassistant.helpers import config_per_platform
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util import dt as dt_util
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
)
from .const import (
DOMAIN,
PLATFORM_TYPE_LEGACY,
CONF_SCAN_INTERVAL,
SCAN_INTERVAL,
SOURCE_TYPE_ROUTER,
LOGGER,
)
@attr.s
class DeviceTrackerPlatform:
    """Class to hold platform information."""

    # Entry points whose presence on the platform module marks it as a
    # legacy platform. async_setup_legacy probes them in this same order.
    LEGACY_SETUP = (
        'async_get_scanner',
        'get_scanner',
        'async_setup_scanner',
        'setup_scanner',
    )

    # Name identifying the platform (used in log messages).
    name = attr.ib(type=str)
    # Module object that is probed for the setup entry points.
    platform = attr.ib(type=ModuleType)
    # Configuration passed on to the platform's entry point.
    config = attr.ib(type=Dict)

    @property
    def type(self):
        """Return platform type.

        The type is PLATFORM_TYPE_LEGACY when the platform module exposes
        at least one of the LEGACY_SETUP entry points, otherwise None.
        """
        for methods, platform_type in (
                (self.LEGACY_SETUP, PLATFORM_TYPE_LEGACY),
        ):
            if any(hasattr(self.platform, meth) for meth in methods):
                return platform_type

        return None

    async def async_setup_legacy(self, hass, tracker, discovery_info=None):
        """Set up a legacy platform.

        The first entry point found on the platform module is used, probed
        in this order: async_get_scanner, get_scanner, async_setup_scanner,
        setup_scanner. The two get_scanner variants return a scanner that is
        handed to async_setup_scanner_platform. The two setup_scanner
        variants receive the tracker's see callback and discovery_info and
        must return a truthy value to signal success.

        Exceptions raised during setup (including the HomeAssistantError
        raised when no entry point exists) are logged, not propagated.
        """
        # Log the platform name, not self.type: the type is the same
        # constant for every legacy platform and identifies nothing.
        LOGGER.info("Setting up %s.%s", DOMAIN, self.name)
        try:
            scanner = None
            setup = None
            if hasattr(self.platform, 'async_get_scanner'):
                scanner = await self.platform.async_get_scanner(
                    hass, {DOMAIN: self.config})
            elif hasattr(self.platform, 'get_scanner'):
                scanner = await hass.async_add_job(
                    self.platform.get_scanner, hass, {DOMAIN: self.config})
            elif hasattr(self.platform, 'async_setup_scanner'):
                setup = await self.platform.async_setup_scanner(
                    hass, self.config, tracker.async_see, discovery_info)
            elif hasattr(self.platform, 'setup_scanner'):
                setup = await hass.async_add_job(
                    self.platform.setup_scanner, hass, self.config,
                    tracker.see, discovery_info)
            else:
                raise HomeAssistantError(
                    "Invalid legacy device_tracker platform.")

            if scanner:
                async_setup_scanner_platform(
                    hass, self.config, scanner, tracker.async_see, self.type)
                return

            # Neither a scanner nor a truthy setup result: the platform
            # reported that it could not be set up.
            if not setup:
                LOGGER.error("Error setting up platform %s", self.name)

        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Error setting up platform %s", self.name)
async def async_extract_config(hass, config):
    """Build the platform objects for the config and return the legacy ones.

    One async_create_platform_type call is made per platform entry found
    under the device tracker domain, and all of them are awaited together.
    Entries for which that call returned None are skipped.

    Raises ValueError when a platform's type is not PLATFORM_TYPE_LEGACY.
    """
    pending = [
        async_create_platform_type(hass, config, p_type, p_config)
        for p_type, p_config in config_per_platform(config, DOMAIN)
    ]
    legacy_platforms = []

    for candidate in await asyncio.gather(*pending):
        if candidate is None:
            continue

        if candidate.type != PLATFORM_TYPE_LEGACY:
            raise ValueError("Unable to determine type for {}: {}".format(
                candidate.name, candidate.type))

        legacy_platforms.append(candidate)

    return legacy_platforms
async def async_create_platform_type(hass, config, p_type, p_config) \
        -> Optional[DeviceTrackerPlatform]:
    """Prepare a platform and wrap it in a DeviceTrackerPlatform.

    Returns None when async_prepare_setup_platform yields no platform.
    """
    module = await async_prepare_setup_platform(hass, config, DOMAIN, p_type)

    if module is not None:
        return DeviceTrackerPlatform(p_type, module, p_config)

    return None
@callback
def async_setup_scanner_platform(hass: HomeAssistantType, config: ConfigType,
                                 scanner: Any, async_see_device: Callable,
                                 platform: str):
    """Connect a scanner-based platform to the device tracker.

    Registers a recurring scan at the configured interval (falling back to
    SCAN_INTERVAL) and schedules one scan immediately. Each device reported
    by the scanner is forwarded to async_see_device.

    This method must be run in the event loop.
    """
    scan_interval = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
    scan_lock = asyncio.Lock()
    scanner.hass = hass

    # MACs that were already reported once. The host name is only looked
    # up the first time a MAC is found; later reports pass None.
    known_macs = set()  # type: Any

    async def async_device_tracker_scan(now: dt_util.dt.datetime):
        """Run one scan and report every device that was found."""
        # A held lock means the previous scan has not finished yet, so
        # this run is skipped instead of queued behind it.
        if scan_lock.locked():
            LOGGER.warning(
                "Updating device list from %s took longer than the scheduled "
                "scan interval %s", platform, scan_interval)
            return

        async with scan_lock:
            devices = await scanner.async_scan_devices()

        for mac in devices:
            host_name = None
            if mac not in known_macs:
                host_name = await scanner.async_get_device_name(mac)
                known_macs.add(mac)

            # Scanners without extra attributes signal that by raising
            # NotImplementedError.
            try:
                extra_attributes = \
                    await scanner.async_get_extra_attributes(mac)
            except NotImplementedError:
                extra_attributes = {}

            see_kwargs = {
                'mac': mac,
                'host_name': host_name,
                'source_type': SOURCE_TYPE_ROUTER,
                'attributes': {
                    'scanner': scanner.__class__.__name__,
                    **extra_attributes
                }
            }

            # When a home zone state exists, report the device at the
            # zone's coordinates with an accuracy of 0.
            home_zone = hass.states.get(hass.components.zone.ENTITY_ID_HOME)
            if home_zone:
                home_attrs = home_zone.attributes
                see_kwargs['gps'] = [home_attrs[ATTR_LATITUDE],
                                     home_attrs[ATTR_LONGITUDE]]
                see_kwargs['gps_accuracy'] = 0

            hass.async_create_task(async_see_device(**see_kwargs))

    async_track_time_interval(hass, async_device_tracker_scan, scan_interval)
    hass.async_create_task(async_device_tracker_scan(None))