实时语音多实例实现设计方案(服务端)

发布时间:2024年01月17日

?1.端上接入协议

如何自行开发代码访问阿里语音服务_智能语音交互(ISI)-阿里云帮助中心

?2.接口修改结果逻辑及端上调用步骤??

阿里client??server交互流程图

阿里语音接收识别结果

begin_time?time?含义

客户端循环发送语音数据,持续接收识别结果:

  • SentenceBegin?事件表示检测到一句话的开始
  • TranscriptionResultChanged?事件表示识别结果发生了变化。仅当请求消息中设置参数?enable_intermediate_result?为?ture?才会返回,默认为?false。

{
        "header": {
                "namespace": "SpeechTranscriber",
                "name": "TranscriptionResultChanged",
                "status": 20000000,
                "message_id": "dc21193fada84380a3b6137875ab****",
                "task_id": "5ec521b5aa104e3abccf3d361822****",
                "status_text": "Gateway:SUCCESS:Success."
        },
        "payload": {
                "index": 1,
                "time": 1835,
                "result": "北京的天",
                "confidence": 1.0,
                "words": [{
                        "text": "北京",
                        "startTime": 630,
                        "endTime": 930
                }, {
                        "text": "的",
                        "startTime": 930,
                        "endTime": 1110
                }, {
                        "text": "天",
                        "startTime": 1110,
                        "endTime": 1140
                }]
        }
}       

payload?参数说明:

参数

类型

说明

index

integer

句子编号,1?开始递增

time

integer

已处理音频时长,ms

result

string

当前句子识别结果

words

list<>

句子的词信息,enable_words?需要设置为?true

confidence

double

当前句子识别结果的置信度,取值范围:[0.0,?1.0]。值越大置信度越高

  • SentenceEnd?事件表示服务端检测到一句话的结束并将这句话最终结果发给用户,所谓最终结果,就是后面即使接收到音频,也不会对这个数据进行修正,也就是说目前阿里的设计是只支持句内修正,因为一般情况下句内相关性比较大,用来做参考纠正效果比较好。

{

"header":?{

"namespace":?"SpeechTranscriber",

"name":?"SentenceEnd",

"status":?20000000,

"message_id":?"c3a9ae4b231649d5ae05d4af36fd****",

"task_id":?"5ec521b5aa104e3abccf3d361822****",

"status_text":?"Gateway:SUCCESS:Success."

????????},

"payload":?{

"index":?1,

"time":?1820,

"begin_time":?0,

"result":?"北京的天气。",

"confidence":?1.0,

"words":?[{

"text":?"北京",

"startTime":?630,

"endTime":?930

????????????????},?{

"text":?"的",

"startTime":?930,

"endTime":?1110

????????????????},?{

"text":?"天气",

"startTime":?1110,

"endTime":?1380

????????????????}]

????????}

}

payload对象参数说明:

参数

类型

说明

index

Integer

句子编号,从1开始递增。

time

Integer

当前已处理的音频时长,单位是毫秒。

begin_time

Integer

当前句子对应的SentenceBegin事件的时间,单位是毫秒。

result

String

当前的识别结果。

words

List<?Word?>

当前句子的词信息,需要将enable_words设置为true。

confidence

Double

当前句子识别结果的置信度,取值范围:[0.0,1.0]。值越大表示置信度越高。

Words对象参数说明:

参数

类型

说明

text

String

文本。

startTime

Integer

词开始时间,单位为毫秒。

endTime

Integer

词结束时间,单位为毫秒。

腾讯实时语音接收识别结果

语音识别?实时语音识别(websocket)-API?文档-文档中心-腾讯云

无相应字段配置

  1. 3.两个重要的功能字段

????在上面描述端上调用步骤的时候提到了句内修正,那么怎么知道什么样的音频是一句话,下面有两个协议参数,对应了断句的两种策略。

max_sentence_silence

int

语音断句检测阈值,静音时长超过该阈值会被认为断句,参数范围200ms~6000ms,默认值800ms。

开启语义断句enable_semantic_sentence_detection后,此参数无效。

默认是200,单位毫秒

enable_semantic_sentence_detection

bool

是否开启语义断句,默认是False。语义断句参数需要和开启中间结果配合使用,即开启该语义断句参数需将中间结果参数同时打开:enable_intermediate_result=true。

默认是False,目前不支持

????????max_sentence_silence和enable_semantic_sentence_detection参数是互斥的两个参数,两个参数不能同时生效。第一个参数生效时候是根据静音来进行断句的并且只支持识别结果句内修正,第二种参数生效的时候是根据语义(标点符号),进行断句的。

