X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=listen.py;h=e431a0b199fa67073d315b42e8413187555ef1f1;hb=5ea88ab72e175e2d4f57ae8645ca6f825549a7a9;hp=11a4db33f58d76b4a1f5a7b4a8b9a4fa6d982e46;hpb=18913d6fecc43aeecfd5b617030e9cd3e840ef08;p=kiosk.git diff --git a/listen.py b/listen.py index 11a4db3..e431a0b 100755 --- a/listen.py +++ b/listen.py @@ -1,46 +1,51 @@ #!/usr/bin/env python3 +import logging import os import struct -from datetime import datetime -from threading import Thread -import numpy as np import pvporcupine import pyaudio -import soundfile import speech_recognition as sr +from pyutils import logging_utils + +import kiosk_secrets as secrets + +logger = logging.getLogger(__name__) + class HotwordListener(object): - def __init__(self, - command_queue, - keyword_paths, - sensitivities, - input_device_index=None, - library_path=pvporcupine.LIBRARY_PATH, - model_path=pvporcupine.MODEL_PATH): - super(HotwordListener, self).__init__() + def __init__( + self, + command_queue, + keyword_paths, + sensitivities, + input_device_index=None, +# library_path=pvporcupine.LIBRARY_PATH, +# model_path=pvporcupine.MODEL_PATH, + ): self._queue = command_queue - self._library_path = library_path - self._model_path = model_path +# self._library_path = library_path +# self._model_path = model_path self._keyword_paths = keyword_paths self._sensitivities = sensitivities self._input_device_index = input_device_index + @logging_utils.LoggingContext(logger, prefix="listener:") def listen_forever(self): keywords = list() for x in self._keyword_paths: - keywords.append(os.path.basename(x).replace('.ppn', '').split('_')[0]) + keywords.append(os.path.basename(x).replace(".ppn", "").split("_")[0]) porcupine = None pa = None audio_stream = None try: porcupine = pvporcupine.create( - library_path=self._library_path, - model_path=self._model_path, keyword_paths=self._keyword_paths, - sensitivities=self._sensitivities) + sensitivities=self._sensitivities, + access_key=secrets.pvporcupine_key, + ) recognizer = sr.Recognizer() pa = pyaudio.PyAudio() @@ -50,50 +55,57 @@ class HotwordListener(object): format=pyaudio.paInt16, input=True, frames_per_buffer=porcupine.frame_length, - input_device_index=self._input_device_index) + input_device_index=self._input_device_index, + ) - print('Listening {') + logger.info("Listening {") for keyword, sensitivity in zip(keywords, self._sensitivities): - print(' %s (%.2f)' % (keyword, sensitivity)) - print('}') + logger.info(" %s (%.2f)" % (keyword, sensitivity)) + logger.info("}") while True: - raw = audio_stream.read(porcupine.frame_length, exception_on_overflow=False) + raw = audio_stream.read( + porcupine.frame_length, exception_on_overflow=False + ) pcm = struct.unpack_from("h" * porcupine.frame_length, raw) result = porcupine.process(pcm) if result >= 0: - os.system('/usr/bin/aplay /var/www/kiosk/attention.wav') - print('[%s] >>>>>>>>>>>>> Detected wakeword %s' % ( - str(datetime.now()), keywords[result]) + cmd = "aplay /var/www/kiosk/attention.wav" + logger.info( + "Running %s (attention tone) because I heard the wake-word", cmd + ) + os.system(cmd) + logger.debug( + ">>>>>>>>>>>>> Detected wakeword %s" % keywords[result] ) - print('>>>>>>>>>>>>>>> Listening for command now...') raw = bytearray() - for i in range(0, int(porcupine.sample_rate / porcupine.frame_length * 4)): - raw += audio_stream.read(porcupine.frame_length, - exception_on_overflow=False) - print(f'>>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes') + for i in range( + 0, int(porcupine.sample_rate / porcupine.frame_length * 4) + ): + raw += audio_stream.read( + porcupine.frame_length, exception_on_overflow=False + ) + logger.debug( + f">>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes" + ) speech = sr.AudioData( - frame_data = bytes(raw), - sample_rate = porcupine.sample_rate, - sample_width = 2, # 16 bits + frame_data=bytes(raw), + sample_rate=porcupine.sample_rate, + sample_width=2, # 16 bits ) - try: - command = recognizer.recognize_google(speech) - print('[%s] >>>>>>>>>>>>> Google says command was %s' % ( - str(datetime.now()), command) - ) - except Exception: - command = 'weather' + command = recognizer.recognize_google(speech) + logger.debug(">>>>>>>>>>>>> Google says command was %s" % command) + logger.info("Enqueued command=%s", command) self._queue.put(command) - except Exception as e: - print(e) - print('Stopping ...') + except Exception: + logger.exception("Stopping listener because of unexpected exception!") except KeyboardInterrupt: - print('Stopping ...') + logger.exception("Stopping listener because of ^C!") finally: + logger.debug("Cleaning up... one sec...") if porcupine is not None: porcupine.delete() @@ -105,11 +117,11 @@ class HotwordListener(object): @classmethod def show_audio_devices(cls): - fields = ('index', 'name', 'defaultSampleRate', 'maxInputChannels') + fields = ("index", "name", "defaultSampleRate", "maxInputChannels") pa = pyaudio.PyAudio() for i in range(pa.get_device_count()): info = pa.get_device_info_by_index(i) - print(', '.join("'%s': '%s'" % (k, str(info[k])) for k in fields)) + print(", ".join("'%s': '%s'" % (k, str(info[k])) for k in fields)) pa.terminate() @@ -117,9 +129,11 @@ def main(): keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]] sensitivities = [0.85, 0.95] HotwordListener( + [], keyword_paths, sensitivities, ).listen_forever() -if __name__ == '__main__': + +if __name__ == "__main__": main()