#!/usr/bin/env python3
+import logging
import os
import struct
-from datetime import datetime
-from threading import Thread
-import numpy as np
import pvporcupine
import pyaudio
-import soundfile
import speech_recognition as sr
+from pyutils import logging_utils
+
+logger = logging.getLogger(__name__)
+
class HotwordListener(object):
- def __init__(self,
- command_queue,
- keyword_paths,
- sensitivities,
- input_device_index=None,
- library_path=pvporcupine.LIBRARY_PATH,
- model_path=pvporcupine.MODEL_PATH):
- super(HotwordListener, self).__init__()
+ def __init__(
+ self,
+ command_queue,
+ keyword_paths,
+ sensitivities,
+ input_device_index=None,
+ library_path=pvporcupine.LIBRARY_PATH,
+ model_path=pvporcupine.MODEL_PATH,
+ ):
self._queue = command_queue
self._library_path = library_path
self._model_path = model_path
self._sensitivities = sensitivities
self._input_device_index = input_device_index
+ @logging_utils.LoggingContext(logger, prefix="listener:")
def listen_forever(self):
keywords = list()
for x in self._keyword_paths:
- keywords.append(os.path.basename(x).replace('.ppn', '').split('_')[0])
+ keywords.append(os.path.basename(x).replace(".ppn", "").split("_")[0])
porcupine = None
pa = None
library_path=self._library_path,
model_path=self._model_path,
keyword_paths=self._keyword_paths,
- sensitivities=self._sensitivities)
+ sensitivities=self._sensitivities,
+ )
recognizer = sr.Recognizer()
pa = pyaudio.PyAudio()
format=pyaudio.paInt16,
input=True,
frames_per_buffer=porcupine.frame_length,
- input_device_index=self._input_device_index)
+ input_device_index=self._input_device_index,
+ )
- print('Listening {')
+ logger.info("Listening {")
for keyword, sensitivity in zip(keywords, self._sensitivities):
- print(' %s (%.2f)' % (keyword, sensitivity))
- print('}')
+ logger.info(" %s (%.2f)" % (keyword, sensitivity))
+ logger.info("}")
while True:
- raw = audio_stream.read(porcupine.frame_length, exception_on_overflow=False)
+ raw = audio_stream.read(
+ porcupine.frame_length, exception_on_overflow=False
+ )
pcm = struct.unpack_from("h" * porcupine.frame_length, raw)
result = porcupine.process(pcm)
if result >= 0:
- os.system('/usr/bin/aplay /var/www/kiosk/attention.wav')
- print('[%s] >>>>>>>>>>>>> Detected wakeword %s' % (
- str(datetime.now()), keywords[result])
+ cmd = "aplay /var/www/kiosk/attention.wav"
+ logger.info(
+ "Running %s (attention tone) because I heard the wake-word", cmd
+ )
+ os.system(cmd)
+ logger.debug(
+ ">>>>>>>>>>>>> Detected wakeword %s" % keywords[result]
)
- print('>>>>>>>>>>>>>>> Listening for command now...')
raw = bytearray()
- for i in range(0, int(porcupine.sample_rate / porcupine.frame_length * 4)):
- raw += audio_stream.read(porcupine.frame_length,
- exception_on_overflow=False)
- print(f'>>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes')
+ for i in range(
+ 0, int(porcupine.sample_rate / porcupine.frame_length * 4)
+ ):
+ raw += audio_stream.read(
+ porcupine.frame_length, exception_on_overflow=False
+ )
+ logger.debug(
+ f">>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes"
+ )
speech = sr.AudioData(
- frame_data = bytes(raw),
- sample_rate = porcupine.sample_rate,
- sample_width = 2, # 16 bits
+ frame_data=bytes(raw),
+ sample_rate=porcupine.sample_rate,
+ sample_width=2, # 16 bits
)
- try:
- command = recognizer.recognize_google(speech)
- print('[%s] >>>>>>>>>>>>> Google says command was %s' % (
- str(datetime.now()), command)
- )
- except Exception:
- command = 'weather'
+ command = recognizer.recognize_google(speech)
+ logger.debug(">>>>>>>>>>>>> Google says command was %s" % command)
+ logger.info("Enqueued command=%s", command)
self._queue.put(command)
- except Exception as e:
- print(e)
- print('Stopping ...')
+ except Exception:
+ logger.exception("Stopping listener because of unexpected exception!")
except KeyboardInterrupt:
- print('Stopping ...')
+ logger.exception("Stopping listener because of ^C!")
finally:
+ logger.debug("Cleaning up... one sec...")
if porcupine is not None:
porcupine.delete()
@classmethod
def show_audio_devices(cls):
- fields = ('index', 'name', 'defaultSampleRate', 'maxInputChannels')
+ fields = ("index", "name", "defaultSampleRate", "maxInputChannels")
pa = pyaudio.PyAudio()
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
- print(', '.join("'%s': '%s'" % (k, str(info[k])) for k in fields))
+ print(", ".join("'%s': '%s'" % (k, str(info[k])) for k in fields))
pa.terminate()
keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]]
sensitivities = [0.85, 0.95]
HotwordListener(
+ [],
keyword_paths,
sensitivities,
).listen_forever()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()