#!/usr/bin/env python3
"""Listen for Porcupine wake words, then transcribe the spoken command."""

import os
import queue
import struct
from datetime import datetime

import pvporcupine
import pyaudio
import speech_recognition as sr


class HotwordListener(object):
    """Wait for a wake word, record a short command, and enqueue its text.

    Each time Porcupine reports one of the configured keywords, an attention
    sound is played, a fixed-length chunk of audio is captured from the same
    input stream, sent to Google speech recognition, and the resulting text
    is handed to ``command_queue.put()``.
    """

    # Shell command run (via os.system) to acknowledge a detected wake word.
    ATTENTION_COMMAND = 'aplay /var/www/kiosk/attention.wav'

    # Seconds of audio captured after the wake word and sent for recognition.
    COMMAND_SECONDS = 4

    def __init__(self,
                 command_queue,
                 keyword_paths,
                 sensitivities,
                 input_device_index=None,
                 library_path=pvporcupine.LIBRARY_PATH,
                 model_path=pvporcupine.MODEL_PATH):
        """Store the configuration; no audio resources are opened here.

        Args:
            command_queue: Object exposing ``put(command)``; it receives each
                recognized command string (e.g. a ``queue.Queue``).
            keyword_paths: Keyword (``.ppn``) file paths, forwarded to
                ``pvporcupine.create``.
            sensitivities: One sensitivity per keyword path, forwarded to
                ``pvporcupine.create``.
            input_device_index: Forwarded to ``PyAudio.open`` as
                ``input_device_index``.
            library_path: Forwarded to ``pvporcupine.create``.
            model_path: Forwarded to ``pvporcupine.create``.
        """
        self._queue = command_queue
        self._library_path = library_path
        self._model_path = model_path
        self._keyword_paths = keyword_paths
        self._sensitivities = sensitivities
        self._input_device_index = input_device_index

    @staticmethod
    def _keyword_name(keyword_path):
        """Return the display name of a keyword file.

        The name is the file's base name with '.ppn' removed, cut at the
        first underscore.
        """
        return os.path.basename(keyword_path).replace('.ppn', '').split('_')[0]

    def _play_attention_sound(self):
        """Run the attention command and print its exit status."""
        cmd = self.ATTENTION_COMMAND
        print(f'Running {cmd}...')
        status = os.system(cmd)
        print(f'---- (done {status}) ----')

    def _record_command(self, audio_stream, porcupine):
        """Read COMMAND_SECONDS of audio from the stream as ``sr.AudioData``."""
        frame_length = porcupine.frame_length
        frame_count = int(
            porcupine.sample_rate / frame_length * self.COMMAND_SECONDS
        )
        raw = bytearray()
        for _ in range(frame_count):
            raw += audio_stream.read(frame_length,
                                     exception_on_overflow=False)
        print(f'>>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes')
        return sr.AudioData(
            frame_data=bytes(raw),
            sample_rate=porcupine.sample_rate,
            sample_width=2,  # 16 bits, matching the paInt16 stream format
        )

    def listen_forever(self):
        """Run the detect/record/recognize loop until an error or Ctrl-C.

        A command that cannot be understood, or a failed recognition
        request, is reported and skipped so the listener keeps running.
        Any other exception is printed and ends the loop. The Porcupine
        handle, the audio stream and PyAudio are always released on exit.
        """
        keywords = [self._keyword_name(path) for path in self._keyword_paths]

        porcupine = None
        pa = None
        audio_stream = None
        try:
            porcupine = pvporcupine.create(
                library_path=self._library_path,
                model_path=self._model_path,
                keyword_paths=self._keyword_paths,
                sensitivities=self._sensitivities)
            recognizer = sr.Recognizer()
            pa = pyaudio.PyAudio()
            audio_stream = pa.open(
                rate=porcupine.sample_rate,
                channels=1,
                format=pyaudio.paInt16,
                input=True,
                frames_per_buffer=porcupine.frame_length,
                input_device_index=self._input_device_index)

            print('Listening {')
            for keyword, sensitivity in zip(keywords, self._sensitivities):
                print('  %s (%.2f)' % (keyword, sensitivity))
            print('}')

            # The unpack format never changes, so build it once.
            frame_format = "h" * porcupine.frame_length

            while True:
                raw = audio_stream.read(
                    porcupine.frame_length, exception_on_overflow=False
                )
                pcm = struct.unpack_from(frame_format, raw)
                result = porcupine.process(pcm)
                if result < 0:
                    # No keyword in this frame.
                    continue

                self._play_attention_sound()
                print('[%s] >>>>>>>>>>>>> Detected wakeword %s' % (
                    str(datetime.now()), keywords[result])
                )
                print('>>>>>>>>>>>>>>> Listening for command now...')
                speech = self._record_command(audio_stream, porcupine)

                # A single bad utterance or a transient API failure must not
                # terminate the listener; report it and wait for the next
                # wake word instead.
                try:
                    command = recognizer.recognize_google(speech)
                except sr.UnknownValueError:
                    print(
                        '>>>>>>>>>>>>>> Could not understand the command; '
                        'still listening.'
                    )
                    continue
                except sr.RequestError as e:
                    print(
                        '>>>>>>>>>>>>>> Speech recognition request failed: '
                        f'{e}'
                    )
                    continue

                print(
                    '[%s] >>>>>>>>>>>>> Google says command was %s' % (
                        str(datetime.now()), command)
                )
                self._queue.put(command)

        except Exception as e:
            print(e)
            print('Stopping ...')
        except KeyboardInterrupt:
            print('Stopping ...')
        finally:
            if porcupine is not None:
                porcupine.delete()

            if audio_stream is not None:
                audio_stream.close()

            if pa is not None:
                pa.terminate()

    @classmethod
    def show_audio_devices(cls):
        """Print index, name, sample rate and input channels of each device."""
        fields = ('index', 'name', 'defaultSampleRate', 'maxInputChannels')
        pa = pyaudio.PyAudio()
        try:
            for i in range(pa.get_device_count()):
                info = pa.get_device_info_by_index(i)
                print(', '.join(
                    "'%s': '%s'" % (k, str(info[k])) for k in fields
                ))
        finally:
            # Release PortAudio even if a device query fails.
            pa.terminate()


def main():
    """Listen for the 'blueberry' and 'bumblebee' keywords until stopped."""
    keyword_paths = [
        pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]
    ]
    sensitivities = [0.85, 0.95]
    # The listener calls .put() on this object, so it must be a real queue
    # (a plain list has no .put). Nothing consumes it in this demo entry
    # point; recognized commands are also printed by the listener.
    HotwordListener(
        queue.Queue(),
        keyword_paths,
        sensitivities,
    ).listen_forever()


if __name__ == '__main__':
    main()