Paulus Schoutsen f5076188ef
Consolidate all platforms that have no tests (#22096)
* Consolidate

* Fix tests

* Update imports

* Fix import

* Use importlib because integration and package share name

* Fix more tests

* Update .coveragerc and CODEOWNERS
2019-03-16 20:44:05 -07:00

316 lines
12 KiB

"""
Support for GTFS (Google/General Transit Feed Specification).

For more details about this platform, please refer to the Home Assistant
documentation for the GTFS sensor.
"""
import os
import logging
import datetime
import threading
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import CONF_NAME
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
REQUIREMENTS = ['pygtfs==0.1.5']

_LOGGER = logging.getLogger(__name__)

# Configuration keys accepted by this platform.
CONF_DATA = 'data'
CONF_DESTINATION = 'destination'
CONF_ORIGIN = 'origin'
CONF_OFFSET = 'offset'

# NOTE(review): DEFAULT_NAME and DEFAULT_PATH are referenced by
# setup_platform() and GTFSDepartureSensor.update(), but their definitions
# were not present in this file as received; the values below are assumed
# and must be confirmed.
DEFAULT_NAME = 'GTFS Sensor'
DEFAULT_PATH = 'gtfs'

# Fallback icon, used when a route's route_type has no entry in ICONS.
ICON = 'mdi:train'
# Frontend icon for each integer route_type of the departing route.
ICONS = {
    0: 'mdi:tram',
    1: 'mdi:subway',
    2: 'mdi:train',
    3: 'mdi:bus',
    4: 'mdi:ferry',
    5: 'mdi:train-variant',
    6: 'mdi:gondola',
    7: 'mdi:stairs',
}

DATE_FORMAT = '%Y-%m-%d'
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# Origin, destination and the data file/folder are mandatory; the offset
# defaults to zero and is parsed as a time period.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_ORIGIN): cv.string,
    vol.Required(CONF_DESTINATION): cv.string,
    vol.Required(CONF_DATA): cv.string,
    vol.Optional(CONF_NAME): cv.string,
    vol.Optional(CONF_OFFSET, default=0): cv.time_period,
})
def get_next_departure(sched, start_station_id, end_station_id, offset):
    """Get the next departure for the given schedule.

    Args:
        sched: Schedule object exposing ``stops_by_id``, ``routes_by_id``,
            ``trips_by_id``, ``agencies_by_id`` and a SQLAlchemy ``engine``.
        start_station_id: Stop id of the origin station.
        end_station_id: Stop id of the destination station.
        offset: ``datetime.timedelta`` added to the current time before
            searching, so departures earlier than now + offset are skipped.

    Returns:
        A dict describing the next departure (trip, route, agency, both
        stations, departure/arrival datetimes, seconds/minutes until
        departure and per-stop detail dicts), or ``None`` when no trip
        matches.
    """
    origin_station = sched.stops_by_id(start_station_id)[0]
    destination_station = sched.stops_by_id(end_station_id)[0]

    now = datetime.datetime.now() + offset
    # The day name is interpolated as a column name of the calendar table.
    # Derive it from weekday() rather than strftime('%A'), whose output
    # depends on the process locale and may not be an English day name.
    day_name = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday',
                'saturday', 'sunday')[now.weekday()]
    now_str = now.strftime('%H:%M:%S')
    today = now.strftime(DATE_FORMAT)

    from sqlalchemy.sql import text

    # Only the column name is formatted into the statement; every value is
    # passed as a bound parameter. LIMIT 1 keeps just the earliest match.
    sql_query = text("""
    SELECT trip.trip_id, trip.route_id,
           time(origin_stop_time.arrival_time) AS origin_arrival_time,
           time(origin_stop_time.departure_time) AS origin_depart_time,
           origin_stop_time.drop_off_type AS origin_drop_off_type,
           origin_stop_time.pickup_type AS origin_pickup_type,
           origin_stop_time.shape_dist_traveled AS origin_dist_traveled,
           origin_stop_time.stop_headsign AS origin_stop_headsign,
           origin_stop_time.stop_sequence AS origin_stop_sequence,
           time(destination_stop_time.arrival_time) AS dest_arrival_time,
           time(destination_stop_time.departure_time) AS dest_depart_time,
           destination_stop_time.drop_off_type AS dest_drop_off_type,
           destination_stop_time.pickup_type AS dest_pickup_type,
           destination_stop_time.shape_dist_traveled AS dest_dist_traveled,
           destination_stop_time.stop_headsign AS dest_stop_headsign,
           destination_stop_time.stop_sequence AS dest_stop_sequence
    FROM trips trip
    INNER JOIN calendar calendar
               ON trip.service_id = calendar.service_id
    INNER JOIN stop_times origin_stop_time
               ON trip.trip_id = origin_stop_time.trip_id
    INNER JOIN stops start_station
               ON origin_stop_time.stop_id = start_station.stop_id
    INNER JOIN stop_times destination_stop_time
               ON trip.trip_id = destination_stop_time.trip_id
    INNER JOIN stops end_station
               ON destination_stop_time.stop_id = end_station.stop_id
    WHERE calendar.{day_name} = 1
    AND origin_depart_time > time(:now_str)
    AND start_station.stop_id = :origin_station_id
    AND end_station.stop_id = :end_station_id
    AND origin_stop_sequence < dest_stop_sequence
    AND calendar.start_date <= :today
    AND calendar.end_date >= :today
    ORDER BY origin_stop_time.departure_time
    LIMIT 1
    """.format(day_name=day_name))
    result = sched.engine.execute(sql_query, now_str=now_str,
                                  origin_station_id=start_station_id,
                                  end_station_id=end_station_id,
                                  today=today)

    # Take the first (earliest) row; None means no trip matched.
    item = next(iter(result), None)
    if item is None:
        return None

    # Format arrival and departure dates and times, accounting for the
    # possibility of times crossing over midnight.
    origin_arrival = now
    if item['origin_arrival_time'] > item['origin_depart_time']:
        # Arrived at the origin before midnight, departs after it.
        origin_arrival -= datetime.timedelta(days=1)
    origin_arrival_time = '{} {}'.format(origin_arrival.strftime(DATE_FORMAT),
                                         item['origin_arrival_time'])

    origin_depart_time = '{} {}'.format(today, item['origin_depart_time'])

    dest_arrival = now
    if item['dest_arrival_time'] < item['origin_depart_time']:
        # Arrival clock time is earlier than departure: it is the next day.
        dest_arrival += datetime.timedelta(days=1)
    dest_arrival_time = '{} {}'.format(dest_arrival.strftime(DATE_FORMAT),
                                       item['dest_arrival_time'])

    dest_depart = dest_arrival
    if item['dest_depart_time'] < item['dest_arrival_time']:
        dest_depart += datetime.timedelta(days=1)
    dest_depart_time = '{} {}'.format(dest_depart.strftime(DATE_FORMAT),
                                      item['dest_depart_time'])

    depart_time = datetime.datetime.strptime(origin_depart_time, TIME_FORMAT)
    arrival_time = datetime.datetime.strptime(dest_arrival_time, TIME_FORMAT)

    # Measured from the real current time, not from now + offset.
    seconds_until = (depart_time - datetime.datetime.now()).total_seconds()
    minutes_until = int(seconds_until / 60)

    route = sched.routes_by_id(item['route_id'])[0]

    origin_stop_time_dict = {
        'Arrival Time': origin_arrival_time,
        'Departure Time': origin_depart_time,
        'Drop Off Type': item['origin_drop_off_type'],
        'Pickup Type': item['origin_pickup_type'],
        'Shape Dist Traveled': item['origin_dist_traveled'],
        'Headsign': item['origin_stop_headsign'],
        'Sequence': item['origin_stop_sequence'],
    }

    destination_stop_time_dict = {
        'Arrival Time': dest_arrival_time,
        'Departure Time': dest_depart_time,
        'Drop Off Type': item['dest_drop_off_type'],
        'Pickup Type': item['dest_pickup_type'],
        'Shape Dist Traveled': item['dest_dist_traveled'],
        'Headsign': item['dest_stop_headsign'],
        'Sequence': item['dest_stop_sequence'],
    }

    return {
        'trip_id': item['trip_id'],
        'trip': sched.trips_by_id(item['trip_id'])[0],
        'route': route,
        'agency': sched.agencies_by_id(route.agency_id)[0],
        'origin_station': origin_station,
        'departure_time': depart_time,
        'destination_station': destination_station,
        'arrival_time': arrival_time,
        'seconds_until_departure': seconds_until,
        'minutes_until_departure': minutes_until,
        'origin_stop_time': origin_stop_time_dict,
        'destination_stop_time': destination_stop_time_dict,
    }
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Set up the GTFS sensor.

    Resolves the GTFS directory under the Home Assistant config path, opens
    (or creates) the SQLite schedule named after the configured data
    file/folder, imports the feed when the schedule has none, and registers
    a single departure sensor.

    Returns ``False`` when the configured data file/folder does not exist.
    """
    gtfs_dir = hass.config.path(DEFAULT_PATH)
    data = config.get(CONF_DATA)
    origin = config.get(CONF_ORIGIN)
    destination = config.get(CONF_DESTINATION)
    name = config.get(CONF_NAME)
    offset = config.get(CONF_OFFSET)

    # Make sure the directory exists so there is a place to put the feed.
    os.makedirs(gtfs_dir, exist_ok=True)

    if not os.path.exists(os.path.join(gtfs_dir, data)):
        _LOGGER.error("The given GTFS data file/folder was not found")
        return False

    import pygtfs

    # The database file is named after the feed, minus its extension.
    (gtfs_root, _) = os.path.splitext(data)

    # check_same_thread=False is passed through to SQLite as a query
    # argument of the database path.
    sqlite_file = "{}.sqlite?check_same_thread=False".format(gtfs_root)
    joined_path = os.path.join(gtfs_dir, sqlite_file)
    gtfs = pygtfs.Schedule(joined_path)

    # Import the feed only when the schedule does not contain one yet.
    # pylint: disable=no-member
    if not gtfs.feeds:
        pygtfs.append_feed(gtfs, os.path.join(gtfs_dir, data))

    add_entities([
        GTFSDepartureSensor(gtfs, name, origin, destination, offset)])
class GTFSDepartureSensor(Entity):
    """Implementation of an GTFS departures sensor.

    The state is the number of minutes until the next departure from
    ``origin`` to ``destination``; details of the agency, route, trip,
    stations and stop times are exposed as state attributes.
    """

    def __init__(self, pygtfs, name, origin, destination, offset):
        """Initialize the sensor.

        ``pygtfs`` is the schedule object handed to get_next_departure(),
        ``name`` an optional custom name, ``origin``/``destination`` the
        stop ids and ``offset`` a timedelta applied to the search time.
        """
        self._pygtfs = pygtfs
        self.origin = origin
        self.destination = destination
        self._offset = offset
        self._custom_name = name
        self._icon = ICON
        self._name = ''
        self._unit_of_measurement = 'min'
        self._state = None
        self._attributes = {}
        # Result of the most recent lookup; None until update() has run.
        self._departure = None
        # Serialises update() calls.
        self.lock = threading.Lock()

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def state(self):
        """Return the state of the sensor."""
        return self._state

    @property
    def unit_of_measurement(self):
        """Return the unit of measurement of this entity, if any."""
        return self._unit_of_measurement

    @property
    def device_state_attributes(self):
        """Return the state attributes."""
        return self._attributes

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return self._icon

    def update(self):
        """Get the latest data from GTFS and update the states."""
        with self.lock:
            self._departure = get_next_departure(
                self._pygtfs, self.origin, self.destination, self._offset)
            if not self._departure:
                self._state = None
                self._attributes = {'Info': 'No more departures today'}
                if self._name == '':
                    self._name = (self._custom_name or DEFAULT_NAME)
                # Everything below needs a departure to read from.
                return

            self._state = self._departure['minutes_until_departure']

            origin_station = self._departure['origin_station']
            destination_station = self._departure['destination_station']
            origin_stop_time = self._departure['origin_stop_time']
            destination_stop_time = self._departure['destination_stop_time']
            agency = self._departure['agency']
            route = self._departure['route']
            trip = self._departure['trip']

            # A custom name wins; otherwise build one from the departure.
            name = '{} {} to {} next departure'
            self._name = (self._custom_name or
                          name.format(agency.agency_name,
                                      origin_station.stop_id,
                                      destination_station.stop_id))

            # Build attributes
            self._attributes = {}
            # total_seconds() keeps whole days and the sign of the offset,
            # which the .seconds field alone does not.
            self._attributes['offset'] = self._offset.total_seconds() / 60

            self._icon = ICONS.get(route.route_type, ICON)

            def dict_for_table(resource):
                """Return a dict for the SQLAlchemy resource given."""
                return {col: getattr(resource, col)
                        for col in resource.__table__.columns.keys()}

            def append_keys(resource, prefix=None):
                """Properly format key val pairs to append to attributes."""
                for key, val in resource.items():
                    # Skip empty values and the internal feed id.
                    if val == "" or val is None or key == 'feed_id':
                        continue
                    pretty_key = key.replace('_', ' ')
                    pretty_key = pretty_key.title()
                    pretty_key = pretty_key.replace('Id', 'ID')
                    pretty_key = pretty_key.replace('Url', 'URL')
                    # Avoid doubling a prefix the key already starts with.
                    if prefix is not None and \
                            not pretty_key.startswith(prefix):
                        pretty_key = '{} {}'.format(prefix, pretty_key)
                    self._attributes[pretty_key] = val

            append_keys(dict_for_table(agency), 'Agency')
            append_keys(dict_for_table(route), 'Route')
            append_keys(dict_for_table(trip), 'Trip')
            append_keys(dict_for_table(origin_station), 'Origin Station')
            append_keys(dict_for_table(destination_station),
                        'Destination Station')
            append_keys(origin_stop_time, 'Origin Stop')
            append_keys(destination_stop_time, 'Destination Stop')