Change line endings to LN (#7660)

This commit is contained in:
Fabian Affolter 2017-05-19 16:39:13 +02:00 committed by Paulus Schoutsen
parent d369d70ca5
commit b5c54864ac
6 changed files with 843 additions and 843 deletions

View file

@ -1,82 +1,82 @@
"""
Demo platform that has two fake binary sensors.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/demo/
"""
import homeassistant.util.dt as dt_util
from homeassistant.components.calendar import CalendarEventDevice
from homeassistant.components.google import CONF_DEVICE_ID, CONF_NAME
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Demo Calendar platform."""
calendar_data_future = DemoGoogleCalendarDataFuture()
calendar_data_current = DemoGoogleCalendarDataCurrent()
add_devices([
DemoGoogleCalendar(hass, calendar_data_future, {
CONF_NAME: 'Future Event',
CONF_DEVICE_ID: 'future_event',
}),
DemoGoogleCalendar(hass, calendar_data_current, {
CONF_NAME: 'Current Event',
CONF_DEVICE_ID: 'current_event',
}),
])
class DemoGoogleCalendarData(object):
"""Representation of a Demo Calendar element."""
# pylint: disable=no-self-use
def update(self):
"""Return true so entity knows we have new data."""
return True
class DemoGoogleCalendarDataFuture(DemoGoogleCalendarData):
"""Representation of a Demo Calendar for a future event."""
def __init__(self):
"""Set the event to a future event."""
one_hour_from_now = dt_util.now() \
+ dt_util.dt.timedelta(minutes=30)
self.event = {
'start': {
'dateTime': one_hour_from_now.isoformat()
},
'end': {
'dateTime': (one_hour_from_now + dt_util.dt.
timedelta(minutes=60)).isoformat()
},
'summary': 'Future Event',
}
class DemoGoogleCalendarDataCurrent(DemoGoogleCalendarData):
"""Representation of a Demo Calendar for a current event."""
def __init__(self):
"""Set the event data."""
middle_of_event = dt_util.now() \
- dt_util.dt.timedelta(minutes=30)
self.event = {
'start': {
'dateTime': middle_of_event.isoformat()
},
'end': {
'dateTime': (middle_of_event + dt_util.dt.
timedelta(minutes=60)).isoformat()
},
'summary': 'Current Event',
}
class DemoGoogleCalendar(CalendarEventDevice):
"""Representation of a Demo Calendar element."""
def __init__(self, hass, calendar_data, data):
"""Initialize Google Calendar but without the API calls."""
self.data = calendar_data
super().__init__(hass, data)
"""
Demo platform that has two fake binary sensors.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/demo/
"""
import homeassistant.util.dt as dt_util
from homeassistant.components.calendar import CalendarEventDevice
from homeassistant.components.google import CONF_DEVICE_ID, CONF_NAME
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Demo Calendar platform."""
calendar_data_future = DemoGoogleCalendarDataFuture()
calendar_data_current = DemoGoogleCalendarDataCurrent()
add_devices([
DemoGoogleCalendar(hass, calendar_data_future, {
CONF_NAME: 'Future Event',
CONF_DEVICE_ID: 'future_event',
}),
DemoGoogleCalendar(hass, calendar_data_current, {
CONF_NAME: 'Current Event',
CONF_DEVICE_ID: 'current_event',
}),
])
class DemoGoogleCalendarData(object):
"""Representation of a Demo Calendar element."""
# pylint: disable=no-self-use
def update(self):
"""Return true so entity knows we have new data."""
return True
class DemoGoogleCalendarDataFuture(DemoGoogleCalendarData):
"""Representation of a Demo Calendar for a future event."""
def __init__(self):
"""Set the event to a future event."""
one_hour_from_now = dt_util.now() \
+ dt_util.dt.timedelta(minutes=30)
self.event = {
'start': {
'dateTime': one_hour_from_now.isoformat()
},
'end': {
'dateTime': (one_hour_from_now + dt_util.dt.
timedelta(minutes=60)).isoformat()
},
'summary': 'Future Event',
}
class DemoGoogleCalendarDataCurrent(DemoGoogleCalendarData):
"""Representation of a Demo Calendar for a current event."""
def __init__(self):
"""Set the event data."""
middle_of_event = dt_util.now() \
- dt_util.dt.timedelta(minutes=30)
self.event = {
'start': {
'dateTime': middle_of_event.isoformat()
},
'end': {
'dateTime': (middle_of_event + dt_util.dt.
timedelta(minutes=60)).isoformat()
},
'summary': 'Current Event',
}
class DemoGoogleCalendar(CalendarEventDevice):
"""Representation of a Demo Calendar element."""
def __init__(self, hass, calendar_data, data):
"""Initialize Google Calendar but without the API calls."""
self.data = calendar_data
super().__init__(hass, data)

View file

@ -1,78 +1,78 @@
"""
Support for Google Calendar Search binary sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/binary_sensor.google_calendar/
"""
# pylint: disable=import-error
import logging
from datetime import timedelta
from homeassistant.components.calendar import CalendarEventDevice
from homeassistant.components.google import (
CONF_CAL_ID, CONF_ENTITIES, CONF_TRACK, TOKEN_FILE,
GoogleCalendarService)
from homeassistant.util import Throttle, dt
_LOGGER = logging.getLogger(__name__)
DEFAULT_GOOGLE_SEARCH_PARAMS = {
'orderBy': 'startTime',
'maxResults': 1,
'singleEvents': True,
}
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
def setup_platform(hass, config, add_devices, disc_info=None):
"""Set up the calendar platform for event devices."""
if disc_info is None:
return
if not any([data[CONF_TRACK] for data in disc_info[CONF_ENTITIES]]):
return
calendar_service = GoogleCalendarService(hass.config.path(TOKEN_FILE))
add_devices([GoogleCalendarEventDevice(hass, calendar_service,
disc_info[CONF_CAL_ID], data)
for data in disc_info[CONF_ENTITIES] if data[CONF_TRACK]])
# pylint: disable=too-many-instance-attributes
class GoogleCalendarEventDevice(CalendarEventDevice):
"""A calendar event device."""
def __init__(self, hass, calendar_service, calendar, data):
"""Create the Calendar event device."""
self.data = GoogleCalendarData(calendar_service, calendar,
data.get('search', None))
super().__init__(hass, data)
class GoogleCalendarData(object):
"""Class to utilize calendar service object to get next event."""
def __init__(self, calendar_service, calendar_id, search=None):
"""Set up how we are going to search the google calendar."""
self.calendar_service = calendar_service
self.calendar_id = calendar_id
self.search = search
self.event = None
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Get the latest data."""
service = self.calendar_service.get()
params = dict(DEFAULT_GOOGLE_SEARCH_PARAMS)
params['timeMin'] = dt.now().isoformat('T')
params['calendarId'] = self.calendar_id
if self.search:
params['q'] = self.search
events = service.events() # pylint: disable=no-member
result = events.list(**params).execute()
items = result.get('items', [])
self.event = items[0] if len(items) == 1 else None
return True
"""
Support for Google Calendar Search binary sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/binary_sensor.google_calendar/
"""
# pylint: disable=import-error
import logging
from datetime import timedelta
from homeassistant.components.calendar import CalendarEventDevice
from homeassistant.components.google import (
CONF_CAL_ID, CONF_ENTITIES, CONF_TRACK, TOKEN_FILE,
GoogleCalendarService)
from homeassistant.util import Throttle, dt
_LOGGER = logging.getLogger(__name__)
DEFAULT_GOOGLE_SEARCH_PARAMS = {
'orderBy': 'startTime',
'maxResults': 1,
'singleEvents': True,
}
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
def setup_platform(hass, config, add_devices, disc_info=None):
"""Set up the calendar platform for event devices."""
if disc_info is None:
return
if not any([data[CONF_TRACK] for data in disc_info[CONF_ENTITIES]]):
return
calendar_service = GoogleCalendarService(hass.config.path(TOKEN_FILE))
add_devices([GoogleCalendarEventDevice(hass, calendar_service,
disc_info[CONF_CAL_ID], data)
for data in disc_info[CONF_ENTITIES] if data[CONF_TRACK]])
# pylint: disable=too-many-instance-attributes
class GoogleCalendarEventDevice(CalendarEventDevice):
"""A calendar event device."""
def __init__(self, hass, calendar_service, calendar, data):
"""Create the Calendar event device."""
self.data = GoogleCalendarData(calendar_service, calendar,
data.get('search', None))
super().__init__(hass, data)
class GoogleCalendarData(object):
"""Class to utilize calendar service object to get next event."""
def __init__(self, calendar_service, calendar_id, search=None):
"""Set up how we are going to search the google calendar."""
self.calendar_service = calendar_service
self.calendar_id = calendar_id
self.search = search
self.event = None
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Get the latest data."""
service = self.calendar_service.get()
params = dict(DEFAULT_GOOGLE_SEARCH_PARAMS)
params['timeMin'] = dt.now().isoformat('T')
params['calendarId'] = self.calendar_id
if self.search:
params['q'] = self.search
events = service.events() # pylint: disable=no-member
result = events.list(**params).execute()
items = result.get('items', [])
self.event = items[0] if len(items) == 1 else None
return True

View file

@ -1,250 +1,250 @@
"""
Support for Synology Surveillance Station Cameras.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/camera.synology/
"""
import asyncio
import logging
import voluptuous as vol
import aiohttp
import async_timeout
from homeassistant.const import (
CONF_NAME, CONF_USERNAME, CONF_PASSWORD,
CONF_URL, CONF_WHITELIST, CONF_VERIFY_SSL, CONF_TIMEOUT)
from homeassistant.components.camera import (
Camera, PLATFORM_SCHEMA)
from homeassistant.helpers.aiohttp_client import (
async_get_clientsession, async_create_clientsession,
async_aiohttp_proxy_web)
import homeassistant.helpers.config_validation as cv
from homeassistant.util.async import run_coroutine_threadsafe
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'Synology Camera'
DEFAULT_STREAM_ID = '0'
DEFAULT_TIMEOUT = 5
CONF_CAMERA_NAME = 'camera_name'
CONF_STREAM_ID = 'stream_id'
QUERY_CGI = 'query.cgi'
QUERY_API = 'SYNO.API.Info'
AUTH_API = 'SYNO.API.Auth'
CAMERA_API = 'SYNO.SurveillanceStation.Camera'
STREAMING_API = 'SYNO.SurveillanceStation.VideoStream'
SESSION_ID = '0'
WEBAPI_PATH = '/webapi/'
AUTH_PATH = 'auth.cgi'
CAMERA_PATH = 'camera.cgi'
STREAMING_PATH = 'SurveillanceStation/videoStreaming.cgi'
CONTENT_TYPE_HEADER = 'Content-Type'
SYNO_API_URL = '{0}{1}{2}'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_URL): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_WHITELIST, default=[]): cv.ensure_list,
vol.Optional(CONF_VERIFY_SSL, default=True): cv.boolean,
})
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Set up a Synology IP Camera."""
verify_ssl = config.get(CONF_VERIFY_SSL)
timeout = config.get(CONF_TIMEOUT)
websession_init = async_get_clientsession(hass, verify_ssl)
# Determine API to use for authentication
syno_api_url = SYNO_API_URL.format(
config.get(CONF_URL), WEBAPI_PATH, QUERY_CGI)
query_payload = {
'api': QUERY_API,
'method': 'Query',
'version': '1',
'query': 'SYNO.'
}
try:
with async_timeout.timeout(timeout, loop=hass.loop):
query_req = yield from websession_init.get(
syno_api_url,
params=query_payload
)
# Skip content type check because Synology doesn't return JSON with
# right content type
query_resp = yield from query_req.json(content_type=None)
auth_path = query_resp['data'][AUTH_API]['path']
camera_api = query_resp['data'][CAMERA_API]['path']
camera_path = query_resp['data'][CAMERA_API]['path']
streaming_path = query_resp['data'][STREAMING_API]['path']
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.exception("Error on %s", syno_api_url)
return False
# Authticate to NAS to get a session id
syno_auth_url = SYNO_API_URL.format(
config.get(CONF_URL), WEBAPI_PATH, auth_path)
session_id = yield from get_session_id(
hass,
websession_init,
config.get(CONF_USERNAME),
config.get(CONF_PASSWORD),
syno_auth_url,
timeout
)
# init websession
websession = async_create_clientsession(
hass, verify_ssl, cookies={'id': session_id})
# Use SessionID to get cameras in system
syno_camera_url = SYNO_API_URL.format(
config.get(CONF_URL), WEBAPI_PATH, camera_api)
camera_payload = {
'api': CAMERA_API,
'method': 'List',
'version': '1'
}
try:
with async_timeout.timeout(timeout, loop=hass.loop):
camera_req = yield from websession.get(
syno_camera_url,
params=camera_payload
)
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.exception("Error on %s", syno_camera_url)
return False
camera_resp = yield from camera_req.json(content_type=None)
cameras = camera_resp['data']['cameras']
# add cameras
devices = []
for camera in cameras:
if not config.get(CONF_WHITELIST):
camera_id = camera['id']
snapshot_path = camera['snapshot_path']
device = SynologyCamera(
hass, websession, config, camera_id, camera['name'],
snapshot_path, streaming_path, camera_path, auth_path, timeout
)
devices.append(device)
async_add_devices(devices)
@asyncio.coroutine
def get_session_id(hass, websession, username, password, login_url, timeout):
"""Get a session id."""
auth_payload = {
'api': AUTH_API,
'method': 'Login',
'version': '2',
'account': username,
'passwd': password,
'session': 'SurveillanceStation',
'format': 'sid'
}
try:
with async_timeout.timeout(timeout, loop=hass.loop):
auth_req = yield from websession.get(
login_url,
params=auth_payload
)
auth_resp = yield from auth_req.json(content_type=None)
return auth_resp['data']['sid']
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.exception("Error on %s", login_url)
return False
class SynologyCamera(Camera):
"""An implementation of a Synology NAS based IP camera."""
def __init__(self, hass, websession, config, camera_id,
camera_name, snapshot_path, streaming_path, camera_path,
auth_path, timeout):
"""Initialize a Synology Surveillance Station camera."""
super().__init__()
self.hass = hass
self._websession = websession
self._name = camera_name
self._synology_url = config.get(CONF_URL)
self._camera_name = config.get(CONF_CAMERA_NAME)
self._stream_id = config.get(CONF_STREAM_ID)
self._camera_id = camera_id
self._snapshot_path = snapshot_path
self._streaming_path = streaming_path
self._camera_path = camera_path
self._auth_path = auth_path
self._timeout = timeout
def camera_image(self):
"""Return bytes of camera image."""
return run_coroutine_threadsafe(
self.async_camera_image(), self.hass.loop).result()
@asyncio.coroutine
def async_camera_image(self):
"""Return a still image response from the camera."""
image_url = SYNO_API_URL.format(
self._synology_url, WEBAPI_PATH, self._camera_path)
image_payload = {
'api': CAMERA_API,
'method': 'GetSnapshot',
'version': '1',
'cameraId': self._camera_id
}
try:
with async_timeout.timeout(self._timeout, loop=self.hass.loop):
response = yield from self._websession.get(
image_url,
params=image_payload
)
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.error("Error fetching %s", image_url)
return None
image = yield from response.read()
return image
@asyncio.coroutine
def handle_async_mjpeg_stream(self, request):
"""Return a MJPEG stream image response directly from the camera."""
streaming_url = SYNO_API_URL.format(
self._synology_url, WEBAPI_PATH, self._streaming_path)
streaming_payload = {
'api': STREAMING_API,
'method': 'Stream',
'version': '1',
'cameraId': self._camera_id,
'format': 'mjpeg'
}
stream_coro = self._websession.get(
streaming_url, params=streaming_payload)
yield from async_aiohttp_proxy_web(self.hass, request, stream_coro)
@property
def name(self):
"""Return the name of this device."""
return self._name
"""
Support for Synology Surveillance Station Cameras.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/camera.synology/
"""
import asyncio
import logging
import voluptuous as vol
import aiohttp
import async_timeout
from homeassistant.const import (
CONF_NAME, CONF_USERNAME, CONF_PASSWORD,
CONF_URL, CONF_WHITELIST, CONF_VERIFY_SSL, CONF_TIMEOUT)
from homeassistant.components.camera import (
Camera, PLATFORM_SCHEMA)
from homeassistant.helpers.aiohttp_client import (
async_get_clientsession, async_create_clientsession,
async_aiohttp_proxy_web)
import homeassistant.helpers.config_validation as cv
from homeassistant.util.async import run_coroutine_threadsafe
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'Synology Camera'
DEFAULT_STREAM_ID = '0'
DEFAULT_TIMEOUT = 5
CONF_CAMERA_NAME = 'camera_name'
CONF_STREAM_ID = 'stream_id'
QUERY_CGI = 'query.cgi'
QUERY_API = 'SYNO.API.Info'
AUTH_API = 'SYNO.API.Auth'
CAMERA_API = 'SYNO.SurveillanceStation.Camera'
STREAMING_API = 'SYNO.SurveillanceStation.VideoStream'
SESSION_ID = '0'
WEBAPI_PATH = '/webapi/'
AUTH_PATH = 'auth.cgi'
CAMERA_PATH = 'camera.cgi'
STREAMING_PATH = 'SurveillanceStation/videoStreaming.cgi'
CONTENT_TYPE_HEADER = 'Content-Type'
SYNO_API_URL = '{0}{1}{2}'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_URL): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_WHITELIST, default=[]): cv.ensure_list,
vol.Optional(CONF_VERIFY_SSL, default=True): cv.boolean,
})
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Set up a Synology IP Camera."""
verify_ssl = config.get(CONF_VERIFY_SSL)
timeout = config.get(CONF_TIMEOUT)
websession_init = async_get_clientsession(hass, verify_ssl)
# Determine API to use for authentication
syno_api_url = SYNO_API_URL.format(
config.get(CONF_URL), WEBAPI_PATH, QUERY_CGI)
query_payload = {
'api': QUERY_API,
'method': 'Query',
'version': '1',
'query': 'SYNO.'
}
try:
with async_timeout.timeout(timeout, loop=hass.loop):
query_req = yield from websession_init.get(
syno_api_url,
params=query_payload
)
# Skip content type check because Synology doesn't return JSON with
# right content type
query_resp = yield from query_req.json(content_type=None)
auth_path = query_resp['data'][AUTH_API]['path']
camera_api = query_resp['data'][CAMERA_API]['path']
camera_path = query_resp['data'][CAMERA_API]['path']
streaming_path = query_resp['data'][STREAMING_API]['path']
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.exception("Error on %s", syno_api_url)
return False
# Authticate to NAS to get a session id
syno_auth_url = SYNO_API_URL.format(
config.get(CONF_URL), WEBAPI_PATH, auth_path)
session_id = yield from get_session_id(
hass,
websession_init,
config.get(CONF_USERNAME),
config.get(CONF_PASSWORD),
syno_auth_url,
timeout
)
# init websession
websession = async_create_clientsession(
hass, verify_ssl, cookies={'id': session_id})
# Use SessionID to get cameras in system
syno_camera_url = SYNO_API_URL.format(
config.get(CONF_URL), WEBAPI_PATH, camera_api)
camera_payload = {
'api': CAMERA_API,
'method': 'List',
'version': '1'
}
try:
with async_timeout.timeout(timeout, loop=hass.loop):
camera_req = yield from websession.get(
syno_camera_url,
params=camera_payload
)
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.exception("Error on %s", syno_camera_url)
return False
camera_resp = yield from camera_req.json(content_type=None)
cameras = camera_resp['data']['cameras']
# add cameras
devices = []
for camera in cameras:
if not config.get(CONF_WHITELIST):
camera_id = camera['id']
snapshot_path = camera['snapshot_path']
device = SynologyCamera(
hass, websession, config, camera_id, camera['name'],
snapshot_path, streaming_path, camera_path, auth_path, timeout
)
devices.append(device)
async_add_devices(devices)
@asyncio.coroutine
def get_session_id(hass, websession, username, password, login_url, timeout):
"""Get a session id."""
auth_payload = {
'api': AUTH_API,
'method': 'Login',
'version': '2',
'account': username,
'passwd': password,
'session': 'SurveillanceStation',
'format': 'sid'
}
try:
with async_timeout.timeout(timeout, loop=hass.loop):
auth_req = yield from websession.get(
login_url,
params=auth_payload
)
auth_resp = yield from auth_req.json(content_type=None)
return auth_resp['data']['sid']
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.exception("Error on %s", login_url)
return False
class SynologyCamera(Camera):
"""An implementation of a Synology NAS based IP camera."""
def __init__(self, hass, websession, config, camera_id,
camera_name, snapshot_path, streaming_path, camera_path,
auth_path, timeout):
"""Initialize a Synology Surveillance Station camera."""
super().__init__()
self.hass = hass
self._websession = websession
self._name = camera_name
self._synology_url = config.get(CONF_URL)
self._camera_name = config.get(CONF_CAMERA_NAME)
self._stream_id = config.get(CONF_STREAM_ID)
self._camera_id = camera_id
self._snapshot_path = snapshot_path
self._streaming_path = streaming_path
self._camera_path = camera_path
self._auth_path = auth_path
self._timeout = timeout
def camera_image(self):
"""Return bytes of camera image."""
return run_coroutine_threadsafe(
self.async_camera_image(), self.hass.loop).result()
@asyncio.coroutine
def async_camera_image(self):
"""Return a still image response from the camera."""
image_url = SYNO_API_URL.format(
self._synology_url, WEBAPI_PATH, self._camera_path)
image_payload = {
'api': CAMERA_API,
'method': 'GetSnapshot',
'version': '1',
'cameraId': self._camera_id
}
try:
with async_timeout.timeout(self._timeout, loop=self.hass.loop):
response = yield from self._websession.get(
image_url,
params=image_payload
)
except (asyncio.TimeoutError, aiohttp.ClientError):
_LOGGER.error("Error fetching %s", image_url)
return None
image = yield from response.read()
return image
@asyncio.coroutine
def handle_async_mjpeg_stream(self, request):
"""Return a MJPEG stream image response directly from the camera."""
streaming_url = SYNO_API_URL.format(
self._synology_url, WEBAPI_PATH, self._streaming_path)
streaming_payload = {
'api': STREAMING_API,
'method': 'Stream',
'version': '1',
'cameraId': self._camera_id,
'format': 'mjpeg'
}
stream_coro = self._websession.get(
streaming_url, params=streaming_payload)
yield from async_aiohttp_proxy_web(self.hass, request, stream_coro)
@property
def name(self):
"""Return the name of this device."""
return self._name

