#!/usr/bin/env python3

# Real-time speech recognition from a microphone with sherpa-ncnn Python API
# with endpoint detection.
#
# Please refer to
# https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
# to download pre-trained models

import sys

try:
    import sounddevice as sd
except ImportError:
    print("Please install sounddevice first. You can use")
    print()
    print(" pip install sounddevice")
    print()
    print("to install it")
    sys.exit(-1)

import sherpa_ncnn


def create_recognizer():
    """Build the streaming sherpa-ncnn recognizer used by this demo.

    All model files are looked up in a single directory relative to the
    current working directory; change ``model_dir`` to use another model.
    See https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
    for download links.

    Returns:
        A ``sherpa_ncnn.Recognizer`` configured with endpoint detection
        enabled.
    """
    # Please replace the model files if needed.
    model_dir = "./sherpa-ncnn-conv-emformer-transducer-2022-12-06"

    recognizer = sherpa_ncnn.Recognizer(
        tokens=f"{model_dir}/tokens.txt",
        encoder_param=f"{model_dir}/encoder_jit_trace-pnnx.ncnn.param",
        encoder_bin=f"{model_dir}/encoder_jit_trace-pnnx.ncnn.bin",
        decoder_param=f"{model_dir}/decoder_jit_trace-pnnx.ncnn.param",
        decoder_bin=f"{model_dir}/decoder_jit_trace-pnnx.ncnn.bin",
        joiner_param=f"{model_dir}/joiner_jit_trace-pnnx.ncnn.param",
        joiner_bin=f"{model_dir}/joiner_jit_trace-pnnx.ncnn.bin",
        num_threads=4,
        decoding_method="modified_beam_search",
        # Endpointing thresholds; presumably in seconds -- confirm against
        # the sherpa-ncnn documentation before tuning.
        enable_endpoint_detection=True,
        rule1_min_trailing_silence=2.4,
        rule2_min_trailing_silence=1.2,
        rule3_min_utterance_length=300,
    )
    return recognizer


def main():
    """Stream microphone audio into the recognizer and print the results.

    Audio is read from the default input device in 100 ms chunks. Partial
    results are redrawn on the current console line; when the recognizer
    reports an endpoint, a non-empty result is finalized on its own line,
    the segment counter advances and the recognizer is reset.

    The loop never returns on its own; the caller is expected to stop it
    with Ctrl + C (``KeyboardInterrupt``).
    """
    print("Started! Please speak")
    recognizer = create_recognizer()
    sample_rate = recognizer.sample_rate
    samples_per_read = int(0.1 * sample_rate)  # 0.1 second = 100 ms
    last_result = ""
    segment_id = 0

    with sd.InputStream(channels=1, dtype="float32", samplerate=sample_rate) as s:
        while True:
            samples, _ = s.read(samples_per_read)  # a blocking read
            samples = samples.reshape(-1)  # flatten to 1-D for the recognizer
            recognizer.accept_waveform(sample_rate, samples)

            # Read the endpoint flag before the text, as the original did.
            is_endpoint = recognizer.is_endpoint
            result = recognizer.text

            # Redraw the line only when the partial text actually changed.
            if result and (last_result != result):
                last_result = result
                print(f"\r{segment_id}:{result}", end="", flush=True)

            if is_endpoint:
                if result:
                    # Terminate the line so the next segment starts fresh.
                    print(f"\r{segment_id}:{result}", flush=True)
                    segment_id += 1
                recognizer.reset()
                # Forget the finished segment's text; otherwise a new segment
                # that begins with the same text would not be displayed until
                # it changes or ends.
                last_result = ""


if __name__ == "__main__":
    devices = sd.query_devices()
    if len(devices) == 0:
        print("No audio devices found")
        sys.exit(-1)

    print(devices)
    default_input_device_idx = sd.default.device[0]
    # A negative index means there is no default input device; indexing with
    # it would silently pick the last device in the list instead.
    if default_input_device_idx < 0:
        print("No default input device found")
        sys.exit(-1)

    print(f'Use default device: {devices[default_input_device_idx]["name"]}')

    try:
        main()
    except KeyboardInterrupt:
        print("\nCaught Ctrl + C. Exiting")