Cleanup modbus test mocks (#115412)
This commit is contained in:
parent
dcd61ac086
commit
3202743b6c
9 changed files with 96 additions and 137 deletions
|
@ -47,12 +47,36 @@ class ReadResult:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="check_config_loaded")
|
||||||
|
def check_config_loaded_fixture():
|
||||||
|
"""Set default for check_config_loaded."""
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="register_words")
|
||||||
|
def register_words_fixture():
|
||||||
|
"""Set default for register_words."""
|
||||||
|
return [0x00, 0x00]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="config_addon")
|
||||||
|
def config_addon_fixture():
|
||||||
|
"""Add extra configuration items."""
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="do_exception")
|
||||||
|
def do_exception_fixture():
|
||||||
|
"""Remove side_effect to pymodbus calls."""
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="mock_pymodbus")
|
@pytest.fixture(name="mock_pymodbus")
|
||||||
def mock_pymodbus_fixture():
|
def mock_pymodbus_fixture(do_exception, register_words):
|
||||||
"""Mock pymodbus."""
|
"""Mock pymodbus."""
|
||||||
mock_pb = mock.AsyncMock()
|
mock_pb = mock.AsyncMock()
|
||||||
mock_pb.close = mock.MagicMock()
|
mock_pb.close = mock.MagicMock()
|
||||||
read_result = ReadResult([])
|
read_result = ReadResult(register_words if register_words else [])
|
||||||
mock_pb.read_coils.return_value = read_result
|
mock_pb.read_coils.return_value = read_result
|
||||||
mock_pb.read_discrete_inputs.return_value = read_result
|
mock_pb.read_discrete_inputs.return_value = read_result
|
||||||
mock_pb.read_input_registers.return_value = read_result
|
mock_pb.read_input_registers.return_value = read_result
|
||||||
|
@ -61,6 +85,16 @@ def mock_pymodbus_fixture():
|
||||||
mock_pb.write_registers.return_value = read_result
|
mock_pb.write_registers.return_value = read_result
|
||||||
mock_pb.write_coil.return_value = read_result
|
mock_pb.write_coil.return_value = read_result
|
||||||
mock_pb.write_coils.return_value = read_result
|
mock_pb.write_coils.return_value = read_result
|
||||||
|
if do_exception:
|
||||||
|
exc = ModbusException("mocked pymodbus exception")
|
||||||
|
mock_pb.read_coils.side_effect = exc
|
||||||
|
mock_pb.read_discrete_inputs.side_effect = exc
|
||||||
|
mock_pb.read_input_registers.side_effect = exc
|
||||||
|
mock_pb.read_holding_registers.side_effect = exc
|
||||||
|
mock_pb.write_register.side_effect = exc
|
||||||
|
mock_pb.write_registers.side_effect = exc
|
||||||
|
mock_pb.write_coil.side_effect = exc
|
||||||
|
mock_pb.write_coils.side_effect = exc
|
||||||
with (
|
with (
|
||||||
mock.patch(
|
mock.patch(
|
||||||
"homeassistant.components.modbus.modbus.AsyncModbusTcpClient",
|
"homeassistant.components.modbus.modbus.AsyncModbusTcpClient",
|
||||||
|
@ -81,33 +115,9 @@ def mock_pymodbus_fixture():
|
||||||
yield mock_pb
|
yield mock_pb
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="check_config_loaded")
|
|
||||||
def check_config_loaded_fixture():
|
|
||||||
"""Set default for check_config_loaded."""
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="register_words")
|
|
||||||
def register_words_fixture():
|
|
||||||
"""Set default for register_words."""
|
|
||||||
return [0x00, 0x00]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="config_addon")
|
|
||||||
def config_addon_fixture():
|
|
||||||
"""Add entra configuration items."""
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="do_exception")
|
|
||||||
def do_exception_fixture():
|
|
||||||
"""Remove side_effect to pymodbus calls."""
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="mock_modbus")
|
@pytest.fixture(name="mock_modbus")
|
||||||
async def mock_modbus_fixture(
|
async def mock_modbus_fixture(
|
||||||
hass, caplog, register_words, check_config_loaded, config_addon, do_config
|
hass, caplog, check_config_loaded, config_addon, do_config, mock_pymodbus
|
||||||
):
|
):
|
||||||
"""Load integration modbus using mocked pymodbus."""
|
"""Load integration modbus using mocked pymodbus."""
|
||||||
conf = copy.deepcopy(do_config)
|
conf = copy.deepcopy(do_config)
|
||||||
|
@ -132,57 +142,23 @@ async def mock_modbus_fixture(
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
mock_pb = mock.AsyncMock()
|
now = dt_util.utcnow()
|
||||||
mock_pb.close = mock.MagicMock()
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"homeassistant.components.modbus.modbus.AsyncModbusTcpClient",
|
"homeassistant.helpers.event.dt_util.utcnow",
|
||||||
return_value=mock_pb,
|
return_value=now,
|
||||||
autospec=True,
|
autospec=True,
|
||||||
):
|
):
|
||||||
now = dt_util.utcnow()
|
result = await async_setup_component(hass, DOMAIN, config)
|
||||||
with mock.patch(
|
assert result or not check_config_loaded
|
||||||
"homeassistant.helpers.event.dt_util.utcnow",
|
await hass.async_block_till_done()
|
||||||
return_value=now,
|
return mock_pymodbus
|
||||||
autospec=True,
|
|
||||||
):
|
|
||||||
result = await async_setup_component(hass, DOMAIN, config)
|
|
||||||
assert result or not check_config_loaded
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
yield mock_pb
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="mock_pymodbus_exception")
|
|
||||||
async def mock_pymodbus_exception_fixture(hass, do_exception, mock_modbus):
|
|
||||||
"""Trigger update call with time_changed event."""
|
|
||||||
if do_exception:
|
|
||||||
exc = ModbusException("fail read_coils")
|
|
||||||
mock_modbus.read_coils.side_effect = exc
|
|
||||||
mock_modbus.read_discrete_inputs.side_effect = exc
|
|
||||||
mock_modbus.read_input_registers.side_effect = exc
|
|
||||||
mock_modbus.read_holding_registers.side_effect = exc
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="mock_pymodbus_return")
|
|
||||||
async def mock_pymodbus_return_fixture(hass, register_words, mock_modbus):
|
|
||||||
"""Trigger update call with time_changed event."""
|
|
||||||
read_result = ReadResult(register_words if register_words else [])
|
|
||||||
mock_modbus.read_coils.return_value = read_result
|
|
||||||
mock_modbus.read_discrete_inputs.return_value = read_result
|
|
||||||
mock_modbus.read_input_registers.return_value = read_result
|
|
||||||
mock_modbus.read_holding_registers.return_value = read_result
|
|
||||||
mock_modbus.write_register.return_value = read_result
|
|
||||||
mock_modbus.write_registers.return_value = read_result
|
|
||||||
mock_modbus.write_coil.return_value = read_result
|
|
||||||
mock_modbus.write_coils.return_value = read_result
|
|
||||||
return mock_modbus
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="mock_do_cycle")
|
@pytest.fixture(name="mock_do_cycle")
|
||||||
async def mock_do_cycle_fixture(
|
async def mock_do_cycle_fixture(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
freezer: FrozenDateTimeFactory,
|
freezer: FrozenDateTimeFactory,
|
||||||
mock_pymodbus_exception,
|
mock_modbus,
|
||||||
mock_pymodbus_return,
|
|
||||||
) -> FrozenDateTimeFactory:
|
) -> FrozenDateTimeFactory:
|
||||||
"""Trigger update call with time_changed event."""
|
"""Trigger update call with time_changed event."""
|
||||||
freezer.tick(timedelta(seconds=1))
|
freezer.tick(timedelta(seconds=1))
|
||||||
|
@ -207,11 +183,12 @@ async def mock_test_state_fixture(hass, request):
|
||||||
return request.param
|
return request.param
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="mock_ha")
|
@pytest.fixture(name="mock_modbus_ha")
|
||||||
async def mock_ha_fixture(hass, mock_pymodbus_return):
|
async def mock_modbus_ha_fixture(hass, mock_modbus):
|
||||||
"""Load homeassistant to allow service calls."""
|
"""Load homeassistant to allow service calls."""
|
||||||
assert await async_setup_component(hass, "homeassistant", {})
|
assert await async_setup_component(hass, "homeassistant", {})
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
return mock_modbus
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="caplog_setup_text")
|
@pytest.fixture(name="caplog_setup_text")
|
||||||
|
|
|
@ -207,7 +207,7 @@ async def test_all_binary_sensor(hass: HomeAssistant, expected, mock_do_cycle) -
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_binary_sensor_update(
|
async def test_service_binary_sensor_update(
|
||||||
hass: HomeAssistant, mock_modbus, mock_ha
|
hass: HomeAssistant, mock_modbus_ha
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
|
|
||||||
|
@ -217,7 +217,7 @@ async def test_service_binary_sensor_update(
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
||||||
|
|
||||||
mock_modbus.read_coils.return_value = ReadResult([0x01])
|
mock_modbus_ha.read_coils.return_value = ReadResult([0x01])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
|
|
@ -496,10 +496,10 @@ async def test_temperature_error(hass: HomeAssistant, expected, mock_do_cycle) -
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_climate_update(
|
async def test_service_climate_update(
|
||||||
hass: HomeAssistant, mock_modbus, mock_ha, result, register_words
|
hass: HomeAssistant, mock_modbus_ha, result, register_words
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult(register_words)
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult(register_words)
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
@ -611,10 +611,10 @@ async def test_service_climate_update(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_climate_fan_update(
|
async def test_service_climate_fan_update(
|
||||||
hass: HomeAssistant, mock_modbus, mock_ha, result, register_words
|
hass: HomeAssistant, mock_modbus_ha, result, register_words
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult(register_words)
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult(register_words)
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
@ -751,10 +751,10 @@ async def test_service_climate_fan_update(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_climate_swing_update(
|
async def test_service_climate_swing_update(
|
||||||
hass: HomeAssistant, mock_modbus, mock_ha, result, register_words
|
hass: HomeAssistant, mock_modbus_ha, result, register_words
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult(register_words)
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult(register_words)
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
@ -844,10 +844,10 @@ async def test_service_climate_swing_update(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_climate_set_temperature(
|
async def test_service_climate_set_temperature(
|
||||||
hass: HomeAssistant, temperature, result, mock_modbus, mock_ha
|
hass: HomeAssistant, temperature, result, mock_modbus_ha
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test set_temperature."""
|
"""Test set_temperature."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult(result)
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult(result)
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
CLIMATE_DOMAIN,
|
CLIMATE_DOMAIN,
|
||||||
"set_temperature",
|
"set_temperature",
|
||||||
|
@ -954,10 +954,10 @@ async def test_service_climate_set_temperature(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_set_hvac_mode(
|
async def test_service_set_hvac_mode(
|
||||||
hass: HomeAssistant, hvac_mode, result, mock_modbus, mock_ha
|
hass: HomeAssistant, hvac_mode, result, mock_modbus_ha
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test set HVAC mode."""
|
"""Test set HVAC mode."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult(result)
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult(result)
|
||||||
|
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
CLIMATE_DOMAIN,
|
CLIMATE_DOMAIN,
|
||||||
|
@ -1018,10 +1018,10 @@ async def test_service_set_hvac_mode(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_set_fan_mode(
|
async def test_service_set_fan_mode(
|
||||||
hass: HomeAssistant, fan_mode, result, mock_modbus, mock_ha
|
hass: HomeAssistant, fan_mode, result, mock_modbus_ha
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test set Fan mode."""
|
"""Test set Fan mode."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult(result)
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult(result)
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
CLIMATE_DOMAIN,
|
CLIMATE_DOMAIN,
|
||||||
"set_fan_mode",
|
"set_fan_mode",
|
||||||
|
@ -1081,10 +1081,10 @@ async def test_service_set_fan_mode(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_set_swing_mode(
|
async def test_service_set_swing_mode(
|
||||||
hass: HomeAssistant, swing_mode, result, mock_modbus, mock_ha
|
hass: HomeAssistant, swing_mode, result, mock_modbus_ha
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test set Swing mode."""
|
"""Test set Swing mode."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult(result)
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult(result)
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
CLIMATE_DOMAIN,
|
CLIMATE_DOMAIN,
|
||||||
"set_swing_mode",
|
"set_swing_mode",
|
||||||
|
|
|
@ -182,13 +182,13 @@ async def test_register_cover(hass: HomeAssistant, expected, mock_do_cycle) -> N
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_cover_update(hass: HomeAssistant, mock_modbus, mock_ha) -> None:
|
async def test_service_cover_update(hass: HomeAssistant, mock_modbus_ha) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_CLOSED
|
assert hass.states.get(ENTITY_ID).state == STATE_CLOSED
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult([0x01])
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult([0x01])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
@ -255,30 +255,30 @@ async def test_restore_state_cover(
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_cover_move(hass: HomeAssistant, mock_modbus, mock_ha) -> None:
|
async def test_service_cover_move(hass: HomeAssistant, mock_modbus_ha) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
|
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult([0x01])
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult([0x01])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"cover", "open_cover", {"entity_id": ENTITY_ID}, blocking=True
|
"cover", "open_cover", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_OPEN
|
assert hass.states.get(ENTITY_ID).state == STATE_OPEN
|
||||||
|
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult([0x00])
|
mock_modbus_ha.read_holding_registers.return_value = ReadResult([0x00])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"cover", "close_cover", {"entity_id": ENTITY_ID}, blocking=True
|
"cover", "close_cover", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_CLOSED
|
assert hass.states.get(ENTITY_ID).state == STATE_CLOSED
|
||||||
|
|
||||||
await mock_modbus.reset()
|
await mock_modbus_ha.reset()
|
||||||
mock_modbus.read_holding_registers.side_effect = ModbusException("fail write_")
|
mock_modbus_ha.read_holding_registers.side_effect = ModbusException("fail write_")
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"cover", "close_cover", {"entity_id": ENTITY_ID}, blocking=True
|
"cover", "close_cover", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert mock_modbus.read_holding_registers.called
|
assert mock_modbus_ha.read_holding_registers.called
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
|
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
|
||||||
|
|
||||||
mock_modbus.read_coils.side_effect = ModbusException("fail write_")
|
mock_modbus_ha.read_coils.side_effect = ModbusException("fail write_")
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"cover", "close_cover", {"entity_id": ENTITY_ID2}, blocking=True
|
"cover", "close_cover", {"entity_id": ENTITY_ID2}, blocking=True
|
||||||
)
|
)
|
||||||
|
|
|
@ -262,7 +262,6 @@ async def test_fan_service_turn(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
caplog: pytest.LogCaptureFixture,
|
caplog: pytest.LogCaptureFixture,
|
||||||
mock_modbus,
|
mock_modbus,
|
||||||
mock_pymodbus_return,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service turn_on/turn_off."""
|
"""Run test for service turn_on/turn_off."""
|
||||||
|
|
||||||
|
@ -323,13 +322,13 @@ async def test_fan_service_turn(
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_fan_update(hass: HomeAssistant, mock_modbus, mock_ha) -> None:
|
async def test_service_fan_update(hass: HomeAssistant, mock_modbus_ha) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
||||||
mock_modbus.read_coils.return_value = ReadResult([0x01])
|
mock_modbus_ha.read_coils.return_value = ReadResult([0x01])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
|
|
@ -1366,7 +1366,6 @@ async def mock_modbus_read_pymodbus_fixture(
|
||||||
do_type,
|
do_type,
|
||||||
do_scan_interval,
|
do_scan_interval,
|
||||||
do_return,
|
do_return,
|
||||||
do_exception,
|
|
||||||
caplog,
|
caplog,
|
||||||
mock_pymodbus,
|
mock_pymodbus,
|
||||||
freezer: FrozenDateTimeFactory,
|
freezer: FrozenDateTimeFactory,
|
||||||
|
@ -1374,10 +1373,6 @@ async def mock_modbus_read_pymodbus_fixture(
|
||||||
"""Load integration modbus using mocked pymodbus."""
|
"""Load integration modbus using mocked pymodbus."""
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
caplog.set_level(logging.ERROR)
|
caplog.set_level(logging.ERROR)
|
||||||
mock_pymodbus.read_coils.side_effect = do_exception
|
|
||||||
mock_pymodbus.read_discrete_inputs.side_effect = do_exception
|
|
||||||
mock_pymodbus.read_input_registers.side_effect = do_exception
|
|
||||||
mock_pymodbus.read_holding_registers.side_effect = do_exception
|
|
||||||
mock_pymodbus.read_coils.return_value = do_return
|
mock_pymodbus.read_coils.return_value = do_return
|
||||||
mock_pymodbus.read_discrete_inputs.return_value = do_return
|
mock_pymodbus.read_discrete_inputs.return_value = do_return
|
||||||
mock_pymodbus.read_input_registers.return_value = do_return
|
mock_pymodbus.read_input_registers.return_value = do_return
|
||||||
|
@ -1646,7 +1641,7 @@ async def test_shutdown(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_stop_restart(
|
async def test_stop_restart(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_pymodbus_return
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_modbus
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service stop."""
|
"""Run test for service stop."""
|
||||||
|
|
||||||
|
@ -1657,7 +1652,7 @@ async def test_stop_restart(
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert hass.states.get(entity_id).state == "17"
|
assert hass.states.get(entity_id).state == "17"
|
||||||
|
|
||||||
mock_pymodbus_return.reset_mock()
|
mock_modbus.reset_mock()
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
data = {
|
data = {
|
||||||
ATTR_HUB: TEST_MODBUS_NAME,
|
ATTR_HUB: TEST_MODBUS_NAME,
|
||||||
|
@ -1665,23 +1660,23 @@ async def test_stop_restart(
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_STOP, data, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_STOP, data, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
||||||
assert mock_pymodbus_return.close.called
|
assert mock_modbus.close.called
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
||||||
|
|
||||||
mock_pymodbus_return.reset_mock()
|
mock_modbus.reset_mock()
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert not mock_pymodbus_return.close.called
|
assert not mock_modbus.close.called
|
||||||
assert mock_pymodbus_return.connect.called
|
assert mock_modbus.connect.called
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
||||||
|
|
||||||
mock_pymodbus_return.reset_mock()
|
mock_modbus.reset_mock()
|
||||||
caplog.clear()
|
caplog.clear()
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_RESTART, data, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert mock_pymodbus_return.close.called
|
assert mock_modbus.close.called
|
||||||
assert mock_pymodbus_return.connect.called
|
assert mock_modbus.connect.called
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication closed" in caplog.text
|
||||||
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
assert f"modbus {TEST_MODBUS_NAME} communication open" in caplog.text
|
||||||
|
|
||||||
|
@ -1711,7 +1706,7 @@ async def test_write_no_client(hass: HomeAssistant, mock_modbus) -> None:
|
||||||
async def test_integration_reload(
|
async def test_integration_reload(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
caplog: pytest.LogCaptureFixture,
|
caplog: pytest.LogCaptureFixture,
|
||||||
mock_pymodbus_return,
|
mock_modbus,
|
||||||
freezer: FrozenDateTimeFactory,
|
freezer: FrozenDateTimeFactory,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for integration reload."""
|
"""Run test for integration reload."""
|
||||||
|
@ -1732,7 +1727,7 @@ async def test_integration_reload(
|
||||||
|
|
||||||
@pytest.mark.parametrize("do_config", [{}])
|
@pytest.mark.parametrize("do_config", [{}])
|
||||||
async def test_integration_reload_failed(
|
async def test_integration_reload_failed(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_pymodbus_return
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_modbus
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for integration connect failure on reload."""
|
"""Run test for integration connect failure on reload."""
|
||||||
caplog.set_level(logging.INFO)
|
caplog.set_level(logging.INFO)
|
||||||
|
@ -1741,9 +1736,7 @@ async def test_integration_reload_failed(
|
||||||
yaml_path = get_fixture_path("configuration.yaml", "modbus")
|
yaml_path = get_fixture_path("configuration.yaml", "modbus")
|
||||||
with (
|
with (
|
||||||
mock.patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path),
|
mock.patch.object(hass_config, "YAML_CONFIG_FILE", yaml_path),
|
||||||
mock.patch.object(
|
mock.patch.object(mock_modbus, "connect", side_effect=ModbusException("error")),
|
||||||
mock_pymodbus_return, "connect", side_effect=ModbusException("error")
|
|
||||||
),
|
|
||||||
):
|
):
|
||||||
await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)
|
await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
@ -1754,7 +1747,7 @@ async def test_integration_reload_failed(
|
||||||
|
|
||||||
@pytest.mark.parametrize("do_config", [{}])
|
@pytest.mark.parametrize("do_config", [{}])
|
||||||
async def test_integration_setup_failed(
|
async def test_integration_setup_failed(
|
||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_pymodbus_return
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, mock_modbus
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for integration setup on reload."""
|
"""Run test for integration setup on reload."""
|
||||||
with mock.patch.object(
|
with mock.patch.object(
|
||||||
|
|
|
@ -262,7 +262,6 @@ async def test_light_service_turn(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
caplog: pytest.LogCaptureFixture,
|
caplog: pytest.LogCaptureFixture,
|
||||||
mock_modbus,
|
mock_modbus,
|
||||||
mock_pymodbus_return,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service turn_on/turn_off."""
|
"""Run test for service turn_on/turn_off."""
|
||||||
|
|
||||||
|
@ -300,12 +299,6 @@ async def test_light_service_turn(
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert hass.states.get(ENTITY_ID2).state == STATE_UNAVAILABLE
|
assert hass.states.get(ENTITY_ID2).state == STATE_UNAVAILABLE
|
||||||
mock_modbus.write_coil.side_effect = ModbusException("fail write_")
|
|
||||||
await hass.services.async_call(
|
|
||||||
"light", "turn_off", service_data={"entity_id": ENTITY_ID}
|
|
||||||
)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
@ -323,13 +316,13 @@ async def test_light_service_turn(
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_light_update(hass: HomeAssistant, mock_modbus, mock_ha) -> None:
|
async def test_service_light_update(hass: HomeAssistant, mock_modbus_ha) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
||||||
mock_modbus.read_coils.return_value = ReadResult([0x01])
|
mock_modbus_ha.read_coils.return_value = ReadResult([0x01])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
|
|
@ -1391,14 +1391,14 @@ async def test_restore_state_sensor(
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_sensor_update(hass: HomeAssistant, mock_modbus, mock_ha) -> None:
|
async def test_service_sensor_update(hass: HomeAssistant, mock_modbus_ha) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
mock_modbus.read_input_registers.return_value = ReadResult([27])
|
mock_modbus_ha.read_input_registers.return_value = ReadResult([27])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert hass.states.get(ENTITY_ID).state == "27"
|
assert hass.states.get(ENTITY_ID).state == "27"
|
||||||
mock_modbus.read_input_registers.return_value = ReadResult([32])
|
mock_modbus_ha.read_input_registers.return_value = ReadResult([32])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
|
|
@ -277,7 +277,6 @@ async def test_switch_service_turn(
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
caplog: pytest.LogCaptureFixture,
|
caplog: pytest.LogCaptureFixture,
|
||||||
mock_modbus,
|
mock_modbus,
|
||||||
mock_pymodbus_return,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Run test for service turn_on/turn_off."""
|
"""Run test for service turn_on/turn_off."""
|
||||||
assert MODBUS_DOMAIN in hass.config.components
|
assert MODBUS_DOMAIN in hass.config.components
|
||||||
|
@ -337,13 +336,13 @@ async def test_switch_service_turn(
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_service_switch_update(hass: HomeAssistant, mock_modbus, mock_ha) -> None:
|
async def test_service_switch_update(hass: HomeAssistant, mock_modbus_ha) -> None:
|
||||||
"""Run test for service homeassistant.update_entity."""
|
"""Run test for service homeassistant.update_entity."""
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
assert hass.states.get(ENTITY_ID).state == STATE_OFF
|
||||||
mock_modbus.read_coils.return_value = ReadResult([0x01])
|
mock_modbus_ha.read_coils.return_value = ReadResult([0x01])
|
||||||
await hass.services.async_call(
|
await hass.services.async_call(
|
||||||
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
"homeassistant", "update_entity", {"entity_id": ENTITY_ID}, blocking=True
|
||||||
)
|
)
|
||||||
|
@ -368,9 +367,7 @@ async def test_service_switch_update(hass: HomeAssistant, mock_modbus, mock_ha)
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
async def test_delay_switch(
|
async def test_delay_switch(hass: HomeAssistant, mock_modbus) -> None:
|
||||||
hass: HomeAssistant, mock_modbus, mock_pymodbus_return
|
|
||||||
) -> None:
|
|
||||||
"""Run test for switch verify delay."""
|
"""Run test for switch verify delay."""
|
||||||
mock_modbus.read_holding_registers.return_value = ReadResult([0x01])
|
mock_modbus.read_holding_registers.return_value = ReadResult([0x01])
|
||||||
now = dt_util.utcnow()
|
now = dt_util.utcnow()
|
||||||
|
|
Loading…
Add table
Reference in a new issue