提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
最近由于有实现录音和播放功能的需求,最近将pyaudio实现录音和播音功能进行实现,特此进行记录
提示:以下是本篇文章正文内容,下面案例可供参考
代码如下(示例):
import threading
import time
import pyaudio
from pyaudio import Stream
import wave
from typing import Optional
class SoundRecord:
"""
功能:实现录音功能
"""
def __init__(self):
self.__output_file = None # 输出文件地址
self.__chunk = 1024 # 读取数据段
self.__sample_format = pyaudio.paInt16 # 采样格式
self.__channels = 1 # 声道数
self.__sample_rate = 44100 # 采样率44.1kz
self.__input_stream: Optional[Stream] = None # 输入音频流
self.__output_stream: Optional[Stream] = None # 输出音频流
self.__input_audio = None
self.__output_audio = None
self.__record_state = True
self.__play_flag = True
self.__frame = list()
self.__wf = None
def open_audio_input_stream(self):
"""
功能:打开录音stream音频流
"""
# 创建Pyaudio对象
self.__input_audio = pyaudio.PyAudio()
# 打开音频流
try:
self.__input_stream = self.__input_audio.open(format=self.__sample_format,
channels=self.__channels,
rate=self.__sample_rate,
frames_per_buffer=self.__chunk,
input=True)
return True
except OSError: # 没有找到设备
return False
def open_audio_output_stream(self, file_path):
"""
功能:打开播放stream音频流
"""
self.__wf = wave.open(file_path, 'rb')
self.__output_audio = pyaudio.PyAudio()
try:
self.__output_stream = self.__output_audio.open(format=self.__output_audio.get_format_from_width(self.__wf.getsampwidth()),
channels=self.__wf.getnchannels(),
rate=self.__wf.getframerate(),
output=True)
return True
except OSError: # 没有找到设备
return False
def play_file_thread(self):
"""
功能:播放文件线程
"""
play_thread = threading.Thread(target=self.play_file)
play_thread.start()
def play_file(self):
"""
功能:播放文件
"""
data = self.__wf.readframes(1024) # 将文件写入进播放音频流
while len(data) > 0:
self.__output_stream.write(data)
data = self.__wf.readframes(1024)
self.__output_stream.start_stream()
self.__play_flag = False
def close_output_thread(self):
"""
功能:释放相关资源线程
"""
output_thread = threading.Thread(target=self.close_output)
output_thread.start()
def close_output(self):
"""
功能:释放相关资源
"""
while self.__play_flag is False:
self.__output_stream.stop_stream()
self.__output_stream.close()
self.__output_audio.terminate()
time.sleep(0.1)
self.__play_flag = True
def play_wav_file(self):
"""
功能:播放wav文件
"""
self.play_file_thread()
self.close_output_thread()
def start_record_thread(self, path):
"""
功能:开启录音线程
参数1:需要报文文件的路径
"""
self.__output_file = path
record_thread = threading.Thread(target=self.start_sound_record)
record_thread.start()
def start_sound_record(self):
"""
功能:开启录音函数
"""
self.__frame = list()
while self.__record_state:
data = self.__input_stream.read(self.__chunk, exception_on_overflow=False) # exception_on_overflow防止出现阻塞问题
if data != b'':
self.__frame.append(data)
def stop_sound_record(self):
"""
功能:停止录音
"""
time.sleep(1) # 暂停1秒,防止有未录入的数据
self.__record_state = False
self.close_stream()
self.save_sound_record()
def close_stream(self):
"""
功能:关闭音频流
"""
if self.__record_state is False: # 判断是否已停止录制,此处如果直接执行,会存在录制线程还未停止的问题
self.__input_stream.stop_stream()
self.__input_stream.close()
self.__input_audio.terminate()
def save_sound_record(self):
"""
功能:保存录音文件
"""
wave_file = wave.open(self.__output_file, 'wb')
wave_file.setnchannels(self.__channels)
wave_file.setsampwidth(self.__input_audio.get_sample_size(self.__sample_format))
wave_file.setframerate(self.__sample_rate)
wave_file.writeframes(b''.join(self.__frame))
wave_file.close()
if __name__ == '__main__':
sound_record = SoundRecord()
# audio_state = sound_record.open_audio_input_stream() # 录制音频
# if audio_state is True:
# sound_record.start_record_thread("testcode.wav")
# time.sleep(10)
# sound_record.stop_sound_record()
# else:
# print("设备未连接")
audio_output_state = sound_record.open_audio_output_stream(r'testcode.wav') # 播放录音
if audio_output_state is True:
sound_record.play_wav_file()
else:
print("扬声器被占用")
我是一名车载测试开发工程师,希望能和志同道合的朋友一起相互学习进步