Add Influxdb precision option (#38454)
This commit is contained in:
parent
62c4e072f5
commit
052e8f0983
3 changed files with 111 additions and 5 deletions
|
@ -57,6 +57,7 @@ from .const import (
|
|||
CONF_PASSWORD,
|
||||
CONF_PATH,
|
||||
CONF_PORT,
|
||||
CONF_PRECISION,
|
||||
CONF_RETRY_COUNT,
|
||||
CONF_SSL,
|
||||
CONF_TAGS,
|
||||
|
@ -307,13 +308,13 @@ def get_influx_connection(conf, test_write=False, test_read=False):
|
|||
kwargs = {
|
||||
CONF_TIMEOUT: TIMEOUT,
|
||||
}
|
||||
precision = conf.get(CONF_PRECISION)
|
||||
|
||||
if conf[CONF_API_VERSION] == API_VERSION_2:
|
||||
kwargs[CONF_URL] = conf[CONF_URL]
|
||||
kwargs[CONF_TOKEN] = conf[CONF_TOKEN]
|
||||
kwargs[INFLUX_CONF_ORG] = conf[CONF_ORG]
|
||||
bucket = conf.get(CONF_BUCKET)
|
||||
|
||||
influx = InfluxDBClientV2(**kwargs)
|
||||
query_api = influx.query_api()
|
||||
initial_write_mode = SYNCHRONOUS if test_write else ASYNCHRONOUS
|
||||
|
@ -322,7 +323,7 @@ def get_influx_connection(conf, test_write=False, test_read=False):
|
|||
def write_v2(json):
|
||||
"""Write data to V2 influx."""
|
||||
try:
|
||||
write_api.write(bucket=bucket, record=json)
|
||||
write_api.write(bucket=bucket, record=json, write_precision=precision)
|
||||
except (urllib3.exceptions.HTTPError, OSError) as exc:
|
||||
raise ConnectionError(CONNECTION_ERROR % exc) from exc
|
||||
except ApiException as exc:
|
||||
|
@ -393,7 +394,7 @@ def get_influx_connection(conf, test_write=False, test_read=False):
|
|||
def write_v1(json):
|
||||
"""Write data to V1 influx."""
|
||||
try:
|
||||
influx.write_points(json)
|
||||
influx.write_points(json, time_precision=precision)
|
||||
except (
|
||||
requests.exceptions.RequestException,
|
||||
exceptions.InfluxDBServerError,
|
||||
|
|
|
@ -29,6 +29,7 @@ CONF_COMPONENT_CONFIG_GLOB = "component_config_glob"
|
|||
CONF_COMPONENT_CONFIG_DOMAIN = "component_config_domain"
|
||||
CONF_RETRY_COUNT = "max_retries"
|
||||
CONF_IGNORE_ATTRIBUTES = "ignore_attributes"
|
||||
CONF_PRECISION = "precision"
|
||||
|
||||
CONF_LANGUAGE = "language"
|
||||
CONF_QUERIES = "queries"
|
||||
|
@ -136,6 +137,7 @@ COMPONENT_CONFIG_SCHEMA_CONNECTION = {
|
|||
vol.Optional(CONF_PATH): cv.string,
|
||||
vol.Optional(CONF_PORT): cv.port,
|
||||
vol.Optional(CONF_SSL): cv.boolean,
|
||||
vol.Optional(CONF_PRECISION): vol.In(["ms", "s", "us", "ns"]),
|
||||
# Connection config for V1 API only.
|
||||
vol.Inclusive(CONF_USERNAME, "authentication"): cv.string,
|
||||
vol.Inclusive(CONF_PASSWORD, "authentication"): cv.string,
|
||||
|
|
|
@ -62,9 +62,11 @@ def mock_client_fixture(request):
|
|||
def get_mock_call_fixture(request):
|
||||
"""Get version specific lambda to make write API call mock."""
|
||||
if request.param == influxdb.API_VERSION_2:
|
||||
return lambda body: call(bucket=DEFAULT_BUCKET, record=body)
|
||||
return lambda body, precision=None: call(
|
||||
bucket=DEFAULT_BUCKET, record=body, write_precision=precision
|
||||
)
|
||||
# pylint: disable=unnecessary-lambda
|
||||
return lambda body: call(body)
|
||||
return lambda body, precision=None: call(body, time_precision=precision)
|
||||
|
||||
|
||||
def _get_write_api_mock_v1(mock_influx_client):
|
||||
|
@ -1474,3 +1476,104 @@ async def test_invalid_inputs_error(
|
|||
== 1
|
||||
)
|
||||
sleep.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"mock_client, config_ext, get_write_api, get_mock_call, precision",
|
||||
[
|
||||
(
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
BASE_V1_CONFIG,
|
||||
_get_write_api_mock_v1,
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
"ns",
|
||||
),
|
||||
(
|
||||
influxdb.API_VERSION_2,
|
||||
BASE_V2_CONFIG,
|
||||
_get_write_api_mock_v2,
|
||||
influxdb.API_VERSION_2,
|
||||
"ns",
|
||||
),
|
||||
(
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
BASE_V1_CONFIG,
|
||||
_get_write_api_mock_v1,
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
"us",
|
||||
),
|
||||
(
|
||||
influxdb.API_VERSION_2,
|
||||
BASE_V2_CONFIG,
|
||||
_get_write_api_mock_v2,
|
||||
influxdb.API_VERSION_2,
|
||||
"us",
|
||||
),
|
||||
(
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
BASE_V1_CONFIG,
|
||||
_get_write_api_mock_v1,
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
"ms",
|
||||
),
|
||||
(
|
||||
influxdb.API_VERSION_2,
|
||||
BASE_V2_CONFIG,
|
||||
_get_write_api_mock_v2,
|
||||
influxdb.API_VERSION_2,
|
||||
"ms",
|
||||
),
|
||||
(
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
BASE_V1_CONFIG,
|
||||
_get_write_api_mock_v1,
|
||||
influxdb.DEFAULT_API_VERSION,
|
||||
"s",
|
||||
),
|
||||
(
|
||||
influxdb.API_VERSION_2,
|
||||
BASE_V2_CONFIG,
|
||||
_get_write_api_mock_v2,
|
||||
influxdb.API_VERSION_2,
|
||||
"s",
|
||||
),
|
||||
],
|
||||
indirect=["mock_client", "get_mock_call"],
|
||||
)
|
||||
async def test_precision(
|
||||
hass, mock_client, config_ext, get_write_api, get_mock_call, precision
|
||||
):
|
||||
"""Test the precision setup."""
|
||||
config = {
|
||||
"precision": precision,
|
||||
}
|
||||
config.update(config_ext)
|
||||
handler_method = await _setup(hass, mock_client, config, get_write_api)
|
||||
|
||||
value = "1.9"
|
||||
attrs = {
|
||||
"unit_of_measurement": "foobars",
|
||||
}
|
||||
state = MagicMock(
|
||||
state=value,
|
||||
domain="fake",
|
||||
entity_id="fake.entity-id",
|
||||
object_id="entity",
|
||||
attributes=attrs,
|
||||
)
|
||||
event = MagicMock(data={"new_state": state}, time_fired=12345)
|
||||
body = [
|
||||
{
|
||||
"measurement": "foobars",
|
||||
"tags": {"domain": "fake", "entity_id": "entity"},
|
||||
"time": 12345,
|
||||
"fields": {"value": float(value)},
|
||||
}
|
||||
]
|
||||
handler_method(event)
|
||||
hass.data[influxdb.DOMAIN].block_till_done()
|
||||
|
||||
write_api = get_write_api(mock_client)
|
||||
assert write_api.call_count == 1
|
||||
assert write_api.call_args == get_mock_call(body, precision)
|
||||
write_api.reset_mock()
|
||||
|
|
Loading…
Add table
Reference in a new issue