???????两种参数分别对应了两种buffer的处理方式,下面在单路设计中会体现出来。

?4.单路设计音频数据核心处理流程

单路流程图:

  • max_sentence_silence参数生效时候数据buffer的生产消费处理流程
  1. 第一步是初始化状态,初始化数据为空(下图第一个框代表数组下标,不存储数据)。
  2. 第二步和第三步是每当收到用户发送的一块固定长度数据之后,将数据填入数组中最后的一个元素buffer之中。
  1. 第四步客户端发送的数据是静音数据,将该数据丢弃,并创建大小为0的buffer数据放入数组。
  2. 第五步将客户端发送的非静音数据拼接在数组尾部元素的buffer里面。
  3. 第六步数据处理线程,将数组头部数据拿走,发现数组大小大于1,将该元素删除出数组。
  4. 循环这个过程。

  • enable_semantic_sentence_detection参数生效时候的buffer处理

前面不需要静音进行处理,通过句号来处理。

  • max_sentence_silence参数生效时候数据文本的生产消费处理流程

第一步将识别文本放入下标为x的数据元素。

第二步由于没有收到静音或者未到达30s,将数据覆盖之前的数据元素。

第三步数据处理线程发现断句,将识别出来的文本数据,存入下标为x+1的数组元素中。

第四步使用数据处理线程识别出来的数据覆盖数组最后元素的文本。

第五步数据发送线程从文本数组中按照数组下标递增的顺序取数据并发送给客户端,如果当发送线程当前的句子index和文本数组index相等,发送最后一句。

  • enable_semantic_sentence_detection参数生效时候数据文本的生产消费处理流程

????该参数断句逻辑是根据识别文本中的句号或者逗号进行断句,所以将识别的数据连续存入string,根据标点符号断句发送给客户端就可以了。

?5.多设计整体流程架构

?6.session上下文数据结构

字段名称

字段类型

字段含义

说明

websocket

websocket

websocket句柄

用于该路数据的收发

sessionId

string

每一路数据唯一标识sessionId

32位uuid,用于标识某一路数据。

bytes

bytes?of?array

二进制数据数组

用来存储二进制数据,每一个bytes都是一段buffer。

textArray

string[]

文本结果数组

用来存储结果文本数据,如果enable_intermediate_result能力为true,数组的最后一个元素是可变的,以便支持该能力。

format

string

音频数据的类型

默认是“PCM”,v1只支持处理pcm格式数据

sample_rate

int

音频数据采样率

默认是16000

channels

int

通道数

默认是1,目前只支持单通道

enable_intermediate_result

bool

是否返回中间识别结果

默认是true

enable_punctuation_prediction

bool

是否在后处理中加标点

默认是false,目前不支持

enable_inverse_text_normalization

bool

ITN(逆文本inverse?text?normalization)中文数字转换阿拉伯数字。设置为True时,中文数字将转为阿拉伯数字输出,默认值:False

目前不支持

customization_id

string

自学习模型id

目前不支持

vocabulary_id

string

定制泛热词ID

目前不支持

max_sentence_silence

int

语音断句检测阈值,静音时长超过该阈值会被认为断句,参数范围200ms~6000ms,默认值800ms。

开启语义断句enable_semantic_sentence_detection后,此参数无效。

默认是200,单位毫秒

enable_words

bool

是否开启返回词信息

默认是false

enable_ignore_sentence_timeout

bool

是否忽略实时识别中的单句识别超时

默认是false

disfluency

bool

过滤语气词,即声音顺

默认值false,目前不支持

speech_noise_threshold

float

噪音参数阈值,参数范围:[-1,1]。取值说明如下:

  • 取值越趋于-1,噪音被判定为语音的概率越大。
  • 取值越趋于+1,语音被判定为噪音的概率越大

默认是0.0

enable_semantic_sentence_detection

bool

是否开启语义断句,默认是False。语义断句参数需要和开启中间结果配合使用,即开启该语义断句参数需将中间结果参数同时打开:enable_intermediate_result=true。

默认是False,目前不支持

special_word_filter

json?string

敏感词过滤功能,可根据实际需求开启或关闭自定义词或默认词表。该参数支持以下选项:

  • 不处理(默认,即展示原文)
  • 过滤词
  • 替换为*

默认不处理,目前不支持

7.线程安全相关调研

?????????python中的队列(queue.Queue)是线程安全的,不需要加锁(如果在多线程环境中进行通信,应该使用queue.Queue。如果在多进程环境中进行通信,你应该使用multiprocessing.Queue。如果在协程之间进行通信,你应该使用asyncio.Queue)。

