?
以下示例伪代码和示意图展示了如何使用内存共享模型解决生产者消费者问题。
生产者消费者与共享内存间交互示意图
为了避免不同生产者或消费者同时访问一块共享内存的容器时产生的脏读,脏写现象,同一时间只能有一个生产者或消费者访问该容器,也就是不同生产者和消费者争夺使用容器的锁。当一个角色获取锁之后其他角色需要等待该角色释放锁之后才能重新尝试获取锁以访问该容器。
// 此段示例为伪代码仅作为逻辑示意,便于开发者理解使用内存共享模型和Actor模型的区别
BufferQueue {
Queue queue
Mutex mutex
add(value) {
// 尝试获取锁
if (mutex.lock()) {
queue.push(value)
mutex.unlock()
}
}
take() {
// 尝试获取锁
if (mutex.lock()) {
if (queue.empty()) {
return null
}
let res = queue.pop(value)
mutex.unlock()
return res
}
}
}
// 构造一段全局共享的内存
let g_bufferQueue = new BufferQueue()
Producer {
run() {
let value = random()
// 跨线程访问bufferQueue对象
g_bufferQueue.add(value)
}
}
Consumer {
run() {
// 跨线程访问bufferQueue对象
let res = g_bufferQueue.take()
if (res != null) {
// 添加消费逻辑
}
}
}
Main() {
let consumer = new Consumer()
let producer = new Producer()
// 多线程执行生产任务
for 0 in 10 :
let thread = new Thread()
thread.run(producer.run())
consumer.run()
}
以下示例简单展示了如何使用基于Actor模型的TaskPool并发能力来解决生产者消费者问题。
Actor模型线程间通信示意图
Actor模型不同角色之间并不共享内存,生产者线程和UI线程都有自己独占的内存。生产者生产出结果后通过序列化通信将结果发送给UI线程,UI线程消费结果后再发送新的生产任务给生产者线程。
import taskpool from '@ohos.taskpool';
// 跨线程并发任务
@Concurrent
async function produce(): Promise<number>{
// 添加生产相关逻辑
console.log("producing...");
return Math.random();
}
class Consumer {
public consume(value : number) {
// 添加消费相关逻辑
console.log("consuming value: " + value);
}
}
@Entry
@Component
struct Index {
@State message: string = 'Hello World'
build() {
Row() {
Column() {
Text(this.message)
.fontSize(50)
.fontWeight(FontWeight.Bold)
Button() {
Text("start")
}.onClick(() => {
let produceTask: taskpool.Task = new taskpool.Task(produce);
let consumer: Consumer = new Consumer();
for (let index: number = 0; index < 10; index++) {
// 执行生产异步并发任务
taskpool.execute(produceTask).then((res : number) => {
consumer.consume(res);
}).catch((e : Error) => {
console.error(e.message);
})
}
})
.width('20%')
.height('20%')
}
.width('100%')
}
.height('100%')
}
}
Java 开发者仍然习惯于使用基于共享内存的并发模型,上手 actor 并发模型仍有一定的学习成本,但是借助 OpenHarmony源码转换器,Java 开发者可以无痛将代码迁移到 ArkTS,减轻工作负担。
Java 开发者在处理并发问题时经常会碰到三个问题:可见性、原子性、有序性,好在都有解决方案,比如用锁来保证可见性,原子指令保证原子性,内存屏障保证有序性。正确使用时,并发粒度高,但出现问题时,很难调试复现。
ArkTS 并发模型不基于共享内存,因此并不会存在上述并发法问题,但并发粒度比较低。
OpenHarmony 实际上提供了两种线程机制:TaskPool、Worker。TaskPool 是对 Worker 的封装,开发者不需要去手动管理生命周期。Worker 线程机制最多开启 8 个线程。
TaskPool支持开发者在主线程封装任务抛给任务队列,系统选择合适的工作线程,进行任务的分发及执行,再将结果返回给主线程。接口直观易用,支持任务的执行、取消。工作线程数量上限为4。
创建Worker的线程称为宿主线程(不一定是主线程,工作线程也支持创建Worker子线程),Worker自身的线程称为Worker子线程(或Actor线程、工作线程)。每个Worker子线程与宿主线程拥有独立的实例,包含基础设施、对象、代码段等。Worker子线程和宿主线程之间的通信是基于消息传递的,Worker通过序列化机制与宿主线程之间相互通信,完成命令及数据交互。
对于可序列化的数据能够通过Structured Clone算法在线程间进行复制传输。支持的可序列化对象有限:除Symbol之外的基础类型、DateString、RegExp、Array、MapSet、Object (仅限简单对象,比如通过{}
或者new Object
创建,普通对象仅支持传递属性,不支持传递其原型及方法)、TypedArray。
可转移对象(Transferable object)传输采用地址转移进行序列化,不需要内容拷贝,会将ArrayBuffer的所有权转移给接收该ArrayBuffer的线程,转移后该ArrayBuffer在发送它的线程中变为不可用,不允许再访问
let buffer = new ArrayBuffer(100)
共享对象SharedArrayBuffer,拥有固定长度,可以存储任何类型的数据,包括数字、字符串等。
共享对象传输指SharedArrayBuffer支持在多线程之间传递,传递之后的SharedArrayBuffer对象和原始的SharedArrayBuffer对象可以指向同一块内存,进而达到内存共享的目的。
SharedArrayBuffer对象存储的数据在同时被修改时,需要通过原子操作保证其同步性,即下个操作开始之前务必需要等到上个操作已经结束。
let ia = new SharedArrayBuffer(1024): // 定义可共享对象,可以使用Atomics进行操作
ia[37] = 163;
console.log(ia[37]); // Prints 163, the 38th prime
Atomics.store(ia, 37, 123);
while (Atomics.load(ia, 37) == 163);
console.log(ia[37]); // Prints 123
需要考虑到Java如下常用能力