diff --git a/homeassistant/components/assist_pipeline/vad.py b/homeassistant/components/assist_pipeline/vad.py index 49496e66159..8372dbc54c7 100644 --- a/homeassistant/components/assist_pipeline/vad.py +++ b/homeassistant/components/assist_pipeline/vad.py @@ -6,13 +6,11 @@ from collections.abc import Callable, Iterable from dataclasses import dataclass from enum import StrEnum import logging -from typing import Final + +from .const import SAMPLE_CHANNELS, SAMPLE_RATE, SAMPLE_WIDTH _LOGGER = logging.getLogger(__name__) -_SAMPLE_RATE: Final = 16000 # Hz -_SAMPLE_WIDTH: Final = 2 # bytes - class VadSensitivity(StrEnum): """How quickly the end of a voice command is detected.""" @@ -26,12 +24,12 @@ class VadSensitivity(StrEnum): """Return seconds of silence for sensitivity level.""" sensitivity = VadSensitivity(sensitivity) if sensitivity == VadSensitivity.RELAXED: - return 2.0 + return 1.25 if sensitivity == VadSensitivity.AGGRESSIVE: - return 0.5 + return 0.25 - return 1.0 + return 0.7 class AudioBuffer: @@ -80,7 +78,7 @@ class VoiceCommandSegmenter: speech_seconds: float = 0.3 """Seconds of speech before voice command has started.""" - silence_seconds: float = 1.0 + silence_seconds: float = 0.7 """Seconds of silence after voice command has ended.""" timeout_seconds: float = 15.0 @@ -92,6 +90,9 @@ class VoiceCommandSegmenter: in_command: bool = False """True if inside voice command.""" + timed_out: bool = False + """True if a timeout occurred during voice command.""" + _speech_seconds_left: float = 0.0 """Seconds left before considering voice command as started.""" @@ -121,6 +122,9 @@ class VoiceCommandSegmenter: Returns False when command is done. 
""" + if self.timed_out: + self.timed_out = False + self._timeout_seconds_left -= chunk_seconds if self._timeout_seconds_left <= 0: _LOGGER.warning( @@ -128,6 +132,7 @@ class VoiceCommandSegmenter: self.timeout_seconds, ) self.reset() + self.timed_out = True return False if not self.in_command: @@ -179,7 +184,9 @@ class VoiceCommandSegmenter: """ if vad_samples_per_chunk is None: # No chunking - chunk_seconds = (len(chunk) // _SAMPLE_WIDTH) / _SAMPLE_RATE + chunk_seconds = ( + len(chunk) // (SAMPLE_WIDTH * SAMPLE_CHANNELS) + ) / SAMPLE_RATE is_speech = vad_is_speech(chunk) return self.process(chunk_seconds, is_speech) @@ -187,8 +194,8 @@ class VoiceCommandSegmenter: raise ValueError("leftover_chunk_buffer is required when vad uses chunking") # With chunking - seconds_per_chunk = vad_samples_per_chunk / _SAMPLE_RATE - bytes_per_chunk = vad_samples_per_chunk * _SAMPLE_WIDTH + seconds_per_chunk = vad_samples_per_chunk / SAMPLE_RATE + bytes_per_chunk = vad_samples_per_chunk * (SAMPLE_WIDTH * SAMPLE_CHANNELS) for vad_chunk in chunk_samples(chunk, bytes_per_chunk, leftover_chunk_buffer): is_speech = vad_is_speech(vad_chunk) if not self.process(seconds_per_chunk, is_speech): diff --git a/tests/components/assist_pipeline/test_vad.py b/tests/components/assist_pipeline/test_vad.py index 17cb73a9139..db039ab3140 100644 --- a/tests/components/assist_pipeline/test_vad.py +++ b/tests/components/assist_pipeline/test_vad.py @@ -17,15 +17,12 @@ def test_silence() -> None: # True return value indicates voice command has not finished assert segmenter.process(_ONE_SECOND * 3, False) + assert not segmenter.in_command def test_speech() -> None: """Test that silence + speech + silence triggers a voice command.""" - def is_speech(chunk): - """Anything non-zero is speech.""" - return sum(chunk) > 0 - segmenter = VoiceCommandSegmenter() # silence @@ -33,10 +30,12 @@ def test_speech() -> None: # "speech" assert segmenter.process(_ONE_SECOND, True) + assert segmenter.in_command # silence # 
False return value indicates voice command is finished assert not segmenter.process(_ONE_SECOND, False) + assert not segmenter.in_command def test_audio_buffer() -> None: @@ -105,3 +104,105 @@ def test_chunk_samples_leftover() -> None: assert len(chunks) == 1 assert leftover_chunk_buffer.bytes() == bytes([5, 6]) + + +def test_silence_seconds() -> None: + """Test end of voice command silence seconds.""" + + segmenter = VoiceCommandSegmenter(silence_seconds=1.0) + + # silence + assert segmenter.process(_ONE_SECOND, False) + assert not segmenter.in_command + + # "speech" + assert segmenter.process(_ONE_SECOND, True) + assert segmenter.in_command + + # not enough silence to end + assert segmenter.process(_ONE_SECOND * 0.5, False) + assert segmenter.in_command + + # exactly enough silence now + assert not segmenter.process(_ONE_SECOND * 0.5, False) + assert not segmenter.in_command + + +def test_silence_reset() -> None: + """Test that speech resets end of voice command detection.""" + + segmenter = VoiceCommandSegmenter(silence_seconds=1.0, reset_seconds=0.5) + + # silence + assert segmenter.process(_ONE_SECOND, False) + assert not segmenter.in_command + + # "speech" + assert segmenter.process(_ONE_SECOND, True) + assert segmenter.in_command + + # not enough silence to end + assert segmenter.process(_ONE_SECOND * 0.5, False) + assert segmenter.in_command + + # speech should reset silence detection + assert segmenter.process(_ONE_SECOND * 0.5, True) + assert segmenter.in_command + + # not enough silence to end + assert segmenter.process(_ONE_SECOND * 0.5, False) + assert segmenter.in_command + + # exactly enough silence now + assert not segmenter.process(_ONE_SECOND * 0.5, False) + assert not segmenter.in_command + + +def test_speech_reset() -> None: + """Test that silence resets start of voice command detection.""" + + segmenter = VoiceCommandSegmenter( + silence_seconds=1.0, reset_seconds=0.5, speech_seconds=1.0 + ) + + # silence + assert segmenter.process(_ONE_SECOND, 
False) + assert not segmenter.in_command + + # not enough speech to start voice command + assert segmenter.process(_ONE_SECOND * 0.5, True) + assert not segmenter.in_command + + # silence should reset speech detection + assert segmenter.process(_ONE_SECOND, False) + assert not segmenter.in_command + + # not enough speech to start voice command + assert segmenter.process(_ONE_SECOND * 0.5, True) + assert not segmenter.in_command + + # exactly enough speech now + assert segmenter.process(_ONE_SECOND * 0.5, True) + assert segmenter.in_command + + +def test_timeout() -> None: + """Test that voice command detection times out.""" + + segmenter = VoiceCommandSegmenter(timeout_seconds=1.0) + + # not enough to time out + assert not segmenter.timed_out + assert segmenter.process(_ONE_SECOND * 0.5, False) + assert not segmenter.timed_out + + # enough to time out + assert not segmenter.process(_ONE_SECOND * 0.5, True) + assert segmenter.timed_out + + # flag resets with more audio + assert segmenter.process(_ONE_SECOND * 0.5, True) + assert not segmenter.timed_out + + assert not segmenter.process(_ONE_SECOND * 0.5, False) + assert segmenter.timed_out