#!/usr/bin/env python3
# Real-time speech recognition from a microphone with sherpa-ncnn Python API
# with endpoint detection.
#
# Please refer to
# https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
# to download pre-trained models
import sys

# sounddevice is a third-party dependency; fail early with install
# instructions instead of a bare traceback if it is missing.
try:
    import sounddevice as sd
except ImportError:
    print("Please install sounddevice first. You can use")
    print()
    print(" pip install sounddevice")
    print()
    print("to install it")
    sys.exit(-1)

import sherpa_ncnn
def create_recognizer():
    """Build a streaming sherpa-ncnn recognizer with endpoint detection.

    Please replace the model files if needed.
    See https://k2-fsa.github.io/sherpa/ncnn/pretrained_models/index.html
    for download links.

    Returns:
      A ``sherpa_ncnn.Recognizer`` configured with the
      conv-emformer-transducer-2022-12-06 model, modified beam search,
      and endpoint detection enabled.
    """
    model_dir = "./sherpa-ncnn-conv-emformer-transducer-2022-12-06"
    recognizer = sherpa_ncnn.Recognizer(
        tokens=f"{model_dir}/tokens.txt",
        encoder_param=f"{model_dir}/encoder_jit_trace-pnnx.ncnn.param",
        encoder_bin=f"{model_dir}/encoder_jit_trace-pnnx.ncnn.bin",
        decoder_param=f"{model_dir}/decoder_jit_trace-pnnx.ncnn.param",
        decoder_bin=f"{model_dir}/decoder_jit_trace-pnnx.ncnn.bin",
        joiner_param=f"{model_dir}/joiner_jit_trace-pnnx.ncnn.param",
        joiner_bin=f"{model_dir}/joiner_jit_trace-pnnx.ncnn.bin",
        num_threads=4,
        decoding_method="modified_beam_search",
        enable_endpoint_detection=True,
        # Endpointing rules: silence thresholds (seconds) and a cap on
        # utterance length (presumably frames — TODO confirm against
        # the sherpa-ncnn endpoint documentation).
        rule1_min_trailing_silence=2.4,
        rule2_min_trailing_silence=1.2,
        rule3_min_utterance_length=300,
    )
    return recognizer
def main():
    """Read the microphone in 100 ms chunks and print streaming results.

    Prints each partial result as it changes; bumps the segment counter
    whenever the recognizer reports an endpoint while there is text.
    Runs until interrupted (KeyboardInterrupt is handled by the caller).
    """
    print("Started! Please speak")
    recognizer = create_recognizer()
    sample_rate = recognizer.sample_rate
    samples_per_read = int(0.1 * sample_rate)  # 0.1 second = 100 ms

    last_result = ""
    segment_id = 0

    with sd.InputStream(channels=1, dtype="float32", samplerate=sample_rate) as s:
        while True:
            samples, _ = s.read(samples_per_read)  # a blocking read
            samples = samples.reshape(-1)  # (n, 1) -> (n,) mono samples
            recognizer.accept_waveform(sample_rate, samples)

            is_endpoint = recognizer.is_endpoint
            result = recognizer.text

            # Only print when the partial result actually changed.
            if result and (last_result != result):
                last_result = result
                print(f"{segment_id}: {result}")

            # An endpoint with non-empty text closes the current segment.
            if result and is_endpoint:
                segment_id += 1
if __name__ == "__main__":
    # Show the available audio devices and which input will be used,
    # so a wrong-device problem is obvious before recognition starts.
    devices = sd.query_devices()
    print(devices)
    default_input_device_idx = sd.default.device[0]
    print(f'Use default device: {devices[default_input_device_idx]["name"]}')

    try:
        main()
    except KeyboardInterrupt:
        print("\nCaught Ctrl + C. Exiting")