hass-core/homeassistant/components/progettihwsw/binary_sensor.py
Arda ŞEREMET e707b50658
Add integration for ProgettiHWSW automation boards (#37922)
* Opened a new fresh page to clean my mess.

* Solved pylint warnings

* Fixing pylint issue of defining attr outside init.

* Excluded files from being tested by codecov.

* Solved binary sensor error.

* Fixed some stylisation errors.

* Resolved input not updating problem.

* Added port entry to test file.

* Added tests for create_entry.

* Added support for better state management.

* Increased code coverage of config_flow.py & made some tweaks.

* Increased coverage of config_flow.py by adding tests for unknown exceptions.

* A small bugfix.

* Stylised code as per Chris' suggestions.

* Stylised code again.

* Improved quality of test code.

* Added step_id in config flow tests.
2020-08-30 15:03:33 -05:00

95 lines
2.7 KiB
Python

"""Control binary sensor instances."""
from datetime import timedelta
import logging
from ProgettiHWSW.input import Input
import async_timeout
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from . import setup_input
from .const import DEFAULT_POLLING_INTERVAL_SEC, DOMAIN
# Logger for this platform. It is named after the integration DOMAIN and is
# the logger handed to the DataUpdateCoordinator built in async_setup_entry.
_LOGGER = logging.getLogger(DOMAIN)
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Legacy platform setup hook; creates nothing and reports success.

    None of the arguments are used. Entities for this platform are created
    in ``async_setup_entry`` instead.
    """
    # Nothing to do here: simply signal that setup succeeded.
    return True
async def async_setup_entry(hass, config_entry, async_add_entities):
    """Create one binary sensor entity per board input for a config entry.

    The board API object is read from ``hass.data[DOMAIN]`` under the entry's
    id. A coordinator polling ``board_api.get_inputs()`` every
    ``DEFAULT_POLLING_INTERVAL_SEC`` seconds is built and refreshed once, and
    then an entity is registered for each input number from 1 up to the
    entry's ``input_count``.
    """
    api = hass.data[DOMAIN][config_entry.entry_id]
    raw_input_count = config_entry.data["input_count"]

    async def fetch_input_states():
        """Read the input states from the board, bounded by a 5 second timeout."""
        async with async_timeout.timeout(5):
            return await api.get_inputs()

    poll_interval = timedelta(seconds=DEFAULT_POLLING_INTERVAL_SEC)
    coordinator = DataUpdateCoordinator(
        hass,
        _LOGGER,
        name="binary_sensor",
        update_method=fetch_input_states,
        update_interval=poll_interval,
    )
    # Run one refresh before the entities are created.
    await coordinator.async_refresh()

    # Inputs are numbered starting at 1, hence the inclusive upper bound.
    entities = [
        ProgettihwswBinarySensor(
            hass,
            coordinator,
            config_entry,
            f"Input #{number}",
            setup_input(api, number),
        )
        for number in range(1, int(raw_input_count) + 1)
    ]
    async_add_entities(entities)
class ProgettihwswBinarySensor(BinarySensorEntity):
    """Binary sensor entity tied to a single board input."""

    def __init__(self, hass, coordinator, config_entry, name, sensor: Input):
        """Keep the display name, the input object and the shared coordinator.

        ``hass`` and ``config_entry`` are accepted but not stored.
        """
        self._coordinator = coordinator
        self._sensor = sensor
        self._name = name

    @property
    def should_poll(self):
        """Disable polling; updates arrive through the coordinator listener."""
        return False

    @property
    def name(self):
        """Return the name given at construction."""
        return self._name

    @property
    def available(self):
        """Report whether the coordinator's last update succeeded."""
        return self._coordinator.last_update_success

    @property
    def is_on(self):
        """Return the coordinator's current value for this input's id."""
        input_states = self._coordinator.data
        return input_states[self._sensor.id]

    async def async_added_to_hass(self):
        """Write state whenever the coordinator notifies its listeners.

        Whatever ``async_add_listener`` returns is handed to
        ``async_on_remove``.
        """
        remove_listener = self._coordinator.async_add_listener(
            self.async_write_ha_state
        )
        self.async_on_remove(remove_listener)

    async def async_update(self):
        """Ask the coordinator for a refresh instead of reading the board directly."""
        await self._coordinator.async_request_refresh()