View file

@ -1,4 +1,4 @@
"""
"""
Tado component to create a climate device for each zone.
For more details about this platform, please refer to the documentation at

View file

@ -1,187 +1,187 @@
"""
Support for Harmony Hub devices.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/remote.harmony/
"""
import logging
from os import path
import urllib.parse
import voluptuous as vol
import homeassistant.components.remote as remote
import homeassistant.helpers.config_validation as cv
from homeassistant.const import (
CONF_NAME, CONF_HOST, CONF_PORT, ATTR_ENTITY_ID)
from homeassistant.components.remote import (
PLATFORM_SCHEMA, DOMAIN, ATTR_DEVICE, ATTR_COMMAND, ATTR_ACTIVITY)
from homeassistant.util import slugify
from homeassistant.config import load_yaml_config_file
REQUIREMENTS = ['pyharmony==1.0.12']
_LOGGER = logging.getLogger(__name__)
DEFAULT_PORT = 5222
DEVICES = []
SERVICE_SYNC = 'harmony_sync'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Required(ATTR_ACTIVITY, default=None): cv.string,
})
HARMONY_SYNC_SCHEMA = vol.Schema({
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Harmony platform."""
import pyharmony
global DEVICES
name = config.get(CONF_NAME)
host = config.get(CONF_HOST)
port = config.get(CONF_PORT)
_LOGGER.debug("Loading Harmony platform: %s", name)
harmony_conf_file = hass.config.path(
'{}{}{}'.format('harmony_', slugify(name), '.conf'))
try:
_LOGGER.debug("Calling pyharmony.ha_get_token for remote at: %s:%s",
host, port)
token = urllib.parse.quote_plus(pyharmony.ha_get_token(host, port))
except ValueError as err:
_LOGGER.warning("%s for remote: %s", err.args[0], name)
return False
_LOGGER.debug("Received token: %s", token)
DEVICES = [HarmonyRemote(
config.get(CONF_NAME), config.get(CONF_HOST), config.get(CONF_PORT),
config.get(ATTR_ACTIVITY), harmony_conf_file, token)]
add_devices(DEVICES, True)
register_services(hass)
return True
def register_services(hass):
"""Register all services for harmony devices."""
descriptions = load_yaml_config_file(
path.join(path.dirname(__file__), 'services.yaml'))
hass.services.register(
DOMAIN, SERVICE_SYNC, _sync_service, descriptions.get(SERVICE_SYNC),
schema=HARMONY_SYNC_SCHEMA)
def _apply_service(service, service_func, *service_func_args):
"""Handle services to apply."""
entity_ids = service.data.get('entity_id')
if entity_ids:
_devices = [device for device in DEVICES
if device.entity_id in entity_ids]
else:
_devices = DEVICES
for device in _devices:
service_func(device, *service_func_args)
device.schedule_update_ha_state(True)
def _sync_service(service):
_apply_service(service, HarmonyRemote.sync)
class HarmonyRemote(remote.RemoteDevice):
"""Remote representation used to control a Harmony device."""
def __init__(self, name, host, port, activity, out_path, token):
"""Initialize HarmonyRemote class."""
import pyharmony
from pathlib import Path
_LOGGER.debug("HarmonyRemote device init started for: %s", name)
self._name = name
self._ip = host
self._port = port
self._state = None
self._current_activity = None
self._default_activity = activity
self._token = token
self._config_path = out_path
_LOGGER.debug("Retrieving harmony config using token: %s", token)
self._config = pyharmony.ha_get_config(self._token, host, port)
if not Path(self._config_path).is_file():
_LOGGER.debug("Writing harmony configuration to file: %s",
out_path)
pyharmony.ha_write_config_file(self._config, self._config_path)
@property
def name(self):
"""Return the Harmony device's name."""
return self._name
@property
def device_state_attributes(self):
"""Add platform specific attributes."""
return {'current_activity': self._current_activity}
@property
def is_on(self):
"""Return False if PowerOff is the current activity, otherwise True."""
return self._current_activity != 'PowerOff'
def update(self):
"""Return current activity."""
import pyharmony
name = self._name
_LOGGER.debug("Polling %s for current activity", name)
state = pyharmony.ha_get_current_activity(
self._token, self._config, self._ip, self._port)
_LOGGER.debug("%s current activity reported as: %s", name, state)
self._current_activity = state
self._state = bool(state != 'PowerOff')
def turn_on(self, **kwargs):
"""Start an activity from the Harmony device."""
import pyharmony
if kwargs[ATTR_ACTIVITY]:
activity = kwargs[ATTR_ACTIVITY]
else:
activity = self._default_activity
if activity:
pyharmony.ha_start_activity(
self._token, self._ip, self._port, self._config, activity)
self._state = True
else:
_LOGGER.error("No activity specified with turn_on service")
def turn_off(self):
"""Start the PowerOff activity."""
import pyharmony
pyharmony.ha_power_off(self._token, self._ip, self._port)
def send_command(self, **kwargs):
"""Send a command to one device."""
import pyharmony
pyharmony.ha_send_command(
self._token, self._ip, self._port, kwargs[ATTR_DEVICE],
kwargs[ATTR_COMMAND])
def sync(self):
"""Sync the Harmony device with the web service."""
import pyharmony
_LOGGER.debug("Syncing hub with Harmony servers")
pyharmony.ha_sync(self._token, self._ip, self._port)
self._config = pyharmony.ha_get_config(
self._token, self._ip, self._port)
_LOGGER.debug("Writing hub config to file: %s", self._config_path)
pyharmony.ha_write_config_file(self._config, self._config_path)
"""
Support for Harmony Hub devices.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/remote.harmony/
"""
import logging
from os import path
import urllib.parse
import voluptuous as vol
import homeassistant.components.remote as remote
import homeassistant.helpers.config_validation as cv
from homeassistant.const import (
CONF_NAME, CONF_HOST, CONF_PORT, ATTR_ENTITY_ID)
from homeassistant.components.remote import (
PLATFORM_SCHEMA, DOMAIN, ATTR_DEVICE, ATTR_COMMAND, ATTR_ACTIVITY)
from homeassistant.util import slugify
from homeassistant.config import load_yaml_config_file
REQUIREMENTS = ['pyharmony==1.0.12']
_LOGGER = logging.getLogger(__name__)
DEFAULT_PORT = 5222
DEVICES = []
SERVICE_SYNC = 'harmony_sync'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Required(ATTR_ACTIVITY, default=None): cv.string,
})
HARMONY_SYNC_SCHEMA = vol.Schema({
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the Harmony platform."""
import pyharmony
global DEVICES
name = config.get(CONF_NAME)
host = config.get(CONF_HOST)
port = config.get(CONF_PORT)
_LOGGER.debug("Loading Harmony platform: %s", name)
harmony_conf_file = hass.config.path(
'{}{}{}'.format('harmony_', slugify(name), '.conf'))
try:
_LOGGER.debug("Calling pyharmony.ha_get_token for remote at: %s:%s",
host, port)
token = urllib.parse.quote_plus(pyharmony.ha_get_token(host, port))
except ValueError as err:
_LOGGER.warning("%s for remote: %s", err.args[0], name)
return False
_LOGGER.debug("Received token: %s", token)
DEVICES = [HarmonyRemote(
config.get(CONF_NAME), config.get(CONF_HOST), config.get(CONF_PORT),
config.get(ATTR_ACTIVITY), harmony_conf_file, token)]
add_devices(DEVICES, True)
register_services(hass)
return True
def register_services(hass):
"""Register all services for harmony devices."""
descriptions = load_yaml_config_file(
path.join(path.dirname(__file__), 'services.yaml'))
hass.services.register(
DOMAIN, SERVICE_SYNC, _sync_service, descriptions.get(SERVICE_SYNC),
schema=HARMONY_SYNC_SCHEMA)
def _apply_service(service, service_func, *service_func_args):
"""Handle services to apply."""
entity_ids = service.data.get('entity_id')
if entity_ids:
_devices = [device for device in DEVICES
if device.entity_id in entity_ids]
else:
_devices = DEVICES
for device in _devices:
service_func(device, *service_func_args)
device.schedule_update_ha_state(True)
def _sync_service(service):
_apply_service(service, HarmonyRemote.sync)
class HarmonyRemote(remote.RemoteDevice):
"""Remote representation used to control a Harmony device."""
def __init__(self, name, host, port, activity, out_path, token):
"""Initialize HarmonyRemote class."""
import pyharmony
from pathlib import Path
_LOGGER.debug("HarmonyRemote device init started for: %s", name)
self._name = name
self._ip = host
self._port = port
self._state = None
self._current_activity = None
self._default_activity = activity
self._token = token
self._config_path = out_path
_LOGGER.debug("Retrieving harmony config using token: %s", token)
self._config = pyharmony.ha_get_config(self._token, host, port)
if not Path(self._config_path).is_file():
_LOGGER.debug("Writing harmony configuration to file: %s",
out_path)
pyharmony.ha_write_config_file(self._config, self._config_path)
@property
def name(self):
"""Return the Harmony device's name."""
return self._name
@property
def device_state_attributes(self):
"""Add platform specific attributes."""
return {'current_activity': self._current_activity}
@property
def is_on(self):
"""Return False if PowerOff is the current activity, otherwise True."""
return self._current_activity != 'PowerOff'
def update(self):
"""Return current activity."""
import pyharmony
name = self._name
_LOGGER.debug("Polling %s for current activity", name)
state = pyharmony.ha_get_current_activity(
self._token, self._config, self._ip, self._port)
_LOGGER.debug("%s current activity reported as: %s", name, state)
self._current_activity = state
self._state = bool(state != 'PowerOff')
def turn_on(self, **kwargs):
"""Start an activity from the Harmony device."""
import pyharmony
if kwargs[ATTR_ACTIVITY]:
activity = kwargs[ATTR_ACTIVITY]
else:
activity = self._default_activity
if activity:
pyharmony.ha_start_activity(
self._token, self._ip, self._port, self._config, activity)
self._state = True
else:
_LOGGER.error("No activity specified with turn_on service")
def turn_off(self):
"""Start the PowerOff activity."""
import pyharmony
pyharmony.ha_power_off(self._token, self._ip, self._port)
def send_command(self, **kwargs):
"""Send a command to one device."""
import pyharmony
pyharmony.ha_send_command(
self._token, self._ip, self._port, kwargs[ATTR_DEVICE],
kwargs[ATTR_COMMAND])
def sync(self):
"""Sync the Harmony device with the web service."""
import pyharmony
_LOGGER.debug("Syncing hub with Harmony servers")
pyharmony.ha_sync(self._token, self._ip, self._port)
self._config = pyharmony.ha_get_config(
self._token, self._ip, self._port)
_LOGGER.debug("Writing hub config to file: %s", self._config_path)
pyharmony.ha_write_config_file(self._config, self._config_path)

View file

@ -1,245 +1,245 @@
"""
Support for Synology NAS Sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.synologydsm/
"""
import logging
from datetime import timedelta
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.helpers.entity import Entity
from homeassistant.const import (
CONF_HOST, CONF_USERNAME, CONF_PASSWORD, CONF_PORT,
CONF_MONITORED_CONDITIONS, TEMP_CELSIUS, EVENT_HOMEASSISTANT_START)
from homeassistant.util import Throttle
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
REQUIREMENTS = ['python-synology==0.1.0']
_LOGGER = logging.getLogger(__name__)
CONF_DISKS = 'disks'
CONF_VOLUMES = 'volumes'
DEFAULT_NAME = 'Synology DSM'
DEFAULT_PORT = 5000
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
_UTILISATION_MON_COND = {
'cpu_other_load': ['CPU Load (Other)', '%', 'mdi:chip'],
'cpu_user_load': ['CPU Load (User)', '%', 'mdi:chip'],
'cpu_system_load': ['CPU Load (System)', '%', 'mdi:chip'],
'cpu_total_load': ['CPU Load (Total)', '%', 'mdi:chip'],
'cpu_1min_load': ['CPU Load (1 min)', '%', 'mdi:chip'],
'cpu_5min_load': ['CPU Load (5 min)', '%', 'mdi:chip'],
'cpu_15min_load': ['CPU Load (15 min)', '%', 'mdi:chip'],
'memory_real_usage': ['Memory Usage (Real)', '%', 'mdi:memory'],
'memory_size': ['Memory Size', 'Mb', 'mdi:memory'],
'memory_cached': ['Memory Cached', 'Mb', 'mdi:memory'],
'memory_available_swap': ['Memory Available (Swap)', 'Mb', 'mdi:memory'],
'memory_available_real': ['Memory Available (Real)', 'Mb', 'mdi:memory'],
'memory_total_swap': ['Memory Total (Swap)', 'Mb', 'mdi:memory'],
'memory_total_real': ['Memory Total (Real)', 'Mb', 'mdi:memory'],
'network_up': ['Network Up', 'Kbps', 'mdi:upload'],
'network_down': ['Network Down', 'Kbps', 'mdi:download'],
}
_STORAGE_VOL_MON_COND = {
'volume_status': ['Status', None, 'mdi:checkbox-marked-circle-outline'],
'volume_device_type': ['Type', None, 'mdi:harddisk'],
'volume_size_total': ['Total Size', None, 'mdi:chart-pie'],
'volume_size_used': ['Used Space', None, 'mdi:chart-pie'],
'volume_percentage_used': ['Volume Used', '%', 'mdi:chart-pie'],
'volume_disk_temp_avg': ['Average Disk Temp', None, 'mdi:thermometer'],
'volume_disk_temp_max': ['Maximum Disk Temp', None, 'mdi:thermometer'],
}
_STORAGE_DSK_MON_COND = {
'disk_name': ['Name', None, 'mdi:harddisk'],
'disk_device': ['Device', None, 'mdi:dots-horizontal'],
'disk_smart_status': ['Status (Smart)', None,
'mdi:checkbox-marked-circle-outline'],
'disk_status': ['Status', None, 'mdi:checkbox-marked-circle-outline'],
'disk_exceed_bad_sector_thr': ['Exceeded Max Bad Sectors', None,
'mdi:test-tube'],
'disk_below_remain_life_thr': ['Below Min Remaining Life', None,
'mdi:test-tube'],
'disk_temp': ['Temperature', None, 'mdi:thermometer'],
}
_MONITORED_CONDITIONS = list(_UTILISATION_MON_COND.keys()) + \
list(_STORAGE_VOL_MON_COND.keys()) + \
list(_STORAGE_DSK_MON_COND.keys())
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_MONITORED_CONDITIONS):
vol.All(cv.ensure_list, [vol.In(_MONITORED_CONDITIONS)]),
vol.Optional(CONF_DISKS, default=None): cv.ensure_list,
vol.Optional(CONF_VOLUMES, default=None): cv.ensure_list,
})
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
"""Set up the Synology NAS Sensor."""
# pylint: disable=too-many-locals
def run_setup(event):
"""Wait until HASS is fully initialized before creating.
Delay the setup until Home Assistant is fully initialized.
This allows any entities to be created already
"""
# Setup API
api = SynoApi(config.get(CONF_HOST), config.get(CONF_PORT),
config.get(CONF_USERNAME), config.get(CONF_PASSWORD),
hass.config.units.temperature_unit)
sensors = [SynoNasUtilSensor(api, variable,
_UTILISATION_MON_COND[variable])
for variable in config[CONF_MONITORED_CONDITIONS]
if variable in _UTILISATION_MON_COND]
# Handle all Volumes
volumes = config['volumes']
if volumes is None:
volumes = api.storage.volumes
for volume in volumes:
sensors += [SynoNasStorageSensor(api, variable,
_STORAGE_VOL_MON_COND[variable],
volume)
for variable in config[CONF_MONITORED_CONDITIONS]
if variable in _STORAGE_VOL_MON_COND]
# Handle all Disks
disks = config['disks']
if disks is None:
disks = api.storage.disks
for disk in disks:
sensors += [SynoNasStorageSensor(api, variable,
_STORAGE_DSK_MON_COND[variable],
disk)
for variable in config[CONF_MONITORED_CONDITIONS]
if variable in _STORAGE_DSK_MON_COND]
add_devices_callback(sensors)
# Wait until start event is sent to load this component.
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, run_setup)
class SynoApi():
"""Class to interface with API."""
# pylint: disable=too-many-arguments, bare-except
def __init__(self, host, port, username, password, temp_unit):
"""Initialize the API wrapper class."""
from SynologyDSM import SynologyDSM
self.temp_unit = temp_unit
try:
self._api = SynologyDSM(host,
port,
username,
password)
except:
_LOGGER.error("Error setting up Synology DSM")
# Will be updated when `update` gets called.
self.utilisation = self._api.utilisation
self.storage = self._api.storage
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Update function for updating api information."""
self._api.update()
class SynoNasSensor(Entity):
"""Representation of a Synology Nas Sensor."""
def __init__(self, api, variable, variableInfo, monitor_device=None):
"""Initialize the sensor."""
self.var_id = variable
self.var_name = variableInfo[0]
self.var_units = variableInfo[1]
self.var_icon = variableInfo[2]
self.monitor_device = monitor_device
self._api = api
@property
def name(self):
"""Return the name of the sensor, if any."""
if self.monitor_device is not None:
return "{} ({})".format(self.var_name, self.monitor_device)
else:
return self.var_name
@property
def icon(self):
"""Icon to use in the frontend, if any."""
return self.var_icon
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
if self.var_id in ['volume_disk_temp_avg', 'volume_disk_temp_max',
'disk_temp']:
return self._api.temp_unit
else:
return self.var_units
def update(self):
"""Get the latest data for the states."""
if self._api is not None:
self._api.update()
class SynoNasUtilSensor(SynoNasSensor):
"""Representation a Synology Utilisation Sensor."""
@property
def state(self):
"""Return the state of the sensor."""
network_sensors = ['network_up', 'network_down']
memory_sensors = ['memory_size', 'memory_cached',
'memory_available_swap', 'memory_available_real',
'memory_total_swap', 'memory_total_real']
if self.var_id in network_sensors or self.var_id in memory_sensors:
attr = getattr(self._api.utilisation, self.var_id)(False)
if self.var_id in network_sensors:
return round(attr / 1024.0, 1)
elif self.var_id in memory_sensors:
return round(attr / 1024.0 / 1024.0, 1)
else:
return getattr(self._api.utilisation, self.var_id)
class SynoNasStorageSensor(SynoNasSensor):
"""Representation a Synology Utilisation Sensor."""
@property
def state(self):
"""Return the state of the sensor."""
temp_sensors = ['volume_disk_temp_avg', 'volume_disk_temp_max',
'disk_temp']
if self.monitor_device is not None:
if self.var_id in temp_sensors:
attr = getattr(self._api.storage,
self.var_id)(self.monitor_device)
if self._api.temp_unit == TEMP_CELSIUS:
return attr
else:
return round(attr * 1.8 + 32.0, 1)
else:
return getattr(self._api.storage,
self.var_id)(self.monitor_device)
"""
Support for Synology NAS Sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.synologydsm/
"""
import logging
from datetime import timedelta
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.helpers.entity import Entity
from homeassistant.const import (
CONF_HOST, CONF_USERNAME, CONF_PASSWORD, CONF_PORT,
CONF_MONITORED_CONDITIONS, TEMP_CELSIUS, EVENT_HOMEASSISTANT_START)
from homeassistant.util import Throttle
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
REQUIREMENTS = ['python-synology==0.1.0']
_LOGGER = logging.getLogger(__name__)
CONF_DISKS = 'disks'
CONF_VOLUMES = 'volumes'
DEFAULT_NAME = 'Synology DSM'
DEFAULT_PORT = 5000
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
_UTILISATION_MON_COND = {
'cpu_other_load': ['CPU Load (Other)', '%', 'mdi:chip'],
'cpu_user_load': ['CPU Load (User)', '%', 'mdi:chip'],
'cpu_system_load': ['CPU Load (System)', '%', 'mdi:chip'],
'cpu_total_load': ['CPU Load (Total)', '%', 'mdi:chip'],
'cpu_1min_load': ['CPU Load (1 min)', '%', 'mdi:chip'],
'cpu_5min_load': ['CPU Load (5 min)', '%', 'mdi:chip'],
'cpu_15min_load': ['CPU Load (15 min)', '%', 'mdi:chip'],
'memory_real_usage': ['Memory Usage (Real)', '%', 'mdi:memory'],
'memory_size': ['Memory Size', 'Mb', 'mdi:memory'],
'memory_cached': ['Memory Cached', 'Mb', 'mdi:memory'],
'memory_available_swap': ['Memory Available (Swap)', 'Mb', 'mdi:memory'],
'memory_available_real': ['Memory Available (Real)', 'Mb', 'mdi:memory'],
'memory_total_swap': ['Memory Total (Swap)', 'Mb', 'mdi:memory'],
'memory_total_real': ['Memory Total (Real)', 'Mb', 'mdi:memory'],
'network_up': ['Network Up', 'Kbps', 'mdi:upload'],
'network_down': ['Network Down', 'Kbps', 'mdi:download'],
}
_STORAGE_VOL_MON_COND = {
'volume_status': ['Status', None, 'mdi:checkbox-marked-circle-outline'],
'volume_device_type': ['Type', None, 'mdi:harddisk'],
'volume_size_total': ['Total Size', None, 'mdi:chart-pie'],
'volume_size_used': ['Used Space', None, 'mdi:chart-pie'],
'volume_percentage_used': ['Volume Used', '%', 'mdi:chart-pie'],
'volume_disk_temp_avg': ['Average Disk Temp', None, 'mdi:thermometer'],
'volume_disk_temp_max': ['Maximum Disk Temp', None, 'mdi:thermometer'],
}
_STORAGE_DSK_MON_COND = {
'disk_name': ['Name', None, 'mdi:harddisk'],
'disk_device': ['Device', None, 'mdi:dots-horizontal'],
'disk_smart_status': ['Status (Smart)', None,
'mdi:checkbox-marked-circle-outline'],
'disk_status': ['Status', None, 'mdi:checkbox-marked-circle-outline'],
'disk_exceed_bad_sector_thr': ['Exceeded Max Bad Sectors', None,
'mdi:test-tube'],
'disk_below_remain_life_thr': ['Below Min Remaining Life', None,
'mdi:test-tube'],
'disk_temp': ['Temperature', None, 'mdi:thermometer'],
}
_MONITORED_CONDITIONS = list(_UTILISATION_MON_COND.keys()) + \
list(_STORAGE_VOL_MON_COND.keys()) + \
list(_STORAGE_DSK_MON_COND.keys())
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_MONITORED_CONDITIONS):
vol.All(cv.ensure_list, [vol.In(_MONITORED_CONDITIONS)]),
vol.Optional(CONF_DISKS, default=None): cv.ensure_list,
vol.Optional(CONF_VOLUMES, default=None): cv.ensure_list,
})
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
"""Set up the Synology NAS Sensor."""
# pylint: disable=too-many-locals
def run_setup(event):
"""Wait until HASS is fully initialized before creating.
Delay the setup until Home Assistant is fully initialized.
This allows any entities to be created already
"""
# Setup API
api = SynoApi(config.get(CONF_HOST), config.get(CONF_PORT),
config.get(CONF_USERNAME), config.get(CONF_PASSWORD),
hass.config.units.temperature_unit)
sensors = [SynoNasUtilSensor(api, variable,
_UTILISATION_MON_COND[variable])
for variable in config[CONF_MONITORED_CONDITIONS]
if variable in _UTILISATION_MON_COND]
# Handle all Volumes
volumes = config['volumes']
if volumes is None:
volumes = api.storage.volumes
for volume in volumes:
sensors += [SynoNasStorageSensor(api, variable,
_STORAGE_VOL_MON_COND[variable],
volume)
for variable in config[CONF_MONITORED_CONDITIONS]
if variable in _STORAGE_VOL_MON_COND]
# Handle all Disks
disks = config['disks']
if disks is None:
disks = api.storage.disks
for disk in disks:
sensors += [SynoNasStorageSensor(api, variable,
_STORAGE_DSK_MON_COND[variable],
disk)
for variable in config[CONF_MONITORED_CONDITIONS]
if variable in _STORAGE_DSK_MON_COND]
add_devices_callback(sensors)
# Wait until start event is sent to load this component.
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, run_setup)
class SynoApi():
"""Class to interface with API."""
# pylint: disable=too-many-arguments, bare-except
def __init__(self, host, port, username, password, temp_unit):
"""Initialize the API wrapper class."""
from SynologyDSM import SynologyDSM
self.temp_unit = temp_unit
try:
self._api = SynologyDSM(host,
port,
username,
password)
except:
_LOGGER.error("Error setting up Synology DSM")
# Will be updated when `update` gets called.
self.utilisation = self._api.utilisation
self.storage = self._api.storage
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Update function for updating api information."""
self._api.update()
class SynoNasSensor(Entity):
"""Representation of a Synology Nas Sensor."""
def __init__(self, api, variable, variableInfo, monitor_device=None):
"""Initialize the sensor."""
self.var_id = variable
self.var_name = variableInfo[0]
self.var_units = variableInfo[1]
self.var_icon = variableInfo[2]
self.monitor_device = monitor_device
self._api = api
@property
def name(self):
"""Return the name of the sensor, if any."""
if self.monitor_device is not None:
return "{} ({})".format(self.var_name, self.monitor_device)
else:
return self.var_name
@property
def icon(self):
"""Icon to use in the frontend, if any."""
return self.var_icon
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
if self.var_id in ['volume_disk_temp_avg', 'volume_disk_temp_max',
'disk_temp']:
return self._api.temp_unit
else:
return self.var_units
def update(self):
"""Get the latest data for the states."""
if self._api is not None:
self._api.update()
class SynoNasUtilSensor(SynoNasSensor):
"""Representation a Synology Utilisation Sensor."""
@property
def state(self):
"""Return the state of the sensor."""
network_sensors = ['network_up', 'network_down']
memory_sensors = ['memory_size', 'memory_cached',
'memory_available_swap', 'memory_available_real',
'memory_total_swap', 'memory_total_real']
if self.var_id in network_sensors or self.var_id in memory_sensors:
attr = getattr(self._api.utilisation, self.var_id)(False)
if self.var_id in network_sensors:
return round(attr / 1024.0, 1)
elif self.var_id in memory_sensors:
return round(attr / 1024.0 / 1024.0, 1)
else:
return getattr(self._api.utilisation, self.var_id)
class SynoNasStorageSensor(SynoNasSensor):
"""Representation a Synology Utilisation Sensor."""
@property
def state(self):
"""Return the state of the sensor."""
temp_sensors = ['volume_disk_temp_avg', 'volume_disk_temp_max',
'disk_temp']
if self.monitor_device is not None:
if self.var_id in temp_sensors:
attr = getattr(self._api.storage,
self.var_id)(self.monitor_device)
if self._api.temp_unit == TEMP_CELSIUS:
return attr
else:
return round(attr * 1.8 + 32.0, 1)
else:
return getattr(self._api.storage,
self.var_id)(self.monitor_device)