#!/usr/bin/env python3

"""Wake-word listener.

Listens on an audio input device for Porcupine wake-words. When one is
heard, plays an attention tone, records a few seconds of audio, sends it to
Google speech recognition and puts the recognized text on a command queue.
"""

import logging
import os
import queue
import struct

import pvporcupine
import pyaudio
import speech_recognition as sr

from pyutils import logging_utils

logger = logging.getLogger(__name__)

# Shell command run to play the attention tone after a wake-word is detected.
ATTENTION_COMMAND = "aplay /var/www/kiosk/attention.wav"

# How many seconds of audio to record as the spoken command after a wake-word.
COMMAND_SECONDS = 4


class HotwordListener(object):
    """Detect wake-words on an audio stream and enqueue the spoken commands."""

    def __init__(
        self,
        command_queue,
        keyword_paths,
        sensitivities,
        input_device_index=None,
        library_path=pvporcupine.LIBRARY_PATH,
        model_path=pvporcupine.MODEL_PATH,
    ):
        """Store the configuration; no audio resources are opened here.

        Args:
            command_queue: Object with a ``put(command)`` method (for example
                a ``queue.Queue``) that receives each recognized command.
            keyword_paths: Paths of the Porcupine keyword (``.ppn``) files.
            sensitivities: One detection sensitivity per keyword path.
            input_device_index: Audio input device index handed to PyAudio;
                ``None`` is passed through unchanged.
            library_path: Path of the Porcupine dynamic library.
            model_path: Path of the Porcupine model file.
        """
        self._queue = command_queue
        self._library_path = library_path
        self._model_path = model_path
        self._keyword_paths = keyword_paths
        self._sensitivities = sensitivities
        self._input_device_index = input_device_index

    @logging_utils.LoggingContext(logger, prefix="listener:")
    def listen_forever(self):
        """Listen for wake-words until interrupted or an unexpected error.

        On each detection the attention tone is played, COMMAND_SECONDS of
        audio are recorded and sent to Google for recognition, and the
        resulting text is put on the command queue. Unintelligible speech
        and recognition-service failures are logged and listening continues.
        Any other exception, or a KeyboardInterrupt, is logged and ends the
        loop; audio resources are released before returning.
        """
        # Human-readable keyword name: file basename without the ".ppn"
        # extension, truncated at the first underscore.
        keywords = [
            os.path.basename(path).replace(".ppn", "").split("_")[0]
            for path in self._keyword_paths
        ]

        porcupine = None
        pa = None
        audio_stream = None
        try:
            porcupine = pvporcupine.create(
                library_path=self._library_path,
                model_path=self._model_path,
                keyword_paths=self._keyword_paths,
                sensitivities=self._sensitivities,
            )
            recognizer = sr.Recognizer()
            pa = pyaudio.PyAudio()
            audio_stream = pa.open(
                rate=porcupine.sample_rate,
                channels=1,
                format=pyaudio.paInt16,
                input=True,
                frames_per_buffer=porcupine.frame_length,
                input_device_index=self._input_device_index,
            )

            # Loop invariants, computed once.
            frame_length = porcupine.frame_length
            sample_rate = porcupine.sample_rate
            frame_struct = struct.Struct("h" * frame_length)
            command_frames = int(sample_rate / frame_length * COMMAND_SECONDS)

            logger.info("Listening {")
            for keyword, sensitivity in zip(keywords, self._sensitivities):
                logger.info(" %s (%.2f)", keyword, sensitivity)
            logger.info("}")

            while True:
                raw = audio_stream.read(frame_length, exception_on_overflow=False)
                pcm = frame_struct.unpack_from(raw)
                result = porcupine.process(pcm)
                if result < 0:
                    # No wake-word in this frame.
                    continue

                cmd = ATTENTION_COMMAND
                logger.info(
                    "Running %s (attention tone) because I heard the wake-word", cmd
                )
                os.system(cmd)
                logger.debug(">>>>>>>>>>>>> Detected wakeword %s", keywords[result])

                # Record the spoken command that follows the wake-word.
                raw = b"".join(
                    audio_stream.read(frame_length, exception_on_overflow=False)
                    for _ in range(command_frames)
                )
                logger.debug(
                    ">>>>>>>>>>>>>> Recognizing command... %d bytes", len(raw)
                )
                speech = sr.AudioData(
                    frame_data=raw,
                    sample_rate=sample_rate,
                    sample_width=2,  # 16 bits
                )

                # A single bad utterance or a transient service failure must
                # not terminate the listener, so handle those per command.
                try:
                    command = recognizer.recognize_google(speech)
                except sr.UnknownValueError:
                    logger.warning(
                        "Could not understand the command; still listening."
                    )
                    continue
                except sr.RequestError:
                    logger.exception(
                        "Speech recognition request failed; still listening."
                    )
                    continue

                logger.debug(">>>>>>>>>>>>> Google says command was %s", command)
                logger.info("Enqueued command=%s", command)
                self._queue.put(command)
        except Exception:
            logger.exception("Stopping listener because of unexpected exception!")
        except KeyboardInterrupt:
            logger.exception("Stopping listener because of ^C!")
        finally:
            logger.debug("Cleaning up... one sec...")
            if porcupine is not None:
                porcupine.delete()
            if audio_stream is not None:
                audio_stream.close()
            if pa is not None:
                pa.terminate()

    @classmethod
    def show_audio_devices(cls):
        """Print index, name, sample rate and input channels of each device."""
        fields = ("index", "name", "defaultSampleRate", "maxInputChannels")
        pa = pyaudio.PyAudio()
        try:
            for i in range(pa.get_device_count()):
                info = pa.get_device_info_by_index(i)
                print(", ".join("'%s': '%s'" % (k, str(info[k])) for k in fields))
        finally:
            # Release PortAudio even if a device query fails.
            pa.terminate()


def main():
    """Listen for the built-in "blueberry" and "bumblebee" wake-words."""
    keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]]
    sensitivities = [0.85, 0.95]
    # The listener calls .put() on its queue, so it needs a real queue object;
    # a plain list has no .put() and would fail on the first command.
    HotwordListener(
        queue.Queue(),
        keyword_paths,
        sensitivities,
    ).listen_forever()


if __name__ == "__main__":
    main()