声波通过空气传播,被麦克风接收,通过采样、量化、编码转换为离散的数字信号,即波形文件。音量、音高和音色是声音的基本属性。
采样:原始的语音信号是连续的模拟信号,需要对语音进行采样,转化为时间轴上离散的数据。采样后,模拟信号被等间隔地取样,这时信号在时间上就不再连续了,但在幅度上还是连续的。经过采样处理之后,模拟信号变成了离散时间信号。采样频率是指一秒钟内对声音信号的采样次数,采样频率越高声音的还原就越真实越自然。在当今的主流采集卡上,采样频率一般共分为22.05KHz、44.1KHz、48KHz三个等级,22.05KHz只能达到FM广播的声音品质,44.1KHz则是理论上的CD音质界限(人耳一般可以感觉到20-20K Hz的声音,根据香农采样定理,采样频率应该不小于最高频率的两倍,所以40KHz是能够将人耳听见的声音进行很好的还原的一个数值,于是CD公司把采样率定为44.1KHz),48KHz则更加精确一些。对于高于48KHz的采样频率人耳已无法辨别出来了,所以在电脑上没有多少使用价值。
量化:进行分级量化,将信号采样的幅度划分成几个区段,把落在某区段的采样到的样品值归成一类,并给出相应的量化值。根据量化间隔是否均匀划分,又分为均匀量化和非均匀量化。均匀量化的特点为“大信号的信噪比大,小信号的信噪比小”。缺点为“为了保证信噪比要求,编码位数必须足够大,但是这样导致了信道利用率低,如果减少编码位数又不能满足信噪比的要求”(根据信噪比公式,编码位数越大,信噪比越大,通信质量越好)。通常对语音信号采用非均匀量化,基本方法是对大信号使用大的量化间隔,对小信号使用小的量化间隔。由于小信号时量化间隔变小,其相应的量化噪声功率也减小(根据量化噪声功率公式),从而使小信号时的量化信噪比增大,改善了小信号时的信噪比。量化后,信号不仅在时间上不再连续,在幅度上也不连续了。经过量化处理之后,离散时间信号变成了数字信号。
编码:在量化之后信号已经变成了数字信号,需要将数字信号编码成二进制。“CD质量”的语音采用44100个样本每秒的采样率,每个样本16比特,这个16比特就是编码的位数。
采样,量化,编码的过程称为A/D(从模拟信号到数字信号)转换。
VAD,也就是语音端点检测技术,是Voice Activity Detection的缩写。这个技术的主要任务是从带有噪声的语音中准确的定位出语音的开始和结束点,因为语音中含有很长的静音,也就是把静音和实际语音分离开来,因为是语音数据的原始处理,所以VAD是语音信号处理过程的关键技术之一。它的好坏,直接影响成败,由于技术本身的特殊性,所以在涉及语音信号处理的领域,端点检测技术的应用非常广泛。语音识别系统在识别或者声学模型训练阶段所遇到的第一个技术就是端点检测,把静音和噪声作为干扰信号从原始数据中去除,并且端点检测对于语音识别系统的性能至关重要。
Voice Activity Detection (VAD) 在语音信号处理中,例如语音增强,语音识别等领域有着非常重要的作用。它的作用是从一段语音(纯净或带噪)信号中标识出语音片段与非语音片段。VAD系统通常包括两个部分,特征提取和语音/非语音判决;
常用的特征提取可以分为五类:
基于能量:基于能量的准则是检测信号的强度,并且假设语音能量大于背景噪声能量,这样当能量大于某一门限时,可以认为有语音存在。然而当噪声大到和语音一样时,能量这个特征无法区分语音还是纯噪声。早先基于能量的方法,将宽带语音分成各个子带,在子带上求能量;因为语音在2KHz以下频带包含大量的能量,而噪声在24KHz或者4KHz以上频带比02HKz频带倾向有更高的能量。这其实就是频谱平坦度的概念,webrtc中已经用到了。在信噪比低于10dB时,语音和噪声的区分能力会加速下降。
频域:通过STFT将时域信号变成频域信号,即使在SNR到0dB时,一些频带的长时包络还是可以区分语音和噪声。
倒谱:对于VAD,能量倒谱峰值确定了语音信号的基频(pitch),也有使用MFCC做为特征的;
谐波:语音的一个明显特征是包含了基频F0 及其多个谐波频率,即使在强噪声场景,谐波这一特征也是存在的。可以使用自相关的方法找到基频。
长时信息:语音是非稳态信号。普通语速通常每秒发出10~15个音素,音素见的谱分布是不一样的,这就导致了随着时间变化语音统计特性也是变化的。另一方面,日常的绝大多数噪声是稳态的(变化比较慢的),如白噪声/机器噪声。
基于能量的特征常用硬件实现,谱(频谱和倒谱)在低SNR可以获得较好的效果。当SNR到达0dB时,基于语音谐波和长时语音特征更具有鲁棒性。
判决准则可以分为三类:
对于VAD分类问题,特征尤为重要,好的特征应该能具备如下特性:
在语音增强中,我们希望从带噪语音信号中剔除噪音,得到纯净的语音信号,第一步就是提取噪音信息。通常的思路是通过VAD函数得到非语音片段,而非语音片段可以认为是纯噪音片段。从而可以从纯噪音信号中提取出有用信息,例如进行傅里叶变换得到噪音频谱等,再进而做下一步处理。例如谱减法,维纳滤波。此处不作讨论。
音频的能量通常指的是时域上每帧的能量,幅度的平方。在简单的语音活动检测(Voice Activity Detection,VAD)中,直接利用能量特征:能量大的音频片段是语音,能量小的音频片段是非语音(包括噪音、静音段等)。这种VAD的局限性比较大,正确率也不高,对噪音非常敏感。
def __init__(self, input_file, sr=None, frame_len=512, n_fft=None, win_step=2 / 3, window="hamming"):
"""
初始化
:param input_file: 输入音频文件
:param sr: 所输入音频文件的采样率,默认为None
:param frame_len: 帧长,默认512个采样点(32ms,16kHz),与窗长相同
:param n_fft: FFT窗口的长度,默认与窗长相同
:param win_step: 窗移,默认移动2/3,512*2/3=341个采样点(21ms,16kHz)
:param window: 窗类型,默认汉明窗
"""
self.input_file = input_file
self.frame_len = frame_len # 帧长,单位采样点数
self.wave_data, self.sr = librosa.load(self.input_file, sr=sr)
self.window_len = frame_len # 窗长512
if n_fft is None:
self.fft_num = self.window_len # 设置NFFT点数与窗长相等
else:
self.fft_num = n_fft
self.win_step = win_step
self.hop_length = round(self.window_len * win_step) # 重叠部分采样点数设置为窗长的1/3(1/3~1/2),即帧移(窗移)2/3
self.window = window
def energy(self):
"""
每帧内所有采样点的幅值平方和作为能量值
:return: 每帧能量值,np.ndarray[shape=(1,n_frames), dtype=float64]
"""
mag_spec = np.abs(librosa.stft(self.wave_data, n_fft=self.fft_num, hop_length=self.hop_length,
win_length=self.frame_len, window=self.window))
pow_spec = np.square(mag_spec) # [frequency, time (n_frames)]
energy = np.sum(pow_spec, axis=0) # [n_frames]
energy = np.where(energy == 0, np.finfo(np.float64).eps, energy) # 避免能量值为0,防止后续取log出错(eps是取非负的最小值), 即np.finfo(np.float64).eps = 2.220446049250313e-16
return energy
短时能量体现的是信号在不同时刻的强弱程度。设第n帧语音信号的短时能量用表示,则其计算公式为:
def short_time_energy(self):
"""
计算语音短时能量:每一帧中所有语音信号的平方和
:return: 语音短时能量列表(值范围0-每帧归一化后能量平方和,这里帧长512,则最大值为512),
np.ndarray[shape=(1,无加窗,帧移为0的n_frames), dtype=float64]
"""
energy = [] # 语音短时能量列表
energy_sum_per_frame = 0 # 每一帧短时能量累加和
for i in range(len(self.wave_data)): # 遍历每一个采样点数据
energy_sum_per_frame += self.wave_data[i] ** 2 # 求语音信号能量的平方和
if (i + 1) % self.frame_len == 0: # 一帧所有采样点遍历结束
energy.append(energy_sum_per_frame) # 加入短时能量列表
energy_sum_per_frame = 0 # 清空和
elif i == len(self.wave_data) - 1: # 不满一帧,最后一个采样点
energy.append(energy_sum_per_frame) # 将最后一帧短时能量加入列表
energy = np.array(energy)
energy = np.where(energy == 0, np.finfo(np.float64).eps, energy) # 避免能量值为0,防止后续取log出错(eps是取非负的最小值)
return energy
单位时间内通过垂直于声波传播方向的单位面积的平均声能,称作声强,声强用P表示,单位为“瓦/平米”。实验研究表明,人对声音的强弱感觉并不是与声强成正比,而是与其对数成正比,所以一般声强用声强级来表示:
def intensity(self):
"""
计算声音强度,用声压级表示:每帧语音在空气中的声压级Sound Pressure Level(SPL),单位dB
公式:20*lg(P/Pref),P为声压(Pa),Pref为参考压力(听力阈值压力),一般为1.0*10-6 Pa
这里P认定为声音的幅值:求得每帧所有幅值平方和均值,除以Pref平方,再取10倍lg
:return: 每帧声压级,dB,np.ndarray[shape=(1,无加窗,帧移为0的n_frames), dtype=float64]
"""
p0 = 1.0e-6 # 听觉阈限压力auditory threshold pressure: 2.0*10-5 Pa
e = self.short_time_energy()
spl = 10 * np.log10(1 / (np.power(p0, 2) * self.frame_len) * e)
return spl
过零率体现的是信号过零点的次数,体现的是频率特性。
def zero_crossing_rate(self):
"""
计算语音短时过零率:单位时间(每帧)穿过横轴(过零)的次数
:return: 每帧过零率次数列表,np.ndarray[shape=(1,无加窗,帧移为0的n_frames), dtype=uint32]
"""
zcr = [] # 语音短时过零率列表
counting_sum_per_frame = 0 # 每一帧过零次数累加和,即过零率
for i in range(len(self.wave_data)): # 遍历每一个采样点数据
if i % self.frame_len == 0: # 开头采样点无过零,因此每一帧的第一个采样点跳过
continue
if self.wave_data[i] * self.wave_data[i - 1] < 0: # 相邻两个采样点乘积小于0,则说明穿过横轴
counting_sum_per_frame += 1 # 过零次数加一
if (i + 1) % self.frame_len == 0: # 一帧所有采样点遍历结束
zcr.append(counting_sum_per_frame) # 加入短时过零率列表
counting_sum_per_frame = 0 # 清空和
elif i == len(self.wave_data) - 1: # 不满一帧,最后一个采样点
zcr.append(counting_sum_per_frame) # 将最后一帧短时过零率加入列表
return np.array(zcr, dtype=np.uint32)
基音周期反映了声门相邻两次开闭之间的时间间隔,基频(fundamental frequency, F0)则是基音周期的倒数,对应着声带振动的频率,代表声音的音高,声带振动越快,基频越高。如图2所示,蓝色箭头指向的就是基频的位置,决定音高。它是语音激励源的一个重要特征,比如可以通过基频区分性别。一般来说,成年男性基频在 100-250Hz左右,成年女性基频在 150-350Hz左右,女声的音高一般比男声稍高。 人类可感知声音的频率大致在20-20000Hz之间,人类对于基频的感知遵循对数律,也就是说,人们会感觉100Hz到200Hz的差距,与200Hz到400Hz的差距相同。因此,音高常常用基频的对数来表示。
音高(pitch)是由声音的基频决定的,音高和基频常常混用。可以这样认为,音高(pitch)是稀疏离散化的基频(F0)。由规律振动产生的声音一般都会有基频,比如语音中的元音和浊辅音;也有些声音没有基频,比如人类通过口腔挤压气流的清辅音。在汉语中,元音有a/e/i/o/u,浊辅音有y/w/v,其余音素比如b/p/q/x等均为清辅音,在发音时,可以通过触摸喉咙感受和判断发音所属音素的种类。
def pitch(self, ts_mag=0.25):
"""
获取每帧音高,即基频,这里应该包括基频和各次谐波,最小的为基频(一次谐波),其他的依次为二次、三次...谐波
各次谐波等于基频的对应倍数,因此基频也等于各次谐波除以对应的次数,精确些等于所有谐波之和除以谐波次数之和
:param ts_mag: 幅值倍乘因子阈值,>0,大于np.average(np.nonzero(magnitudes)) * ts_mag则认为对应的音高有效,默认0.25
:return: 每帧基频及其对应峰的幅值(>0),
np.ndarray[shape=(1 + n_fft/2,n_frames), dtype=float32],(257,全部采样点数/(512*2/3)+1)
"""
mag_spec = np.abs(librosa.stft(self.wave_data, n_fft=self.fft_num, hop_length=self.hop_length,
win_length=self.frame_len, window=self.window))
pitches, magnitudes = librosa.piptrack(S=mag_spec, sr=self.sr, threshold=1.0, ref=np.mean,
fmin=50, fmax=500) # 人类正常说话基频最大可能范围50-500Hz
ts = np.average(magnitudes[np.nonzero(magnitudes)]) * ts_mag
pit_likely = pitches
mag_likely = magnitudes
pit_likely[magnitudes < ts] = 0
mag_likely[magnitudes < ts] = 0
return pit_likely, mag_likely
pitches, mags = self.pitch() # 获取每帧基频
f0_likely = [] # 可能的基频F0
for i in range(pitches.shape[1]): # 按列遍历非0最小值,作为每帧可能的F0
try:
f0_likely.append(np.min(pitches[np.nonzero(pitches[:, i]), i]))
except ValueError:
f0_likely.append(np.nan) # 当一列,即一帧全为0时,赋值最小值为nan
f0_all = np.array(f0_likely)
语音经过说话人的口唇辐射发出,受到唇端辐射抑制,高频能量明显降低。一般来说,当语音信号的频率提高两倍时,其功率谱的幅度下降约6dB,即语音信号的高频部分受到的抑制影响较大。比如像元音等一些因素的发音包含了较多的高频信号的成分,高频信号的丢失,可能会导致音素的共振峰并不明显,使得声学模型对这些音素的建模能力不强。预加重(pre-emphasis)是个一阶高通滤波器,可以提高信号高频部分的能量,给定时域输入信号,预加重之后信号为:
其中,a是预加重系数,一般取0.97或0.95。
def preemphasis(y, coef=0.97, zi=None, return_zf=False):
"""Pre-emphasize an audio signal with a first-order auto-regressive filter:
y[n] -> y[n] - coef * y[n-1]
Parameters
----------
y : np.ndarray
Audio signal
coef : positive number
Pre-emphasis coefficient. Typical values of ``coef`` are between 0 and 1.
At the limit ``coef=0``, the signal is unchanged.
At ``coef=1``, the result is the first-order difference of the signal.
The default (0.97) matches the pre-emphasis filter used in the HTK
implementation of MFCCs [#]_.
.. [#] http://htk.eng.cam.ac.uk/
zi : number
Initial filter state. When making successive calls to non-overlapping
frames, this can be set to the ``zf`` returned from the previous call.
(See example below.)
By default ``zi`` is initialized as ``2*y[0] - y[1]``.
return_zf : boolean
If ``True``, return the final filter state.
If ``False``, only return the pre-emphasized signal.
Returns
-------
y_out : np.ndarray
pre-emphasized signal
zf : number
if ``return_zf=True``, the final filter state is also returned
"""
b = np.asarray([1.0, -coef], dtype=y.dtype)
a = np.asarray([1.0], dtype=y.dtype)
if zi is None:
# Initialize the filter to implement linear extrapolation
zi = 2 * y[..., 0] - y[..., 1]
zi = np.atleast_1d(zi)
y_out, z_f = scipy.signal.lfilter(b, a, y, zi=np.asarray(zi, dtype=y.dtype))
if return_zf:
return y_out, z_f
return y_out
wave_data, self.sr = librosa.load(input_file, sr=sr) # 音频全部采样点的归一化数组形式数据
wave_data = preemphasis(wave_data, coef=preemph) # 预加重,系数0.97
语音信号是非平稳信号,考虑到发浊音时声带有规律振动,即基音频率在短时范围内时相对固定的,因此可以认为语音信号具有短时平稳特性,一般认为10ms~50ms的语音信号片段是一个准稳态过程。短时分析采用分帧方式,一般每帧帧长为20ms或50ms。假设语音采样率为16kHz,帧长为20ms,则一帧有16000×0.02=320个样本点。
相邻两帧之间的基音有可能发生变化,如两个音节之间,或者声母向韵母过渡。为确保声学特征参数的平滑性,一般采用重叠取帧的方式,即相邻帧之间存在重叠部分。一般来说,帧长和帧移的比例为1:4或1:5。
短时分析:虽然语音信号具有时变特性,但是在一个短时间范围内(一般认为在10-30ms)其特性基本保持相对稳定,即语音具有短时平稳性。所以任何语音信号的分析和处理必须建立在“短时”的基础上,即进行“短时分析”。
def framesig(sig,frame_len,frame_step):
"""Frame a signal into overlapping frames.
:param sig: the audio signal to frame.
:param frame_len: length of each frame measured in samples.
:param frame_step: number of samples after the start of the previous frame that the next frame should begin.
:returns: an array of frames. Size is NUMFRAMES by frame_len.
"""
slen = len(sig)
frame_len = int(round_half_up(frame_len))
frame_step = int(round_half_up(frame_step))
if slen <= frame_len:
numframes = 1
else:
numframes = 1 + int(math.ceil((1.0*slen - frame_len)/frame_step))
padlen = int((numframes-1)*frame_step + frame_len)
zeros = numpy.zeros((padlen - slen,))
padsignal = numpy.concatenate((sig,zeros))
indices = numpy.tile(numpy.arange(0,frame_len),(numframes,1)) + numpy.tile(numpy.arange(0,numframes*frame_step,frame_step),(frame_len,1)).T
indices = numpy.array(indices,dtype=numpy.int32)
frames = padsignal[indices]
return frames
frames = framesig(sig=sig, frame_len=0.030 * sample_rate, # 取帧长为30ms
frame_step=0.006 * sample_rate, # 取帧移为6ms
)
分帧相当于对语音信号加矩形窗(用矩形窗其实就是不加窗),矩形窗在时域上对信号进行截断,在边界处存在多个旁瓣,会发生频谱泄露。为了减少频谱泄露,通常对分帧之后的信号进行其它形式的加窗操作。常用的窗函数有:汉明(Hamming)窗、汉宁(Hanning)窗和布莱克曼(Blackman)窗等。 加窗主要是为了使时域信号似乎更好地满足FFT处理的周期性要求,减少泄漏(加窗不能消除泄漏,只能减少)。
什么是频谱泄露?
音频处理中,经常需要利用傅里叶变换将时域信号转换到频域,而一次快速傅里叶变换(FFT)只能处理有限长的时域信号,但语音信号通常是长的,所以需要将原始语音截断成一帧一帧长度的数据块。这个过程叫信号截断,也叫分帧。分完帧后再对每帧做FFT,得到对应的频域信号。FFT是离散傅里叶变换(DFT)的快速计算方式,而做DFT有一个先验条件:分帧得到的数据块必须是整数周期的信号,也即是每次截断得到的信号要求是周期主值序列。但做分帧时,很难满足周期截断,因此就会导致频谱泄露。一句话,频谱泄露就是分析结果中,出现了本来没有的频率分量。比如说,50Hz的纯正弦波,本来只有一种频率分量,分析结果却包含了与50Hz频率相近的其它频率分量。
非周期的无限长序列,任意截取一段有限长的序列,都不能代表实际信号,分析结果当然与实际信号不一致!也就是会造成频谱泄露。而周期的无限长序列,假设截取的是正好一个或整数个信号周期的序列,这个有限长序列就可以代表原无限长序列,如果分析的方法得当的话,分析结果应该与实际信号一致!因此也就不会造成频谱泄露。
?
def framesig(sig,frame_len,frame_step,winfunc=lambda x:numpy.ones((x,))):
"""Frame a signal into overlapping frames.
:param sig: the audio signal to frame.
:param frame_len: length of each frame measured in samples.
:param frame_step: number of samples after the start of the previous frame that the next frame should begin.
:param winfunc: the analysis window to apply to each frame. By default no window is applied.
:returns: an array of frames. Size is NUMFRAMES by frame_len.
"""
slen = len(sig)
frame_len = int(round_half_up(frame_len))
frame_step = int(round_half_up(frame_step))
if slen <= frame_len:
numframes = 1
else:
numframes = 1 + int(math.ceil((1.0*slen - frame_len)/frame_step))
padlen = int((numframes-1)*frame_step + frame_len)
zeros = numpy.zeros((padlen - slen,))
padsignal = numpy.concatenate((sig,zeros))
indices = numpy.tile(numpy.arange(0,frame_len),(numframes,1)) + numpy.tile(numpy.arange(0,numframes*frame_step,frame_step),(frame_len,1)).T
indices = numpy.array(indices,dtype=numpy.int32)
frames = padsignal[indices]
# 加窗操作
win = numpy.tile(winfunc(frame_len),(numframes,1))
return frames*win
frames = framesig(sig=sig, frame_len=0.030 * sample_rate, # 取帧长为30ms
frame_step=0.006 * sample_rate, # 取帧移为6ms
winfunc=np.hamming
)
VAD有很多种方法,此处介绍一种最简单直接的办法。 通过short timeenergy (STE)和zero cross counter (ZCC) 来测定。(实际上精确度高的VAD会提取4种或更多的特征进行判断,这里只介绍两种特征的基本方法)。
理论基础是在信噪比(SNR)不是很低的情况下,语音片段的STE相对较大,而ZCC相对较小;而非语音片段的STE相对较小,但是ZCC相对较大。因为语音信号能量绝大部分包含在低频带内,而噪音信号通常能量较小且含有较高频段的信息。故而可以通过测量语音信号的这两个特征并且与两个门限(阈值)进行对比,从而判断语音信号与非语音信号。一段音频小于STE门限同时大于ZCC门限的部分,我们可以认为它是噪声通常在对语音信号分帧时,取一帧20ms (因为一般会进行短时傅里叶变换,时域和频域的分辨率需要一个平衡,20ms为平衡点,此处不用考虑)。此处输入信号采样率为8000HZ。因此每一帧长度为160 samples.
STE的计算方法是 , 即帧内信号的平方和。
ZCC的计算方法是,将帧内所有sample平移1,再对应点做乘积,符号为负的则说明此处过零,只需将帧内所有负数乘积数目求出则得到该帧的过零率。
# 需要添加录音互斥功能能,某些功能开启的时候录音暂时关闭
def ZCR(curFrame):
# 过零率
tmp1 = curFrame[:-1]
tmp2 = curFrame[1:]
sings = (tmp1 * tmp2 <= 0)
diffs = (tmp1 - tmp2) > 0.02
zcr = np.sum(sings * diffs)
return zcr
def STE(curFrame):
# 短时能量
amp = np.sum(np.abs(curFrame))
return amp
def read_file_data(filename):
"""
输入:需要读取的文件名
返回:(声道,量化位数,采样率,数据)
"""
read_file = wave.open(filename, "r")
params = read_file.getparams()
nchannels, sampwidth, framerate, nframes = params[:4]
data = read_file.readframes(nframes)
return nchannels, sampwidth, framerate, data
# -*- coding: utf-8 -*-
import os
import time
import wave
from time import sleep
import numpy as np
SUCCESS = 0
FAIL = 1
# 需要添加录音互斥功能能,某些功能开启的时候录音暂时关闭
def ZCR(curFrame):
# 过零率
tmp1 = curFrame[:-1]
tmp2 = curFrame[1:]
sings = (tmp1 * tmp2 <= 0)
diffs = (tmp1 - tmp2) > 0.02
zcr = np.sum(sings * diffs)
return zcr
def STE(curFrame):
# 短时能量
amp = np.sum(np.abs(curFrame))
return amp
class Vad(object):
def __init__(self):
# 初始短时能量高门限
self.amp1 = 140
# 初始短时能量低门限
self.amp2 = 120
# 初始短时过零率高门限
self.zcr1 = 10
# 初始短时过零率低门限
self.zcr2 = 5
# 允许最大静音长度
self.maxsilence = 100
# 语音的最短长度
self.minlen = 40
# 偏移值
self.offsets = 40
self.offsete = 40
# 能量最大值
self.max_en = 20000
# 初始状态为静音
self.status = 0
self.count = 0
self.silence = 0
self.frame_len = 256 * 2
self.frame_inc = 128
self.cur_status = 0
self.frames = []
# 数据开始偏移
self.frames_start = []
self.frames_start_num = 0
# 数据结束偏移
self.frames_end = []
self.frames_end_num = 0
# 缓存数据
self.cache_frames = []
self.cache = ""
# 最大缓存长度
self.cache_frames_num = 0
self.end_flag = False
self.wait_flag = False
self.on = True
self.callback = None
self.callback_res = []
self.callback_kwargs = {}
def clean(self):
self.frames = []
# 数据开始偏移
self.frames_start = []
self.frames_start_num = 0
# 数据结束偏移
self.frames_end = []
self.frames_end_num = 0
# 缓存数据
self.cache_frames = []
# 最大缓存长度
self.cache_frames_num = 0
self.end_flag = False
self.wait_flag = False
def go(self):
self.wait_flag = False
def wait(self):
self.wait_flag = True
def stop(self):
self.on = False
def add(self, frame, wait=True):
if wait:
print('wait')
frame = self.cache + frame
while len(frame) > self.frame_len:
frame_block = frame[:self.frame_len]
self.cache_frames.append(frame_block)
frame = frame[self.frame_len:]
if wait:
self.cache = frame
else:
self.cache = ""
self.cache_frames.append(-1)
def run(self, hasNum):
print("开始执行音频端点检测")
step = self.frame_len - self.frame_inc
num = 0
x = []
while 1:
# 开始端点
# 获得音频文件数字信号
if self.wait_flag:
sleep(1)
continue
if len(self.cache_frames) < 2:
sleep(0.05)
continue
if self.cache_frames[1] == -1:
print('----------------没有声音--------------')
break
# 从缓存中读取音频数据
record_stream = b"".join(self.cache_frames[:2])
wave_data = np.frombuffer(record_stream, dtype=np.int16)
wave_data = wave_data * 1.0 / self.max_en
data = wave_data[np.arange(0, self.frame_len)]
speech_data = self.cache_frames.pop(0)
# 获得音频过零率
zcr = ZCR(data)
# 获得音频的短时能量, 平方放大
amp = STE(data) ** 2
# 返回当前音频数据状态
res = self.speech_status(amp, zcr)
if res == 2:
hasNum += 1
print("有声音", len(speech_data), speech_data)
x.append(speech_data)
else:
print("没有声音", len(speech_data), speech_data)
if len(x) > 5:
x.append(speech_data)
nchannels, sampwidth, framerate, data = read_file_data(filename)
with wave.open(f"sound{time.time()}.wav", "wb") as f:
f.setnchannels(nchannels)
f.setsampwidth(sampwidth)
f.setframerate(framerate)
f.writeframesraw(b"".join(x))
print("保存音频", len(x))
x = []
if hasNum > 10000:
print('+++++++++++++++++++++++++有声音++++++++++++++++++++++++')
break
num = num + 1
# 一段一段进行检测
self.frames_start.append(speech_data)
self.frames_start_num += 1
if self.frames_start_num == self.offsets:
# 开始音频开始的缓存部分
self.frames_start.pop(0)
self.frames_start_num -= 1
if self.end_flag:
# 当音频结束后进行后部缓存
self.frames_end_num += 1
# 下一段语音开始,或达到缓存阀值
if res == 2 or self.frames_end_num == self.offsete:
speech_stream = b"".join(self.frames + self.frames_end)
if self.callback:
self.callback_res.append(self.callback(speech_stream, **self.callback_kwargs))
# 数据环境初始化
# self.clean()
self.end_flag = False
self.frames = []
self.frames_end_num = 0
self.frames_end = []
self.frames_end.append(speech_data)
if res == 2:
if self.cur_status in [0, 1]:
# 添加开始偏移数据到数据缓存
self.frames.append(b"".join(self.frames_start))
# 添加当前的语音数据
self.frames.append(speech_data)
if res == 3:
print('检测音频结束')
self.frames.append(speech_data)
# 开启音频结束标志
self.end_flag = True
self.cur_status = res
# return self.callback_res
def speech_status(self, amp, zcr):
status = 0
# 0= 静音, 1= 可能开始, 2=确定进入语音段
if self.cur_status in [0, 1]:
# 确定进入语音段
if amp > self.amp1:
status = 2
self.silence = 0
self.count += 1
# 可能处于语音段
elif amp > self.amp2 or zcr > self.zcr2:
status = 1
self.count += 1
# 静音状态
else:
status = 0
self.count = 0
self.count = 0
# 2 = 语音段
elif self.cur_status == 2:
# 保持在语音段
if amp > self.amp2 or zcr > self.zcr2:
self.count += 1
status = 2
# 语音将结束
else:
# 静音还不够长,尚未结束
self.silence += 1
if self.silence < self.maxsilence:
self.count += 1
status = 2
# 语音长度太短认为是噪声
elif self.count < self.minlen:
status = 0
self.silence = 0
self.count = 0
# 语音结束
else:
status = 3
self.silence = 0
self.count = 0
return status
def read_file_data(filename):
"""
输入:需要读取的文件名
返回:(声道,量化位数,采样率,数据)
"""
read_file = wave.open(filename, "r")
params = read_file.getparams()
nchannels, sampwidth, framerate, nframes = params[:4]
data = read_file.readframes(nframes)
return nchannels, sampwidth, framerate, data
class FileParser(Vad):
def __init__(self):
self.block_size = 256
Vad.__init__(self)
def read_file(self, filename):
if not os.path.isfile(filename):
print("文件%s不存在" % filename)
return FAIL
datas = read_file_data(filename)[-1]
self.add(datas, False)
if __name__ == "__main__":
stream_test = FileParser()
filename = 'D:/dataset/lyb01.wav'
result = stream_test.read_file(filename)
if result != FAIL:
stream_test.run(0)