#!/usr/bin/env python3
+import logging
import os
import struct
-from datetime import datetime
import pvporcupine
import pyaudio
import speech_recognition as sr
+logger = logging.getLogger(__file__)
+
class HotwordListener(object):
- def __init__(self,
- command_queue,
- keyword_paths,
- sensitivities,
- input_device_index=None,
- library_path=pvporcupine.LIBRARY_PATH,
- model_path=pvporcupine.MODEL_PATH):
+ def __init__(
+ self,
+ command_queue,
+ keyword_paths,
+ sensitivities,
+ input_device_index=None,
+ library_path=pvporcupine.LIBRARY_PATH,
+ model_path=pvporcupine.MODEL_PATH,
+ ):
self._queue = command_queue
self._library_path = library_path
self._model_path = model_path
def listen_forever(self):
keywords = list()
for x in self._keyword_paths:
- keywords.append(
- os.path.basename(x).replace('.ppn', '').split('_')[0]
- )
+ keywords.append(os.path.basename(x).replace(".ppn", "").split("_")[0])
porcupine = None
pa = None
library_path=self._library_path,
model_path=self._model_path,
keyword_paths=self._keyword_paths,
- sensitivities=self._sensitivities)
+ sensitivities=self._sensitivities,
+ )
recognizer = sr.Recognizer()
pa = pyaudio.PyAudio()
format=pyaudio.paInt16,
input=True,
frames_per_buffer=porcupine.frame_length,
- input_device_index=self._input_device_index)
+ input_device_index=self._input_device_index,
+ )
- print('Listening {')
+ logger.info("Listening {")
for keyword, sensitivity in zip(keywords, self._sensitivities):
- print(' %s (%.2f)' % (keyword, sensitivity))
- print('}')
+ logger.info(" %s (%.2f)" % (keyword, sensitivity))
+ logger.info("}")
while True:
raw = audio_stream.read(
- porcupine.frame_length,
- exception_on_overflow=False
+ porcupine.frame_length, exception_on_overflow=False
)
pcm = struct.unpack_from("h" * porcupine.frame_length, raw)
result = porcupine.process(pcm)
if result >= 0:
- cmd = 'aplay /var/www/kiosk/attention.wav'
- print(f'Running {cmd}...')
- x = os.system(cmd)
- print(f'---- (done {x}) ----')
- print('[%s] >>>>>>>>>>>>> Detected wakeword %s' % (
- str(datetime.now()), keywords[result])
+ cmd = "aplay /var/www/kiosk/attention.wav"
+ logger.info(
+ "Running %s (attention tone) because I heard the wake-word", cmd
+ )
+ os.system(cmd)
+ logger.debug(
+ ">>>>>>>>>>>>> Detected wakeword %s" % keywords[result]
)
- print('>>>>>>>>>>>>>>> Listening for command now...')
raw = bytearray()
for i in range(
- 0,
- int(porcupine.sample_rate / porcupine.frame_length * 4)
+ 0, int(porcupine.sample_rate / porcupine.frame_length * 4)
):
- raw += audio_stream.read(porcupine.frame_length,
- exception_on_overflow=False)
- print(
- f'>>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes'
+ raw += audio_stream.read(
+ porcupine.frame_length, exception_on_overflow=False
+ )
+ logger.debug(
+ f">>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes"
)
speech = sr.AudioData(
- frame_data = bytes(raw),
- sample_rate = porcupine.sample_rate,
- sample_width = 2, # 16 bits
+ frame_data=bytes(raw),
+ sample_rate=porcupine.sample_rate,
+ sample_width=2, # 16 bits
)
command = recognizer.recognize_google(speech)
- print(
- '[%s] >>>>>>>>>>>>> Google says command was %s' % (
- str(datetime.now()), command)
- )
+ logger.debug(">>>>>>>>>>>>> Google says command was %s" % command)
+ logger.info("Enqueued command=%s", command)
self._queue.put(command)
- except Exception as e:
- print(e)
- print('Stopping ...')
+ except Exception:
+ logger.exception("Stopping listener because of unexpected exception!")
except KeyboardInterrupt:
- print('Stopping ...')
+ logger.exception("Stopping listener because of ^C!")
finally:
+ logger.debug("Cleaning up... one sec...")
if porcupine is not None:
porcupine.delete()
@classmethod
def show_audio_devices(cls):
- fields = ('index', 'name', 'defaultSampleRate', 'maxInputChannels')
+ fields = ("index", "name", "defaultSampleRate", "maxInputChannels")
pa = pyaudio.PyAudio()
for i in range(pa.get_device_count()):
info = pa.get_device_info_by_index(i)
- print(', '.join("'%s': '%s'" % (k, str(info[k])) for k in fields))
+ print(", ".join("'%s': '%s'" % (k, str(info[k])) for k in fields))
pa.terminate()
def main():
- keyword_paths = [
- pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]
- ]
+ keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]]
sensitivities = [0.85, 0.95]
HotwordListener(
[],
).listen_forever()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()