如何自行开发代码访问阿里语音服务_智能语音交互(ISI)-阿里云帮助中心
begin_time?time?含义
客户端循环发送语音数据,持续接收识别结果:
{
"header": {
"namespace": "SpeechTranscriber",
"name": "TranscriptionResultChanged",
"status": 20000000,
"message_id": "dc21193fada84380a3b6137875ab****",
"task_id": "5ec521b5aa104e3abccf3d361822****",
"status_text": "Gateway:SUCCESS:Success."
},
"payload": {
"index": 1,
"time": 1835,
"result": "北京的天",
"confidence": 1.0,
"words": [{
"text": "北京",
"startTime": 630,
"endTime": 930
}, {
"text": "的",
"startTime": 930,
"endTime": 1110
}, {
"text": "天",
"startTime": 1110,
"endTime": 1140
}]
}
}
payload?参数说明:
参数 | 类型 | 说明 |
index | integer | 句子编号,1?开始递增 |
time | integer | 已处理音频时长,ms |
result | string | 当前句子识别结果 |
words | list<> | 句子的词信息,enable_words?需要设置为?true |
confidence | double | 当前句子识别结果的置信度,取值范围:[0.0,?1.0]。值越大置信度越高 |
{
"header":?{
"namespace":?"SpeechTranscriber",
"name":?"SentenceEnd",
"status":?20000000,
"message_id":?"c3a9ae4b231649d5ae05d4af36fd****",
"task_id":?"5ec521b5aa104e3abccf3d361822****",
"status_text":?"Gateway:SUCCESS:Success."
????????},
"payload":?{
"index":?1,
"time":?1820,
"begin_time":?0,
"result":?"北京的天气。",
"confidence":?1.0,
"words":?[{
"text":?"北京",
"startTime":?630,
"endTime":?930
????????????????},?{
"text":?"的",
"startTime":?930,
"endTime":?1110
????????????????},?{
"text":?"天气",
"startTime":?1110,
"endTime":?1380
????????????????}]
????????}
}
payload对象参数说明:
参数 | 类型 | 说明 |
index | Integer | 句子编号,从1开始递增。 |
time | Integer | 当前已处理的音频时长,单位是毫秒。 |
begin_time | Integer | 当前句子对应的SentenceBegin事件的时间,单位是毫秒。 |
result | String | 当前的识别结果。 |
words | List<?Word?> | 当前句子的词信息,需要将enable_words设置为true。 |
confidence | Double | 当前句子识别结果的置信度,取值范围:[0.0,1.0]。值越大表示置信度越高。 |
Words对象参数说明:
参数 | 类型 | 说明 |
text | String | 文本。 |
startTime | Integer | 词开始时间,单位为毫秒。 |
endTime | Integer | 词结束时间,单位为毫秒。 |
语音识别?实时语音识别(websocket)-API?文档-文档中心-腾讯云
无相应字段配置
????在上面描述端上调用步骤的时候提到了句内修正,那么怎么知道什么样的音频是一句话,下面有两个协议参数,对应了断句的两种策略。
max_sentence_silence | int | 语音断句检测阈值,静音时长超过该阈值会被认为断句,参数范围200ms~6000ms,默认值800ms。 开启语义断句enable_semantic_sentence_detection后,此参数无效。 | 默认是200,单位毫秒 |
enable_semantic_sentence_detection | bool | 是否开启语义断句,默认是False。语义断句参数需要和开启中间结果配合使用,即开启该语义断句参数需将中间结果参数同时打开:enable_intermediate_result=true。 | 默认是False,目前不支持 |
????????max_sentence_silence和enable_semantic_sentence_detection参数是互斥的两个参数,两个参数不能同时生效。第一个参数生效时候是根据静音来进行断句的并且只支持识别结果句内修正,第二种参数生效的时候是根据语义(标点符号),进行断句的。
???????两种参数分别对应了两种buffer的处理方式,下面在单路设计中会体现出来。
前面不需要静音进行处理,通过句号来处理。
第一步将识别文本放入下标为x的数据元素。
第二步由于没有收到静音或者未到达30s,将数据覆盖之前的数据元素。
第三步数据处理线程发现断句,将识别出来的文本数据,存入下标为x+1的数组元素中。
第四步使用数据处理线程识别出来的数据覆盖数组最后元素的文本。
第五步数据发送线程从文本数组中按照数组下标递增的顺序取数据并发送给客户端,如果当发送线程当前的句子index和文本数组index相等,发送最后一句。
????该参数断句逻辑是根据识别文本中的句号或者逗号进行断句,所以将识别的数据连续存入string,根据标点符号断句发送给客户端就可以了。
字段名称 | 字段类型 | 字段含义 | 说明 |
websocket | websocket | websocket句柄 | 用于该路数据的收发 |
sessionId | string | 每一路数据唯一标识sessionId | 32位uuid,用于标识某一路数据。 |
bytes | bytes?of?array | 二进制数据数组 | 用来存储二进制数据,每一个bytes都是一段buffer。 |
textArray | string[] | 文本结果数组 | 用来存储结果文本数据,如果enable_intermediate_result能力为true,数组的最后一个元素是可变的,以便支持该能力。 |
format | string | 音频数据的类型 | 默认是“PCM”,v1只支持处理pcm格式数据 |
sample_rate | int | 音频数据采样率 | 默认是16000 |
channels | int | 通道数 | 默认是1,目前只支持单通道 |
enable_intermediate_result | bool | 是否返回中间识别结果 | 默认是true |
enable_punctuation_prediction | bool | 是否在后处理中加标点 | 默认是false,目前不支持 |
enable_inverse_text_normalization | bool | ITN(逆文本inverse?text?normalization)中文数字转换阿拉伯数字。设置为True时,中文数字将转为阿拉伯数字输出,默认值:False | 目前不支持 |
customization_id | string | 自学习模型id | 目前不支持 |
vocabulary_id | string | 定制泛热词ID | 目前不支持 |
max_sentence_silence | int | 语音断句检测阈值,静音时长超过该阈值会被认为断句,参数范围200ms~6000ms,默认值800ms。 开启语义断句enable_semantic_sentence_detection后,此参数无效。 | 默认是200,单位毫秒 |
enable_words | bool | 是否开启返回词信息 | 默认是false |
enable_ignore_sentence_timeout | bool | 是否忽略实时识别中的单句识别超时 | 默认是false |
disfluency | bool | 过滤语气词,即声音顺 | 默认值false,目前不支持 |
speech_noise_threshold | float | 噪音参数阈值,参数范围:[-1,1]。取值说明如下:
| 默认是0.0 |
enable_semantic_sentence_detection | bool | 是否开启语义断句,默认是False。语义断句参数需要和开启中间结果配合使用,即开启该语义断句参数需将中间结果参数同时打开:enable_intermediate_result=true。 | 默认是False,目前不支持 |
special_word_filter | json?string | 敏感词过滤功能,可根据实际需求开启或关闭自定义词或默认词表。该参数支持以下选项:
| 默认不处理,目前不支持 |
?????????python中的队列(queue.Queue)是线程安全的,不需要加锁(如果在多线程环境中进行通信,应该使用queue.Queue。如果在多进程环境中进行通信,你应该使用multiprocessing.Queue。如果在协程之间进行通信,你应该使用asyncio.Queue)。
相关资料:
8.10.?Queue?—?A?synchronized?queue?class?—?Python?2.7.18?documentation
队列源码实现:
class Queue:
"""Create?a?queue?object?with?a?given?maximum?size.
????If?maxsize?is?<=?0,?the?queue?size?is?infinite.
????"""
def __init__(self,?maxsize=0):
????????self.maxsize?=?maxsize
????????self._init(maxsize)
#?mutex?must?be?held?whenever?the?queue?is?mutating.??All?methods
#?that?acquire?mutex?must?release?it?before?returning.??mutex
#?is?shared?between?the?three?conditions,?so?acquiring?and
#?releasing?the?conditions?also?acquires?and?releases?mutex.
????????self.mutex?=?_threading.Lock()
#?Notify?not_empty?whenever?an?item?is?added?to?the?queue;?a
#?thread?waiting?to?get?is?notified?then.
????????self.not_empty?=?_threading.Condition(self.mutex)
#?Notify?not_full?whenever?an?item?is?removed?from?the?queue;
#?a?thread?waiting?to?put?is?notified?then.
????????self.not_full?=?_threading.Condition(self.mutex)
#?Notify?all_tasks_done?whenever?the?number?of?unfinished?tasks
#?drops?to?zero;?thread?waiting?to?join()?is?notified?to?resume
????????self.all_tasks_done?=?_threading.Condition(self.mutex)
????????self.unfinished_tasks?= 0
def task_done(self):
"""Indicate?that?a?formerly?enqueued?task?is?complete.
????????Used?by?Queue?consumer?threads.??For?each?get()?used?to?fetch?a?task,
????????a?subsequent?call?to?task_done()?tells?the?queue?that?the?processing
????????on?the?task?is?complete.
????????If?a?join()?is?currently?blocking,?it?will?resume?when?all?items
????????have?been?processed?(meaning?that?a?task_done()?call?was?received
????????for?every?item?that?had?been?put()?into?the?queue).
????????Raises?a?ValueError?if?called?more?times?than?there?were?items
????????placed?in?the?queue.
????????"""
????????self.all_tasks_done.acquire()
try:
????????????unfinished?=?self.unfinished_tasks?- 1
if?unfinished?<= 0:
if?unfinished?< 0:
raise?ValueError('task_done()?called?too?many?times')
????????????????self.all_tasks_done.notify_all()
????????????self.unfinished_tasks?=?unfinished
finally:
????????????self.all_tasks_done.release()
def join(self):
"""Blocks?until?all?items?in?the?Queue?have?been?gotten?and?processed.
????????The?count?of?unfinished?tasks?goes?up?whenever?an?item?is?added?to?the
????????queue.?The?count?goes?down?whenever?a?consumer?thread?calls?task_done()
????????to?indicate?the?item?was?retrieved?and?all?work?on?it?is?complete.
????????When?the?count?of?unfinished?tasks?drops?to?zero,?join()?unblocks.
????????"""
????????self.all_tasks_done.acquire()
try:
while?self.unfinished_tasks:
????????????????self.all_tasks_done.wait()
finally:
????????????self.all_tasks_done.release()
def qsize(self):
"""Return?the?approximate?size?of?the?queue?(not?reliable!)."""
????????self.mutex.acquire()
????????n?=?self._qsize()
????????self.mutex.release()
return?n
def empty(self):
"""Return?True?if?the?queue?is?empty,?False?otherwise?(not?reliable!)."""
????????self.mutex.acquire()
????????n?= not?self._qsize()
????????self.mutex.release()
return?n
def full(self):
"""Return?True?if?the?queue?is?full,?False?otherwise?(not?reliable!)."""
????????self.mutex.acquire()
????????n?= 0 <?self.maxsize?==?self._qsize()
????????self.mutex.release()
return?n
def put(self,?item,?block=True,?timeout=None):
"""Put?an?item?into?the?queue.
????????If?optional?args?'block'?is?true?and?'timeout'?is?None?(the?default),
????????block?if?necessary?until?a?free?slot?is?available.?If?'timeout'?is
????????a?non-negative?number,?it?blocks?at?most?'timeout'?seconds?and?raises
????????the?Full?exception?if?no?free?slot?was?available?within?that?time.
????????Otherwise?('block'?is?false),?put?an?item?on?the?queue?if?a?free?slot
????????is?immediately?available,?else?raise?the?Full?exception?('timeout'
????????is?ignored?in?that?case).
????????"""
????????self.not_full.acquire()
try:
if?self.maxsize?> 0:
if not?block:
if?self._qsize() ==?self.maxsize:
raise?Full
elif?timeout?is None:
while?self._qsize() ==?self.maxsize:
????????????????????????self.not_full.wait()
elif?timeout?< 0:
raise?ValueError("'timeout'?must?be?a?non-negative?number")
else:
????????????????????endtime?=?_time() +?timeout
while?self._qsize() ==?self.maxsize:
????????????????????????remaining?=?endtime?-?_time()
if?remaining?<= 0.0:
raise?Full
????????????????????????self.not_full.wait(remaining)
????????????self._put(item)
????????????self.unfinished_tasks?+= 1
????????????self.not_empty.notify()
finally:
????????????self.not_full.release()
def put_nowait(self,?item):
"""Put?an?item?into?the?queue?without?blocking.
????????Only?enqueue?the?item?if?a?free?slot?is?immediately?available.
????????Otherwise?raise?the?Full?exception.
????????"""
return?self.put(item, False)
def get(self,?block=True,?timeout=None):
"""Remove?and?return?an?item?from?the?queue.
????????If?optional?args?'block'?is?true?and?'timeout'?is?None?(the?default),
????????block?if?necessary?until?an?item?is?available.?If?'timeout'?is
????????a?non-negative?number,?it?blocks?at?most?'timeout'?seconds?and?raises
????????the?Empty?exception?if?no?item?was?available?within?that?time.
????????Otherwise?('block'?is?false),?return?an?item?if?one?is?immediately
????????available,?else?raise?the?Empty?exception?('timeout'?is?ignored
????????in?that?case).
????????"""
????????self.not_empty.acquire()
try:
if not?block:
if not?self._qsize():
raise?Empty
elif?timeout?is None:
while not?self._qsize():
????????????????????self.not_empty.wait()
elif?timeout?< 0:
raise?ValueError("'timeout'?must?be?a?non-negative?number")
else:
????????????????endtime?=?_time() +?timeout
while not?self._qsize():
????????????????????remaining?=?endtime?-?_time()
if?remaining?<= 0.0:
raise?Empty
????????????????????self.not_empty.wait(remaining)
????????????item?=?self._get()
????????????self.not_full.notify()
return?item
finally:
????????????self.not_empty.release()
from?multiprocessing?import?Process,?Queue
def producer(q):
for?i?in range(5):
????????q.put('Message?{}'.format(i))
print('Message?{}?put?in?queue?by?producer'.format(i))
def consumer(q):
while True:
????????message?=?q.get()
print('Message?received?by?consumer:?{}'.format(message))
if?message?== 'Message?4':
break
if?__name__?== '__main__':
????q?=?Queue()
????p1?=?Process(target=producer,?args=(q,))
????p2?=?Process(target=consumer,?args=(q,))
????p1.start()
????p2.start()
????p1.join()
????p2.join()
import?queue
import?threading
def producer(q):
for?i?in range(5):
????????q.put('Message?{}'.format(i))
print('Message?{}?put?in?queue?by?producer'.format(i))
def consumer(q):
while True:
????????message?=?q.get()
print('Message?received?by?consumer:?{}'.format(message))
if?message?== 'Message?4':
break
if?__name__?== '__main__':
????q?=?queue.Queue()
????t1?=?threading.Thread(target=producer,?args=(q,))
????t2?=?threading.Thread(target=consumer,?args=(q,))
????t1.start()
????t2.start()
????t1.join()
????t2.join()
import?asyncio
async def producer(q):
for?i?in range(5):
await?q.put('Message?{}'.format(i))
print('Message?{}?put?in?queue?by?producer'.format(i))
async def consumer(q):
while True:
????????message?= await?q.get()
print('Message?received?by?consumer:?{}'.format(message))
if?message?== 'Message?4':
break
if?__name__?== '__main__':
????q?=?asyncio.Queue()
????loop?=?asyncio.get_event_loop()
????loop.run_until_complete(asyncio.gather(producer(q),?consumer(q)))
????loop.close()