相关资料:

8.10.?Queue?—?A?synchronized?queue?class?&#8212;?Python?2.7.18?documentation

队列源码实现:

class Queue:
    """Create?a?queue?object?with?a?given?maximum?size.

????If?maxsize?is?<=?0,?the?queue?size?is?infinite.
????"""
    def __init__(self,?maxsize=0):
????????self.maxsize?=?maxsize
????????self._init(maxsize)
        #?mutex?must?be?held?whenever?the?queue?is?mutating.??All?methods
        #?that?acquire?mutex?must?release?it?before?returning.??mutex
        #?is?shared?between?the?three?conditions,?so?acquiring?and
        #?releasing?the?conditions?also?acquires?and?releases?mutex.
????????self.mutex?=?_threading.Lock()
        #?Notify?not_empty?whenever?an?item?is?added?to?the?queue;?a
        #?thread?waiting?to?get?is?notified?then.
????????self.not_empty?=?_threading.Condition(self.mutex)
        #?Notify?not_full?whenever?an?item?is?removed?from?the?queue;
        #?a?thread?waiting?to?put?is?notified?then.
????????self.not_full?=?_threading.Condition(self.mutex)
        #?Notify?all_tasks_done?whenever?the?number?of?unfinished?tasks
        #?drops?to?zero;?thread?waiting?to?join()?is?notified?to?resume
????????self.all_tasks_done?=?_threading.Condition(self.mutex)
????????self.unfinished_tasks?= 0

    def task_done(self):
        """Indicate?that?a?formerly?enqueued?task?is?complete.

????????Used?by?Queue?consumer?threads.??For?each?get()?used?to?fetch?a?task,
????????a?subsequent?call?to?task_done()?tells?the?queue?that?the?processing
????????on?the?task?is?complete.

????????If?a?join()?is?currently?blocking,?it?will?resume?when?all?items
????????have?been?processed?(meaning?that?a?task_done()?call?was?received
????????for?every?item?that?had?been?put()?into?the?queue).

????????Raises?a?ValueError?if?called?more?times?than?there?were?items
????????placed?in?the?queue.
????????"""
????????self.all_tasks_done.acquire()
        try:
????????????unfinished?=?self.unfinished_tasks?- 1
            if?unfinished?<= 0:
                if?unfinished?< 0:
                    raise?ValueError('task_done()?called?too?many?times')
????????????????self.all_tasks_done.notify_all()
????????????self.unfinished_tasks?=?unfinished
        finally:
????????????self.all_tasks_done.release()

    def join(self):
        """Blocks?until?all?items?in?the?Queue?have?been?gotten?and?processed.

????????The?count?of?unfinished?tasks?goes?up?whenever?an?item?is?added?to?the
????????queue.?The?count?goes?down?whenever?a?consumer?thread?calls?task_done()
????????to?indicate?the?item?was?retrieved?and?all?work?on?it?is?complete.

????????When?the?count?of?unfinished?tasks?drops?to?zero,?join()?unblocks.
????????"""
????????self.all_tasks_done.acquire()
        try:
            while?self.unfinished_tasks:
????????????????self.all_tasks_done.wait()
        finally:
????????????self.all_tasks_done.release()

    def qsize(self):
        """Return?the?approximate?size?of?the?queue?(not?reliable!)."""
????????self.mutex.acquire()
????????n?=?self._qsize()
????????self.mutex.release()
        return?n

    def empty(self):
        """Return?True?if?the?queue?is?empty,?False?otherwise?(not?reliable!)."""
????????self.mutex.acquire()
????????n?= not?self._qsize()
????????self.mutex.release()
        return?n

    def full(self):
        """Return?True?if?the?queue?is?full,?False?otherwise?(not?reliable!)."""
????????self.mutex.acquire()
????????n?= 0 <?self.maxsize?==?self._qsize()
????????self.mutex.release()
        return?n

    def put(self,?item,?block=True,?timeout=None):
        """Put?an?item?into?the?queue.

????????If?optional?args?'block'?is?true?and?'timeout'?is?None?(the?default),
????????block?if?necessary?until?a?free?slot?is?available.?If?'timeout'?is
????????a?non-negative?number,?it?blocks?at?most?'timeout'?seconds?and?raises
????????the?Full?exception?if?no?free?slot?was?available?within?that?time.
????????Otherwise?('block'?is?false),?put?an?item?on?the?queue?if?a?free?slot
????????is?immediately?available,?else?raise?the?Full?exception?('timeout'
????????is?ignored?in?that?case).
????????"""
????????self.not_full.acquire()
        try:
            if?self.maxsize?> 0:
                if not?block:
                    if?self._qsize() ==?self.maxsize:
                        raise?Full
                elif?timeout?is None:
                    while?self._qsize() ==?self.maxsize:
