* Remove unnecessary exception re-wraps * Preserve exception chains on re-raise We slap "from cause" to almost all possible cases here. In some cases it could conceivably be better to do "from None" if we really want to hide the cause. However those should be in the minority, and "from cause" should be an improvement over the corresponding raise without a "from" in all cases anyway. The only case where we raise from None here is in plex, where the exception for an original invalid SSL cert is not the root cause for failure to validate a newly fetched one. Follow local convention on exception variable names if there is a consistent one, otherwise `err` to match with majority of codebase. * Fix mistaken re-wrap in homematicip_cloud/hap.py Missed the difference between HmipConnectionError and HmipcConnectionError. * Do not hide original error on plex new cert validation error Original is not the cause for the new one, but showing old in the traceback is useful nevertheless.
470 lines
15 KiB
Python
470 lines
15 KiB
Python
"""iCloud account."""
|
|
from datetime import timedelta
|
|
import logging
|
|
import operator
|
|
from typing import Dict
|
|
|
|
from pyicloud import PyiCloudService
|
|
from pyicloud.exceptions import (
|
|
PyiCloudFailedLoginException,
|
|
PyiCloudNoDevicesException,
|
|
PyiCloudServiceNotActivatedException,
|
|
)
|
|
from pyicloud.services.findmyiphone import AppleDevice
|
|
|
|
from homeassistant.components.zone import async_active_zone
|
|
from homeassistant.const import ATTR_ATTRIBUTION
|
|
from homeassistant.exceptions import ConfigEntryNotReady
|
|
from homeassistant.helpers.dispatcher import dispatcher_send
|
|
from homeassistant.helpers.event import track_point_in_utc_time
|
|
from homeassistant.helpers.storage import Store
|
|
from homeassistant.helpers.typing import HomeAssistantType
|
|
from homeassistant.util import slugify
|
|
from homeassistant.util.async_ import run_callback_threadsafe
|
|
from homeassistant.util.dt import utcnow
|
|
from homeassistant.util.location import distance
|
|
|
|
from .const import (
|
|
DEVICE_BATTERY_LEVEL,
|
|
DEVICE_BATTERY_STATUS,
|
|
DEVICE_CLASS,
|
|
DEVICE_DISPLAY_NAME,
|
|
DEVICE_ID,
|
|
DEVICE_LOCATION,
|
|
DEVICE_LOCATION_HORIZONTAL_ACCURACY,
|
|
DEVICE_LOCATION_LATITUDE,
|
|
DEVICE_LOCATION_LONGITUDE,
|
|
DEVICE_LOST_MODE_CAPABLE,
|
|
DEVICE_LOW_POWER_MODE,
|
|
DEVICE_NAME,
|
|
DEVICE_PERSON_ID,
|
|
DEVICE_RAW_DEVICE_MODEL,
|
|
DEVICE_STATUS,
|
|
DEVICE_STATUS_CODES,
|
|
DEVICE_STATUS_SET,
|
|
DOMAIN,
|
|
)
|
|
|
|
ATTRIBUTION = "Data provided by Apple iCloud"
|
|
|
|
# entity attributes
|
|
ATTR_ACCOUNT_FETCH_INTERVAL = "account_fetch_interval"
|
|
ATTR_BATTERY = "battery"
|
|
ATTR_BATTERY_STATUS = "battery_status"
|
|
ATTR_DEVICE_NAME = "device_name"
|
|
ATTR_DEVICE_STATUS = "device_status"
|
|
ATTR_LOW_POWER_MODE = "low_power_mode"
|
|
ATTR_OWNER_NAME = "owner_fullname"
|
|
|
|
# services
|
|
SERVICE_ICLOUD_PLAY_SOUND = "play_sound"
|
|
SERVICE_ICLOUD_DISPLAY_MESSAGE = "display_message"
|
|
SERVICE_ICLOUD_LOST_DEVICE = "lost_device"
|
|
SERVICE_ICLOUD_UPDATE = "update"
|
|
ATTR_ACCOUNT = "account"
|
|
ATTR_LOST_DEVICE_MESSAGE = "message"
|
|
ATTR_LOST_DEVICE_NUMBER = "number"
|
|
ATTR_LOST_DEVICE_SOUND = "sound"
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class IcloudAccount:
|
|
"""Representation of an iCloud account."""
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistantType,
|
|
username: str,
|
|
password: str,
|
|
icloud_dir: Store,
|
|
with_family: bool,
|
|
max_interval: int,
|
|
gps_accuracy_threshold: int,
|
|
):
|
|
"""Initialize an iCloud account."""
|
|
self.hass = hass
|
|
self._username = username
|
|
self._password = password
|
|
self._with_family = with_family
|
|
self._fetch_interval = max_interval
|
|
self._max_interval = max_interval
|
|
self._gps_accuracy_threshold = gps_accuracy_threshold
|
|
|
|
self._icloud_dir = icloud_dir
|
|
|
|
self.api: PyiCloudService = None
|
|
self._owner_fullname = None
|
|
self._family_members_fullname = {}
|
|
self._devices = {}
|
|
self._retried_fetch = False
|
|
|
|
self.listeners = []
|
|
|
|
def setup(self) -> None:
|
|
"""Set up an iCloud account."""
|
|
try:
|
|
self.api = PyiCloudService(
|
|
self._username,
|
|
self._password,
|
|
self._icloud_dir.path,
|
|
with_family=self._with_family,
|
|
)
|
|
except PyiCloudFailedLoginException as error:
|
|
self.api = None
|
|
_LOGGER.error("Error logging into iCloud Service: %s", error)
|
|
return
|
|
|
|
try:
|
|
api_devices = self.api.devices
|
|
# Gets device owners infos
|
|
user_info = api_devices.response["userInfo"]
|
|
except (
|
|
PyiCloudServiceNotActivatedException,
|
|
PyiCloudNoDevicesException,
|
|
) as err:
|
|
_LOGGER.error("No iCloud device found")
|
|
raise ConfigEntryNotReady from err
|
|
|
|
self._owner_fullname = f"{user_info['firstName']} {user_info['lastName']}"
|
|
|
|
self._family_members_fullname = {}
|
|
if user_info.get("membersInfo") is not None:
|
|
for prs_id, member in user_info["membersInfo"].items():
|
|
self._family_members_fullname[
|
|
prs_id
|
|
] = f"{member['firstName']} {member['lastName']}"
|
|
|
|
self._devices = {}
|
|
self.update_devices()
|
|
|
|
def update_devices(self) -> None:
|
|
"""Update iCloud devices."""
|
|
if self.api is None:
|
|
return
|
|
|
|
api_devices = {}
|
|
try:
|
|
api_devices = self.api.devices
|
|
except Exception as err: # pylint: disable=broad-except
|
|
_LOGGER.error("Unknown iCloud error: %s", err)
|
|
self._fetch_interval = 2
|
|
dispatcher_send(self.hass, self.signal_device_update)
|
|
track_point_in_utc_time(
|
|
self.hass,
|
|
self.keep_alive,
|
|
utcnow() + timedelta(minutes=self._fetch_interval),
|
|
)
|
|
return
|
|
|
|
# Gets devices infos
|
|
new_device = False
|
|
for device in api_devices:
|
|
status = device.status(DEVICE_STATUS_SET)
|
|
device_id = status[DEVICE_ID]
|
|
device_name = status[DEVICE_NAME]
|
|
|
|
if (
|
|
status[DEVICE_BATTERY_STATUS] == "Unknown"
|
|
or status.get(DEVICE_BATTERY_LEVEL) is None
|
|
):
|
|
continue
|
|
|
|
if self._devices.get(device_id) is not None:
|
|
# Seen device -> updating
|
|
_LOGGER.debug("Updating iCloud device: %s", device_name)
|
|
self._devices[device_id].update(status)
|
|
else:
|
|
# New device, should be unique
|
|
_LOGGER.debug(
|
|
"Adding iCloud device: %s [model: %s]",
|
|
device_name,
|
|
status[DEVICE_RAW_DEVICE_MODEL],
|
|
)
|
|
self._devices[device_id] = IcloudDevice(self, device, status)
|
|
self._devices[device_id].update(status)
|
|
new_device = True
|
|
|
|
if (
|
|
DEVICE_STATUS_CODES.get(list(api_devices)[0][DEVICE_STATUS]) == "pending"
|
|
and not self._retried_fetch
|
|
):
|
|
_LOGGER.debug("Pending devices, trying again in 15s")
|
|
self._fetch_interval = 0.25
|
|
self._retried_fetch = True
|
|
else:
|
|
self._fetch_interval = self._determine_interval()
|
|
self._retried_fetch = False
|
|
|
|
dispatcher_send(self.hass, self.signal_device_update)
|
|
if new_device:
|
|
dispatcher_send(self.hass, self.signal_device_new)
|
|
|
|
track_point_in_utc_time(
|
|
self.hass,
|
|
self.keep_alive,
|
|
utcnow() + timedelta(minutes=self._fetch_interval),
|
|
)
|
|
|
|
def _determine_interval(self) -> int:
|
|
"""Calculate new interval between two API fetch (in minutes)."""
|
|
intervals = {"default": self._max_interval}
|
|
for device in self._devices.values():
|
|
# Max interval if no location
|
|
if device.location is None:
|
|
continue
|
|
|
|
current_zone = run_callback_threadsafe(
|
|
self.hass.loop,
|
|
async_active_zone,
|
|
self.hass,
|
|
device.location[DEVICE_LOCATION_LATITUDE],
|
|
device.location[DEVICE_LOCATION_LONGITUDE],
|
|
device.location[DEVICE_LOCATION_HORIZONTAL_ACCURACY],
|
|
).result()
|
|
|
|
# Max interval if in zone
|
|
if current_zone is not None:
|
|
continue
|
|
|
|
zones = (
|
|
self.hass.states.get(entity_id)
|
|
for entity_id in sorted(self.hass.states.entity_ids("zone"))
|
|
)
|
|
|
|
distances = []
|
|
for zone_state in zones:
|
|
zone_state_lat = zone_state.attributes[DEVICE_LOCATION_LATITUDE]
|
|
zone_state_long = zone_state.attributes[DEVICE_LOCATION_LONGITUDE]
|
|
zone_distance = distance(
|
|
device.location[DEVICE_LOCATION_LATITUDE],
|
|
device.location[DEVICE_LOCATION_LONGITUDE],
|
|
zone_state_lat,
|
|
zone_state_long,
|
|
)
|
|
distances.append(round(zone_distance / 1000, 1))
|
|
|
|
# Max interval if no zone
|
|
if not distances:
|
|
continue
|
|
mindistance = min(distances)
|
|
|
|
# Calculate out how long it would take for the device to drive
|
|
# to the nearest zone at 120 km/h:
|
|
interval = round(mindistance / 2, 0)
|
|
|
|
# Never poll more than once per minute
|
|
interval = max(interval, 1)
|
|
|
|
if interval > 180:
|
|
# Three hour drive?
|
|
# This is far enough that they might be flying
|
|
interval = self._max_interval
|
|
|
|
if (
|
|
device.battery_level is not None
|
|
and device.battery_level <= 33
|
|
and mindistance > 3
|
|
):
|
|
# Low battery - let's check half as often
|
|
interval = interval * 2
|
|
|
|
intervals[device.name] = interval
|
|
|
|
return max(
|
|
int(min(intervals.items(), key=operator.itemgetter(1))[1]),
|
|
self._max_interval,
|
|
)
|
|
|
|
def keep_alive(self, now=None) -> None:
|
|
"""Keep the API alive."""
|
|
if self.api is None:
|
|
self.setup()
|
|
|
|
if self.api is None:
|
|
return
|
|
|
|
self.api.authenticate()
|
|
self.update_devices()
|
|
|
|
def get_devices_with_name(self, name: str) -> [any]:
|
|
"""Get devices by name."""
|
|
result = []
|
|
name_slug = slugify(name.replace(" ", "", 99))
|
|
for device in self.devices.values():
|
|
if slugify(device.name.replace(" ", "", 99)) == name_slug:
|
|
result.append(device)
|
|
if not result:
|
|
raise Exception(f"No device with name {name}")
|
|
return result
|
|
|
|
@property
|
|
def username(self) -> str:
|
|
"""Return the account username."""
|
|
return self._username
|
|
|
|
@property
|
|
def owner_fullname(self) -> str:
|
|
"""Return the account owner fullname."""
|
|
return self._owner_fullname
|
|
|
|
@property
|
|
def family_members_fullname(self) -> Dict[str, str]:
|
|
"""Return the account family members fullname."""
|
|
return self._family_members_fullname
|
|
|
|
@property
|
|
def fetch_interval(self) -> int:
|
|
"""Return the account fetch interval."""
|
|
return self._fetch_interval
|
|
|
|
@property
|
|
def devices(self) -> Dict[str, any]:
|
|
"""Return the account devices."""
|
|
return self._devices
|
|
|
|
@property
|
|
def signal_device_new(self) -> str:
|
|
"""Event specific per Freebox entry to signal new device."""
|
|
return f"{DOMAIN}-{self._username}-device-new"
|
|
|
|
@property
|
|
def signal_device_update(self) -> str:
|
|
"""Event specific per Freebox entry to signal updates in devices."""
|
|
return f"{DOMAIN}-{self._username}-device-update"
|
|
|
|
|
|
class IcloudDevice:
|
|
"""Representation of a iCloud device."""
|
|
|
|
def __init__(self, account: IcloudAccount, device: AppleDevice, status):
|
|
"""Initialize the iCloud device."""
|
|
self._account = account
|
|
|
|
self._device = device
|
|
self._status = status
|
|
|
|
self._name = self._status[DEVICE_NAME]
|
|
self._device_id = self._status[DEVICE_ID]
|
|
self._device_class = self._status[DEVICE_CLASS]
|
|
self._device_model = self._status[DEVICE_DISPLAY_NAME]
|
|
|
|
if self._status[DEVICE_PERSON_ID]:
|
|
owner_fullname = account.family_members_fullname[
|
|
self._status[DEVICE_PERSON_ID]
|
|
]
|
|
else:
|
|
owner_fullname = account.owner_fullname
|
|
|
|
self._battery_level = None
|
|
self._battery_status = None
|
|
self._location = None
|
|
|
|
self._attrs = {
|
|
ATTR_ATTRIBUTION: ATTRIBUTION,
|
|
ATTR_ACCOUNT_FETCH_INTERVAL: self._account.fetch_interval,
|
|
ATTR_DEVICE_NAME: self._device_model,
|
|
ATTR_DEVICE_STATUS: None,
|
|
ATTR_OWNER_NAME: owner_fullname,
|
|
}
|
|
|
|
def update(self, status) -> None:
|
|
"""Update the iCloud device."""
|
|
self._status = status
|
|
|
|
self._status[ATTR_ACCOUNT_FETCH_INTERVAL] = self._account.fetch_interval
|
|
|
|
device_status = DEVICE_STATUS_CODES.get(self._status[DEVICE_STATUS], "error")
|
|
self._attrs[ATTR_DEVICE_STATUS] = device_status
|
|
|
|
self._battery_status = self._status[DEVICE_BATTERY_STATUS]
|
|
self._attrs[ATTR_BATTERY_STATUS] = self._battery_status
|
|
device_battery_level = self._status.get(DEVICE_BATTERY_LEVEL, 0)
|
|
if self._battery_status != "Unknown" and device_battery_level is not None:
|
|
self._battery_level = int(device_battery_level * 100)
|
|
self._attrs[ATTR_BATTERY] = self._battery_level
|
|
self._attrs[ATTR_LOW_POWER_MODE] = self._status[DEVICE_LOW_POWER_MODE]
|
|
|
|
if (
|
|
self._status[DEVICE_LOCATION]
|
|
and self._status[DEVICE_LOCATION][DEVICE_LOCATION_LATITUDE]
|
|
):
|
|
location = self._status[DEVICE_LOCATION]
|
|
if self._location is None:
|
|
dispatcher_send(self._account.hass, self._account.signal_device_new)
|
|
self._location = location
|
|
|
|
def play_sound(self) -> None:
|
|
"""Play sound on the device."""
|
|
if self._account.api is None:
|
|
return
|
|
|
|
self._account.api.authenticate()
|
|
_LOGGER.debug("Playing sound for %s", self.name)
|
|
self.device.play_sound()
|
|
|
|
def display_message(self, message: str, sound: bool = False) -> None:
|
|
"""Display a message on the device."""
|
|
if self._account.api is None:
|
|
return
|
|
|
|
self._account.api.authenticate()
|
|
_LOGGER.debug("Displaying message for %s", self.name)
|
|
self.device.display_message("Subject not working", message, sound)
|
|
|
|
def lost_device(self, number: str, message: str) -> None:
|
|
"""Make the device in lost state."""
|
|
if self._account.api is None:
|
|
return
|
|
|
|
self._account.api.authenticate()
|
|
if self._status[DEVICE_LOST_MODE_CAPABLE]:
|
|
_LOGGER.debug("Make device lost for %s", self.name)
|
|
self.device.lost_device(number, message, None)
|
|
else:
|
|
_LOGGER.error("Cannot make device lost for %s", self.name)
|
|
|
|
@property
|
|
def unique_id(self) -> str:
|
|
"""Return a unique ID."""
|
|
return self._device_id
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
"""Return the Apple device name."""
|
|
return self._name
|
|
|
|
@property
|
|
def device(self) -> AppleDevice:
|
|
"""Return the Apple device."""
|
|
return self._device
|
|
|
|
@property
|
|
def device_class(self) -> str:
|
|
"""Return the Apple device class."""
|
|
return self._device_class
|
|
|
|
@property
|
|
def device_model(self) -> str:
|
|
"""Return the Apple device model."""
|
|
return self._device_model
|
|
|
|
@property
|
|
def battery_level(self) -> int:
|
|
"""Return the Apple device battery level."""
|
|
return self._battery_level
|
|
|
|
@property
|
|
def battery_status(self) -> str:
|
|
"""Return the Apple device battery status."""
|
|
return self._battery_status
|
|
|
|
@property
|
|
def location(self) -> Dict[str, any]:
|
|
"""Return the Apple device location."""
|
|
return self._location
|
|
|
|
@property
|
|
def state_attributes(self) -> Dict[str, any]:
|
|
"""Return the attributes."""
|
|
return self._attrs
|