Check buckets/dbs for validity during Influx sensor startup (#37391)
* Check buckets/dbs for validity during sensor startup * Empty array instead of none
This commit is contained in:
parent
9964bd40ed
commit
d60c52bbc7
4 changed files with 73 additions and 11 deletions
|
@ -282,6 +282,7 @@ def _generate_event_to_json(conf: Dict) -> Callable[[Dict], str]:
|
|||
class InfluxClient:
|
||||
"""An InfluxDB client wrapper for V1 or V2."""
|
||||
|
||||
data_repositories: List[str]
|
||||
write: Callable[[str], None]
|
||||
query: Callable[[str, str], List[Any]]
|
||||
close: Callable[[], None]
|
||||
|
@ -330,20 +331,24 @@ def get_influx_connection(conf, test_write=False, test_read=False):
|
|||
"""Close V2 influx client."""
|
||||
influx.close()
|
||||
|
||||
influx_client = InfluxClient(write_v2, query_v2, close_v2)
|
||||
buckets = []
|
||||
if test_write:
|
||||
# Try to write [] to influx. If we can connect and creds are valid
|
||||
# Then invalid inputs is returned. Anything else is a broken config
|
||||
try:
|
||||
influx_client.write([])
|
||||
write_v2([])
|
||||
except ValueError:
|
||||
pass
|
||||
write_api = influx.write_api(write_options=ASYNCHRONOUS)
|
||||
|
||||
if test_read:
|
||||
influx_client.query(TEST_QUERY_V2)
|
||||
tables = query_v2(TEST_QUERY_V2)
|
||||
if tables and tables[0].records:
|
||||
buckets = [bucket.values["name"] for bucket in tables[0].records]
|
||||
else:
|
||||
buckets = []
|
||||
|
||||
return influx_client
|
||||
return InfluxClient(buckets, write_v2, query_v2, close_v2)
|
||||
|
||||
# Else it's a V1 client
|
||||
kwargs[CONF_VERIFY_SSL] = conf[CONF_VERIFY_SSL]
|
||||
|
@ -405,14 +410,14 @@ def get_influx_connection(conf, test_write=False, test_read=False):
|
|||
"""Close the V1 Influx client."""
|
||||
influx.close()
|
||||
|
||||
influx_client = InfluxClient(write_v1, query_v1, close_v1)
|
||||
databases = []
|
||||
if test_write:
|
||||
influx_client.write([])
|
||||
write_v1([])
|
||||
|
||||
if test_read:
|
||||
influx_client.query(TEST_QUERY_V1)
|
||||
databases = [db["name"] for db in query_v1(TEST_QUERY_V1)]
|
||||
|
||||
return influx_client
|
||||
return InfluxClient(databases, write_v1, query_v1, close_v1)
|
||||
|
||||
|
||||
def setup(hass, config):
|
||||
|
|
|
@ -76,7 +76,7 @@ BATCH_BUFFER_SIZE = 100
|
|||
LANGUAGE_INFLUXQL = "influxQL"
|
||||
LANGUAGE_FLUX = "flux"
|
||||
TEST_QUERY_V1 = "SHOW DATABASES;"
|
||||
TEST_QUERY_V2 = f"buckets() {DEFAULT_FUNCTION_FLUX}"
|
||||
TEST_QUERY_V2 = "buckets()"
|
||||
CODE_INVALID_INPUTS = 400
|
||||
|
||||
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=60)
|
||||
|
@ -99,6 +99,14 @@ CLIENT_ERROR_V1 = (
|
|||
"Please check that the database, username and password are correct and "
|
||||
"that the specified user has the correct permissions set."
|
||||
)
|
||||
NO_BUCKET_ERROR = (
|
||||
"InfluxDB bucket '%s' cannot be found. "
|
||||
"Check the name is correct and the token has access to it."
|
||||
)
|
||||
NO_DATABASE_ERROR = (
|
||||
"InfluxDB database '%s' cannot be found. "
|
||||
"Check the name is correct and the user has access to it."
|
||||
)
|
||||
WRITE_ERROR = "Could not write '%s' to influx due to '%s'."
|
||||
QUERY_ERROR = (
|
||||
"Could not execute query '%s' due to '%s'. Check the syntax of your query."
|
||||
|
|
|
@ -46,6 +46,8 @@ from .const import (
|
|||
LANGUAGE_FLUX,
|
||||
LANGUAGE_INFLUXQL,
|
||||
MIN_TIME_BETWEEN_UPDATES,
|
||||
NO_BUCKET_ERROR,
|
||||
NO_DATABASE_ERROR,
|
||||
QUERY_MULTIPLE_RESULTS_MESSAGE,
|
||||
QUERY_NO_RESULTS_MESSAGE,
|
||||
RENDERING_QUERY_ERROR_MESSAGE,
|
||||
|
@ -147,8 +149,20 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
|
|||
_LOGGER.error(exc)
|
||||
raise PlatformNotReady()
|
||||
|
||||
queries = config[CONF_QUERIES_FLUX if CONF_QUERIES_FLUX in config else CONF_QUERIES]
|
||||
entities = [InfluxSensor(hass, influx, query) for query in queries]
|
||||
entities = []
|
||||
if CONF_QUERIES_FLUX in config:
|
||||
for query in config[CONF_QUERIES_FLUX]:
|
||||
if query[CONF_BUCKET] in influx.data_repositories:
|
||||
entities.append(InfluxSensor(hass, influx, query))
|
||||
else:
|
||||
_LOGGER.error(NO_BUCKET_ERROR, query[CONF_BUCKET])
|
||||
else:
|
||||
for query in config[CONF_QUERIES]:
|
||||
if query[CONF_DB_NAME] in influx.data_repositories:
|
||||
entities.append(InfluxSensor(hass, influx, query))
|
||||
else:
|
||||
_LOGGER.error(NO_DATABASE_ERROR, query[CONF_DB_NAME])
|
||||
|
||||
add_entities(entities, update_before_add=True)
|
||||
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, lambda _: influx.close())
|
||||
|
|
|
@ -551,3 +551,38 @@ async def test_connection_error_at_startup(
|
|||
async_fire_time_changed(hass, new_time)
|
||||
await hass.async_block_till_done()
|
||||
assert hass.states.get(expected_sensor) is not None
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"mock_client, config_ext, queries, set_query_mock",
|
||||
[
|
||||
(
|
||||
DEFAULT_API_VERSION,
|
||||
{"database": "bad_db"},
|
||||
BASE_V1_QUERY,
|
||||
_set_query_mock_v1,
|
||||
),
|
||||
(
|
||||
API_VERSION_2,
|
||||
{
|
||||
"api_version": API_VERSION_2,
|
||||
"organization": "org",
|
||||
"token": "token",
|
||||
"bucket": "bad_bucket",
|
||||
},
|
||||
BASE_V2_QUERY,
|
||||
_set_query_mock_v2,
|
||||
),
|
||||
],
|
||||
indirect=["mock_client"],
|
||||
)
|
||||
async def test_data_repository_not_found(
|
||||
hass, caplog, mock_client, config_ext, queries, set_query_mock,
|
||||
):
|
||||
"""Test sensor is not setup when bucket not available."""
|
||||
set_query_mock(mock_client)
|
||||
await _setup(hass, config_ext, queries, [])
|
||||
assert hass.states.get("sensor.test") is None
|
||||
assert (
|
||||
len([record for record in caplog.records if record.levelname == "ERROR"]) == 1
|
||||
)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue