Use third-party lib aioautomatic for automatic (#7126)

This commit is contained in:
Adam Mills 2017-04-15 21:11:36 -04:00 committed by Paulus Schoutsen
parent 815422a886
commit 35de3a1dc4
3 changed files with 146 additions and 321 deletions

View file

@ -4,19 +4,20 @@ Support for the Automatic platform.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.automatic/
"""
import asyncio
from datetime import timedelta
import logging
import re
import requests
import voluptuous as vol
from homeassistant.components.device_tracker import (
PLATFORM_SCHEMA, ATTR_ATTRIBUTES)
from homeassistant.const import CONF_USERNAME, CONF_PASSWORD
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import track_utc_time_change
from homeassistant.util import datetime as dt_util
from homeassistant.helpers.event import async_track_time_interval
REQUIREMENTS = ['aioautomatic==0.1.1']
_LOGGER = logging.getLogger(__name__)
@ -24,129 +25,101 @@ CONF_CLIENT_ID = 'client_id'
CONF_SECRET = 'secret'
CONF_DEVICES = 'devices'
SCOPE = 'scope:location scope:vehicle:profile scope:user:profile scope:trip'
ATTR_ACCESS_TOKEN = 'access_token'
ATTR_EXPIRES_IN = 'expires_in'
ATTR_RESULTS = 'results'
ATTR_VEHICLE = 'vehicle'
ATTR_ENDED_AT = 'ended_at'
ATTR_END_LOCATION = 'end_location'
URL_AUTHORIZE = 'https://accounts.automatic.com/oauth/access_token/'
URL_VEHICLES = 'https://api.automatic.com/vehicle/'
URL_TRIPS = 'https://api.automatic.com/trip/'
_VEHICLE_ID_REGEX = re.compile(
(URL_VEHICLES + '(.*)?[/]$').replace('/', r'\/'))
DEFAULT_TIMEOUT = 5
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_CLIENT_ID): cv.string,
vol.Required(CONF_SECRET): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_DEVICES): vol.All(cv.ensure_list, [cv.string])
vol.Optional(CONF_DEVICES, default=None): vol.All(
cv.ensure_list, [cv.string])
})
def setup_scanner(hass, config: dict, see, discovery_info=None):
@asyncio.coroutine
def async_setup_scanner(hass, config, async_see, discovery_info=None):
"""Validate the configuration and return an Automatic scanner."""
import aioautomatic
client = aioautomatic.Client(
client_id=config[CONF_CLIENT_ID],
client_secret=config[CONF_SECRET],
client_session=async_get_clientsession(hass),
request_kwargs={'timeout': DEFAULT_TIMEOUT})
try:
AutomaticDeviceScanner(hass, config, see)
except requests.HTTPError as err:
session = yield from client.create_session_from_password(
config[CONF_USERNAME], config[CONF_PASSWORD])
data = AutomaticData(hass, session, config[CONF_DEVICES], async_see)
except aioautomatic.exceptions.AutomaticError as err:
_LOGGER.error(str(err))
return False
yield from data.update()
return True
class AutomaticDeviceScanner(object):
"""A class representing an Automatic device."""
class AutomaticData(object):
"""A class representing an Automatic cloud service connection."""
def __init__(self, hass, config: dict, see) -> None:
def __init__(self, hass, session, devices, async_see):
"""Initialize the automatic device scanner."""
self.hass = hass
self._devices = config.get(CONF_DEVICES, None)
self._access_token_payload = {
'username': config.get(CONF_USERNAME),
'password': config.get(CONF_PASSWORD),
'client_id': config.get(CONF_CLIENT_ID),
'client_secret': config.get(CONF_SECRET),
'grant_type': 'password',
'scope': SCOPE
}
self._headers = None
self._token_expires = dt_util.now()
self.last_results = {}
self.last_trips = {}
self.see = see
self.devices = devices
self.session = session
self.async_see = async_see
self._update_info()
async_track_time_interval(hass, self.update, timedelta(seconds=30))
track_utc_time_change(self.hass, self._update_info,
second=range(0, 60, 30))
def _update_headers(self):
"""Get the access token from automatic."""
if self._headers is None or self._token_expires <= dt_util.now():
resp = requests.post(
URL_AUTHORIZE,
data=self._access_token_payload)
resp.raise_for_status()
json = resp.json()
access_token = json[ATTR_ACCESS_TOKEN]
self._token_expires = dt_util.now() + timedelta(
seconds=json[ATTR_EXPIRES_IN])
self._headers = {
'Authorization': 'Bearer {}'.format(access_token)
}
def _update_info(self, now=None) -> None:
@asyncio.coroutine
def update(self, now=None):
"""Update the device info."""
import aioautomatic
_LOGGER.debug('Updating devices %s', now)
self._update_headers()
response = requests.get(URL_VEHICLES, headers=self._headers)
try:
vehicles = yield from self.session.get_vehicles()
except aioautomatic.exceptions.AutomaticError as err:
_LOGGER.error(str(err))
return False
response.raise_for_status()
for vehicle in vehicles:
name = vehicle.display_name
if name is None:
name = ' '.join(filter(None, (
str(vehicle.year), vehicle.make, vehicle.model)))
self.last_results = [item for item in response.json()[ATTR_RESULTS]
if self._devices is None or item[
'display_name'] in self._devices]
if self.devices is not None and name not in self.devices:
continue
response = requests.get(URL_TRIPS, headers=self._headers)
self.hass.async_add_job(self.update_vehicle(vehicle, name))
if response.status_code == 200:
for trip in response.json()[ATTR_RESULTS]:
vehicle_id = _VEHICLE_ID_REGEX.match(
trip[ATTR_VEHICLE]).group(1)
if vehicle_id not in self.last_trips:
self.last_trips[vehicle_id] = trip
elif self.last_trips[vehicle_id][ATTR_ENDED_AT] < trip[
ATTR_ENDED_AT]:
self.last_trips[vehicle_id] = trip
@asyncio.coroutine
def update_vehicle(self, vehicle, name):
"""Updated the specified vehicle's data."""
import aioautomatic
for vehicle in self.last_results:
dev_id = vehicle.get('id')
host_name = vehicle.get('display_name')
kwargs = {
'dev_id': vehicle.id,
'host_name': name,
'mac': vehicle.id,
ATTR_ATTRIBUTES: {
'fuel_level': vehicle.fuel_level_percent,
}
}
attrs = {
'fuel_level': vehicle.get('fuel_level_percent')
}
trips = []
try:
# Get the most recent trip for this vehicle
trips = yield from self.session.get_trips(
vehicle=vehicle.id, limit=1)
except aioautomatic.exceptions.AutomaticError as err:
_LOGGER.error(str(err))
kwargs = {
'dev_id': dev_id,
'host_name': host_name,
'mac': dev_id,
ATTR_ATTRIBUTES: attrs
}
if trips:
end_location = trips[0].end_location
kwargs['gps'] = (end_location.lat, end_location.lon)
kwargs['gps_accuracy'] = end_location.accuracy_m
if dev_id in self.last_trips:
end_location = self.last_trips[dev_id][ATTR_END_LOCATION]
kwargs['gps'] = (end_location['lat'], end_location['lon'])
kwargs['gps_accuracy'] = end_location['accuracy_m']
self.see(**kwargs)
yield from self.async_see(**kwargs)

View file

@ -37,6 +37,9 @@ SoCo==0.12
# homeassistant.components.notify.twitter
TwitterAPI==2.4.5
# homeassistant.components.device_tracker.automatic
aioautomatic==0.1.1
# homeassistant.components.sensor.dnsip
aiodns==1.1.1

View file

@ -1,241 +1,90 @@
"""Test the automatic device tracker platform."""
import asyncio
import logging
import requests
import unittest
from unittest.mock import patch
from unittest.mock import patch, MagicMock
import aioautomatic
from homeassistant.components.device_tracker.automatic import (
URL_AUTHORIZE, URL_VEHICLES, URL_TRIPS, setup_scanner)
from tests.common import get_test_home_assistant
async_setup_scanner)
_LOGGER = logging.getLogger(__name__)
INVALID_USERNAME = 'bob'
VALID_USERNAME = 'jim'
PASSWORD = 'password'
CLIENT_ID = '12345'
CLIENT_SECRET = '54321'
FUEL_LEVEL = 77.2
LATITUDE = 32.82336
LONGITUDE = -117.23743
ACCURACY = 8
DISPLAY_NAME = 'My Vehicle'
@patch('aioautomatic.Client.create_session_from_password')
def test_invalid_credentials(mock_create_session, hass):
"""Test with invalid credentials."""
@asyncio.coroutine
def get_session(*args, **kwargs):
"""Return the test session."""
raise aioautomatic.exceptions.ForbiddenError()
mock_create_session.side_effect = get_session
config = {
'platform': 'automatic',
'username': 'bad_username',
'password': 'bad_password',
'client_id': 'client_id',
'secret': 'client_secret',
'devices': None,
}
result = hass.loop.run_until_complete(
async_setup_scanner(hass, config, None))
assert not result
def mocked_requests(*args, **kwargs):
"""Mock requests.get invocations."""
class MockResponse:
"""Class to represent a mocked response."""
@patch('aioautomatic.Client.create_session_from_password')
def test_valid_credentials(mock_create_session, hass):
"""Test with valid credentials."""
session = MagicMock()
vehicle = MagicMock()
trip = MagicMock()
mock_see = MagicMock()
def __init__(self, json_data, status_code):
"""Initialize the mock response class."""
self.json_data = json_data
self.status_code = status_code
vehicle.id = 'mock_id'
vehicle.display_name = 'mock_display_name'
vehicle.fuel_level_percent = 45.6
def json(self):
"""Return the json of the response."""
return self.json_data
trip.end_location.lat = 45.567
trip.end_location.lon = 34.345
trip.end_location.accuracy_m = 5.6
@property
def content(self):
"""Return the content of the response."""
return self.json()
@asyncio.coroutine
def get_session(*args, **kwargs):
"""Return the test session."""
return session
def raise_for_status(self):
"""Raise an HTTPError if status is not 200."""
if self.status_code != 200:
raise requests.HTTPError(self.status_code)
@asyncio.coroutine
def get_vehicles(*args, **kwargs):
"""Return list of test vehicles."""
return [vehicle]
data = kwargs.get('data')
@asyncio.coroutine
def get_trips(*args, **kwargs):
"""Return list of test trips."""
return [trip]
if data and data.get('username', None) == INVALID_USERNAME:
return MockResponse({
"error": "invalid_credentials"
}, 401)
elif str(args[0]).startswith(URL_AUTHORIZE):
return MockResponse({
"user": {
"sid": "sid",
"id": "id"
},
"token_type": "Bearer",
"access_token": "accesstoken",
"refresh_token": "refreshtoken",
"expires_in": 31521669,
"scope": ""
}, 200)
elif str(args[0]).startswith(URL_VEHICLES):
return MockResponse({
"_metadata": {
"count": 2,
"next": None,
"previous": None
},
"results": [
{
"url": "https://api.automatic.com/vehicle/vid/",
"id": "vid",
"created_at": "2016-03-05T20:05:16.240000Z",
"updated_at": "2016-08-29T01:52:59.597898Z",
"make": "Honda",
"model": "Element",
"year": 2007,
"submodel": "EX",
"display_name": DISPLAY_NAME,
"fuel_grade": "regular",
"fuel_level_percent": FUEL_LEVEL,
"active_dtcs": []
}]
}, 200)
elif str(args[0]).startswith(URL_TRIPS):
return MockResponse({
"_metadata": {
"count": 1594,
"next": "https://api.automatic.com/trip/?page=2",
"previous": None
},
"results": [
{
"url": "https://api.automatic.com/trip/tid1/",
"id": "tid1",
"driver": "https://api.automatic.com/user/uid/",
"user": "https://api.automatic.com/user/uid/",
"started_at": "2016-08-28T19:37:23.986000Z",
"ended_at": "2016-08-28T19:43:22.500000Z",
"distance_m": 3931.6,
"duration_s": 358.5,
"vehicle": "https://api.automatic.com/vehicle/vid/",
"start_location": {
"lat": 32.87336,
"lon": -117.22743,
"accuracy_m": 10
},
"start_address": {
"name": "123 Fake St, Nowhere, NV 12345",
"display_name": "123 Fake St, Nowhere, NV",
"street_number": "Unknown",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"end_location": {
"lat": LATITUDE,
"lon": LONGITUDE,
"accuracy_m": ACCURACY
},
"end_address": {
"name": "321 Fake St, Nowhere, NV 12345",
"display_name": "321 Fake St, Nowhere, NV",
"street_number": "Unknown",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"path": "path",
"vehicle_events": [],
"start_timezone": "America/Denver",
"end_timezone": "America/Denver",
"idling_time_s": 0,
"tags": []
},
{
"url": "https://api.automatic.com/trip/tid2/",
"id": "tid2",
"driver": "https://api.automatic.com/user/uid/",
"user": "https://api.automatic.com/user/uid/",
"started_at": "2016-08-28T18:48:00.727000Z",
"ended_at": "2016-08-28T18:55:25.800000Z",
"distance_m": 3969.1,
"duration_s": 445.1,
"vehicle": "https://api.automatic.com/vehicle/vid/",
"start_location": {
"lat": 32.87336,
"lon": -117.22743,
"accuracy_m": 11
},
"start_address": {
"name": "123 Fake St, Nowhere, NV, USA",
"display_name": "Fake St, Nowhere, NV",
"street_number": "123",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"end_location": {
"lat": 32.82336,
"lon": -117.23743,
"accuracy_m": 10
},
"end_address": {
"name": "321 Fake St, Nowhere, NV, USA",
"display_name": "Fake St, Nowhere, NV",
"street_number": "Unknown",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"path": "path",
"vehicle_events": [],
"start_timezone": "America/Denver",
"end_timezone": "America/Denver",
"idling_time_s": 0,
"tags": []
}
]
}, 200)
else:
_LOGGER.debug('UNKNOWN ROUTE')
mock_create_session.side_effect = get_session
session.get_vehicles.side_effect = get_vehicles
session.get_trips.side_effect = get_trips
config = {
'platform': 'automatic',
'username': 'bad_username',
'password': 'bad_password',
'client_id': 'client_id',
'secret': 'client_secret',
'devices': None,
}
result = hass.loop.run_until_complete(
async_setup_scanner(hass, config, mock_see))
class TestAutomatic(unittest.TestCase):
"""Test cases around the automatic device scanner."""
def see_mock(self, **kwargs):
"""Mock see function."""
self.assertEqual('vid', kwargs.get('dev_id'))
self.assertEqual(FUEL_LEVEL,
kwargs.get('attributes', {}).get('fuel_level'))
self.assertEqual((LATITUDE, LONGITUDE), kwargs.get('gps'))
self.assertEqual(ACCURACY, kwargs.get('gps_accuracy'))
def setUp(self):
"""Set up test data."""
self.hass = get_test_home_assistant()
def tearDown(self):
"""Tear down test data."""
self.hass.stop()
@patch('requests.get', side_effect=mocked_requests)
@patch('requests.post', side_effect=mocked_requests)
def test_invalid_credentials(self, mock_get, mock_post):
"""Test error is raised with invalid credentials."""
config = {
'platform': 'automatic',
'username': INVALID_USERNAME,
'password': PASSWORD,
'client_id': CLIENT_ID,
'secret': CLIENT_SECRET
}
self.assertFalse(setup_scanner(self.hass, config, self.see_mock))
@patch('requests.get', side_effect=mocked_requests)
@patch('requests.post', side_effect=mocked_requests)
def test_valid_credentials(self, mock_get, mock_post):
"""Test error is raised with invalid credentials."""
config = {
'platform': 'automatic',
'username': VALID_USERNAME,
'password': PASSWORD,
'client_id': CLIENT_ID,
'secret': CLIENT_SECRET
}
self.assertTrue(setup_scanner(self.hass, config, self.see_mock))
assert result
assert mock_see.called
assert len(mock_see.mock_calls) == 2
assert mock_see.mock_calls[0][2]['dev_id'] == 'mock_id'
assert mock_see.mock_calls[0][2]['mac'] == 'mock_id'
assert mock_see.mock_calls[0][2]['host_name'] == 'mock_display_name'
assert mock_see.mock_calls[0][2]['attributes'] == {'fuel_level': 45.6}
assert mock_see.mock_calls[0][2]['gps'] == (45.567, 34.345)
assert mock_see.mock_calls[0][2]['gps_accuracy'] == 5.6