????????????????????????self.not_full.wait()
                elif?timeout?< 0:
                    raise?ValueError("'timeout'?must?be?a?non-negative?number")
                else:
????????????????????endtime?=?_time() +?timeout
                    while?self._qsize() ==?self.maxsize:
????????????????????????remaining?=?endtime?-?_time()
                        if?remaining?<= 0.0:
                            raise?Full
????????????????????????self.not_full.wait(remaining)
????????????self._put(item)
????????????self.unfinished_tasks?+= 1
????????????self.not_empty.notify()
        finally:
????????????self.not_full.release()

    def put_nowait(self,?item):
        """Put?an?item?into?the?queue?without?blocking.

????????Only?enqueue?the?item?if?a?free?slot?is?immediately?available.
????????Otherwise?raise?the?Full?exception.
????????"""
        return?self.put(item, False)

    def get(self,?block=True,?timeout=None):
        """Remove?and?return?an?item?from?the?queue.

????????If?optional?args?'block'?is?true?and?'timeout'?is?None?(the?default),
????????block?if?necessary?until?an?item?is?available.?If?'timeout'?is
????????a?non-negative?number,?it?blocks?at?most?'timeout'?seconds?and?raises
????????the?Empty?exception?if?no?item?was?available?within?that?time.
????????Otherwise?('block'?is?false),?return?an?item?if?one?is?immediately
????????available,?else?raise?the?Empty?exception?('timeout'?is?ignored
????????in?that?case).
????????"""
????????self.not_empty.acquire()
        try:
            if not?block:
                if not?self._qsize():
                    raise?Empty
            elif?timeout?is None:
                while not?self._qsize():
????????????????????self.not_empty.wait()
            elif?timeout?< 0:
                raise?ValueError("'timeout'?must?be?a?non-negative?number")
            else:
????????????????endtime?=?_time() +?timeout
                while not?self._qsize():
????????????????????remaining?=?endtime?-?_time()
                    if?remaining?<= 0.0:
                        raise?Empty
????????????????????self.not_empty.wait(remaining)
????????????item?=?self._get()
????????????self.not_full.notify()
            return?item
        finally:
????????????self.not_empty.release()

进程间数据安全调用示例

from?multiprocessing?import?Process,?Queue

def producer(q):
    for?i?in range(5):
????????q.put('Message?{}'.format(i))
        print('Message?{}?put?in?queue?by?producer'.format(i))

def consumer(q):
    while True:
????????message?=?q.get()
        print('Message?received?by?consumer:?{}'.format(message))
        if?message?== 'Message?4':
            break

if?__name__?== '__main__':
????q?=?Queue()
????p1?=?Process(target=producer,?args=(q,))
????p2?=?Process(target=consumer,?args=(q,))
????p1.start()
????p2.start()
????p1.join()
????p2.join()

线程间数据安全调用示例

import?queue
import?threading

def producer(q):
    for?i?in range(5):
????????q.put('Message?{}'.format(i))
        print('Message?{}?put?in?queue?by?producer'.format(i))

def consumer(q):
    while True:
????????message?=?q.get()
        print('Message?received?by?consumer:?{}'.format(message))
        if?message?== 'Message?4':
            break

if?__name__?== '__main__':
????q?=?queue.Queue()
????t1?=?threading.Thread(target=producer,?args=(q,))
????t2?=?threading.Thread(target=consumer,?args=(q,))
????t1.start()
????t2.start()
????t1.join()
????t2.join()

协程间线程安全调用示例

import?asyncio

async def producer(q):
    for?i?in range(5):
        await?q.put('Message?{}'.format(i))
        print('Message?{}?put?in?queue?by?producer'.format(i))

async def consumer(q):
    while True:
????????message?= await?q.get()
        print('Message?received?by?consumer:?{}'.format(message))
        if?message?== 'Message?4':
            break

if?__name__?== '__main__':
????q?=?asyncio.Queue()
????loop?=?asyncio.get_event_loop()
????loop.run_until_complete(asyncio.gather(producer(q),?consumer(q)))
????loop.close()

  1. 方案设计进度表格

文章来源:https://blog.csdn.net/weixin_42317175/article/details/135656210
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。