Add security layer to send file output things (#8189)

* Add security layer to send file output things

* Make telegram secure

* fix lint

* fix handling

* invert check

* resolve relative paths

* add test for relative paths

* fix lint

* fix tests

* Address paulus comments

* fix style

* fix tests

* Add more tests

* fix tests

* fix tests

* fix test p2

* fix lint

* fix tests

* Make it available for windows

* Change name / address comments

* fix set

* fix test

* fix tests

* fix test

* fix lint
This commit is contained in:
Pascal Vizeli 2017-06-26 00:10:30 +02:00 committed by GitHub
parent 2f2952e0ec
commit 2dd7f0616e
8 changed files with 86 additions and 6 deletions

View file

@ -161,7 +161,7 @@ SERVICE_MAP = {
}
def load_data(url=None, filepath=None, username=None, password=None,
def load_data(hass, url=None, filepath=None, username=None, password=None,
authentication=None, num_retries=5):
"""Load photo/document into ByteIO/File container from a source."""
try:
@ -191,8 +191,10 @@ def load_data(url=None, filepath=None, username=None, password=None,
_LOGGER.warning("Can't load photo in %s after %s retries.",
url, retry_num)
elif filepath is not None:
# Load photo from file
if hass.config.is_allowed_path(filepath):
return open(filepath, "rb")
_LOGGER.warning("'%s' are not secure to load data from!", filepath)
else:
_LOGGER.warning("Can't load photo. No photo found in params!")
@ -510,6 +512,7 @@ class TelegramNotificationService:
caption = kwargs.get(ATTR_CAPTION)
func_send = self.bot.sendPhoto if is_photo else self.bot.sendDocument
file_content = load_data(
self.hass,
url=kwargs.get(ATTR_URL),
filepath=kwargs.get(ATTR_FILE),
username=kwargs.get(ATTR_USERNAME),

View file

@ -16,7 +16,8 @@ from homeassistant.const import (
CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME, CONF_PACKAGES, CONF_UNIT_SYSTEM,
CONF_TIME_ZONE, CONF_ELEVATION, CONF_UNIT_SYSTEM_METRIC,
CONF_UNIT_SYSTEM_IMPERIAL, CONF_TEMPERATURE_UNIT, TEMP_CELSIUS,
__version__, CONF_CUSTOMIZE, CONF_CUSTOMIZE_DOMAIN, CONF_CUSTOMIZE_GLOB)
__version__, CONF_CUSTOMIZE, CONF_CUSTOMIZE_DOMAIN, CONF_CUSTOMIZE_GLOB,
CONF_WHITELIST_EXTERNAL_DIRS)
from homeassistant.core import callback, DOMAIN as CONF_CORE
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import get_component, get_platform
@ -130,6 +131,9 @@ CORE_CONFIG_SCHEMA = CUSTOMIZE_CONFIG_SCHEMA.extend({
vol.Optional(CONF_TEMPERATURE_UNIT): cv.temperature_unit,
CONF_UNIT_SYSTEM: cv.unit_system,
CONF_TIME_ZONE: cv.time_zone,
vol.Optional(CONF_WHITELIST_EXTERNAL_DIRS):
# pylint: disable=no-value-for-parameter
vol.All(cv.ensure_list, [vol.IsDir()]),
vol.Optional(CONF_PACKAGES, default={}): PACKAGES_CONFIG_SCHEMA,
})
@ -366,6 +370,12 @@ def async_process_ha_core_config(hass, config):
if CONF_TIME_ZONE in config:
set_time_zone(config.get(CONF_TIME_ZONE))
# init whitelist external dir
hac.whitelist_external_dirs = set((hass.config.path('www'),))
if CONF_WHITELIST_EXTERNAL_DIRS in config:
hac.whitelist_external_dirs.update(
set(config[CONF_WHITELIST_EXTERNAL_DIRS]))
# Customize
cust_exact = dict(config[CONF_CUSTOMIZE])
cust_domain = dict(config[CONF_CUSTOMIZE_DOMAIN])

View file

@ -161,6 +161,7 @@ CONF_VALUE_TEMPLATE = 'value_template'
CONF_VERIFY_SSL = 'verify_ssl'
CONF_WEEKDAY = 'weekday'
CONF_WHITELIST = 'whitelist'
CONF_WHITELIST_EXTERNAL_DIRS = 'whitelist_external_dirs'
CONF_WHITE_VALUE = 'white_value'
CONF_XY = 'xy'
CONF_ZONE = 'zone'

View file

@ -10,6 +10,7 @@ from concurrent.futures import ThreadPoolExecutor
import enum
import logging
import os
import pathlib
import re
import sys
import threading
@ -1053,6 +1054,9 @@ class Config(object):
# Directory that holds the configuration
self.config_dir = None
# List of allowed external dirs to access
self.whitelist_external_dirs = set()
def distance(self: object, lat: float, lon: float) -> float:
"""Calculate distance from Home Assistant.
@ -1070,6 +1074,23 @@ class Config(object):
raise HomeAssistantError("config_dir is not set")
return os.path.join(self.config_dir, *path)
def is_allowed_path(self, path: str) -> bool:
"""Check if the path is valid for access from outside."""
parent = pathlib.Path(path).parent
try:
parent.resolve() # pylint: disable=no-member
except (FileNotFoundError, RuntimeError, PermissionError):
return False
for whitelisted_path in self.whitelist_external_dirs:
try:
parent.relative_to(whitelisted_path)
return True
except ValueError:
pass
return False
def as_dict(self):
"""Create a dictionary representation of this dict.
@ -1086,6 +1107,7 @@ class Config(object):
'time_zone': time_zone.zone,
'components': self.components,
'config_dir': self.config_dir,
'whitelist_external_dirs': self.whitelist_external_dirs,
'version': __version__
}

View file

@ -210,6 +210,9 @@ def test_api_get_config(hass, mock_api_client):
result = yield from resp.json()
if 'components' in result:
result['components'] = set(result['components'])
if 'whitelist_external_dirs' in result:
result['whitelist_external_dirs'] = \
set(result['whitelist_external_dirs'])
assert hass.config.as_dict() == result

View file

@ -278,6 +278,9 @@ def test_get_config(hass, websocket_client):
if 'components' in msg['result']:
msg['result']['components'] = set(msg['result']['components'])
if 'whitelist_external_dirs' in msg['result']:
msg['result']['whitelist_external_dirs'] = \
set(msg['result']['whitelist_external_dirs'])
assert msg['result'] == hass.config.as_dict()

View file

@ -363,6 +363,7 @@ class TestConfig(unittest.TestCase):
'name': 'Huis',
CONF_UNIT_SYSTEM: CONF_UNIT_SYSTEM_IMPERIAL,
'time_zone': 'America/New_York',
'whitelist_external_dirs': '/tmp',
}), self.hass.loop).result()
assert self.hass.config.latitude == 60
@ -371,6 +372,8 @@ class TestConfig(unittest.TestCase):
assert self.hass.config.location_name == 'Huis'
assert self.hass.config.units.name == CONF_UNIT_SYSTEM_IMPERIAL
assert self.hass.config.time_zone.zone == 'America/New_York'
assert len(self.hass.config.whitelist_external_dirs) == 2
assert '/tmp' in self.hass.config.whitelist_external_dirs
def test_loading_configuration_temperature_unit(self):
"""Test backward compatibility when loading core config."""
@ -428,6 +431,7 @@ class TestConfig(unittest.TestCase):
mock_elevation):
"""Test config remains unchanged if discovery fails."""
self.hass.config = Config()
self.hass.config.config_dir = "/test/config"
run_coroutine_threadsafe(
config_util.async_process_ha_core_config(
@ -441,6 +445,8 @@ class TestConfig(unittest.TestCase):
assert self.hass.config.location_name == blankConfig.location_name
assert self.hass.config.units == blankConfig.units
assert self.hass.config.time_zone == blankConfig.time_zone
assert len(self.hass.config.whitelist_external_dirs) == 1
assert "/test/config/www" in self.hass.config.whitelist_external_dirs
@mock.patch('asyncio.create_subprocess_exec')
def test_check_ha_config_file_correct(self, mock_create):

View file

@ -1,11 +1,13 @@
"""Test to verify that Home Assistant core works."""
# pylint: disable=protected-access
import asyncio
import logging
import os
import unittest
from unittest.mock import patch, MagicMock, sentinel
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
import logging
import pytz
import pytest
@ -796,11 +798,41 @@ class TestConfig(unittest.TestCase):
'time_zone': 'UTC',
'components': set(),
'config_dir': '/tmp/ha-config',
'whitelist_external_dirs': set(),
'version': __version__,
}
self.assertEqual(expected, self.config.as_dict())
def test_is_allowed_path(self):
"""Test is_allowed_path method."""
with TemporaryDirectory() as tmp_dir:
self.config.whitelist_external_dirs = set((
tmp_dir,
))
test_file = os.path.join(tmp_dir, "test.jpg")
with open(test_file, "w") as tmp_file:
tmp_file.write("test")
valid = [
test_file,
]
for path in valid:
assert self.config.is_allowed_path(path)
self.config.whitelist_external_dirs = set(('/home',))
unvalid = [
"/hass/config/secure",
"/etc/passwd",
"/root/secure_file",
"/hass/config/test/../../../etc/passwd",
test_file,
]
for path in unvalid:
assert not self.config.is_allowed_path(path)
@patch('homeassistant.core.monotonic')
def test_create_timer(mock_monotonic, loop):