在当今软件开发的世界中,多线程编程已经变得愈发重要。面对多核处理器的普及和复杂的系统架构,开发人员需要深入了解并发编程的原理和实践,以充分发挥硬件的性能潜力。本文将带您深入探讨Java中的并发与多线程编程,介绍一系列强大的Java库和框架,助您更好地处理并发挑战。
Executor 框架是 Java 提供的用于管理线程的框架,它包含一组接口和类,用于简化多线程编程。其中,Executor
接口是整个框架的核心,定义了执行任务的基本协议。下面是一个简单的使用 ThreadPoolExecutor
的例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorExample {
public static void main(String[] args) {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交任务
executorService.submit(() -> {
System.out.println("Task 1 executed by " + Thread.currentThread().getName());
});
executorService.submit(() -> {
System.out.println("Task 2 executed by " + Thread.currentThread().getName());
});
// 关闭线程池
executorService.shutdown();
}
}
并发集合是为了在多线程环境中提供安全的数据操作而设计的。ConcurrentHashMap
是一个线程安全的哈希表实现,CopyOnWriteArrayList
是一个线程安全的动态数组实现。以下是它们的简单应用:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
public class ConcurrentCollectionExample {
public static void main(String[] args) {
// 使用 ConcurrentHashMap
Map<String, String> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("key1", "value1");
concurrentMap.put("key2", "value2");
// 使用 CopyOnWriteArrayList
CopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList<>();
copyOnWriteList.add("Item1");
copyOnWriteList.add("Item2");
}
}
同步器用于协调多个线程的执行。CountDownLatch
允许一个或多个线程等待其他线程完成操作,CyclicBarrier
用于多线程之间的同步,Semaphore
用于控制同时访问的线程数量。下面是一个使用 CountDownLatch
的例子:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 创建 CountDownLatch,设置计数器为2
CountDownLatch latch = new CountDownLatch(2);
// 启动两个线程
new Thread(() -> {
System.out.println("Task 1 executed");
latch.countDown();
}).start();
new Thread(() -> {
System.out.println("Task 2 executed");
latch.countDown();
}).start();
// 等待两个线程执行完毕
latch.await();
System.out.println("Both tasks completed");
}
}
原子变量提供了一种无锁的线程安全的操作方式。AtomicInteger
、AtomicLong
和 AtomicReference
分别用于整数、长整数和对象的原子操作。以下是 AtomicInteger
的使用示例:
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerExample {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
// 原子地增加值
int result = atomicInteger.incrementAndGet();
System.out.println("Incremented value: " + result);
// 原子地减少值
result = atomicInteger.decrementAndGet();
System.out.println("Decremented value: " + result);
}
}
CompletableFuture
是 Java 8 引入的一个强大的异步编程工具,它提供了一种简单而灵活的方式来处理异步操作的结果。相较于传统的 Future
,CompletableFuture
允许你以声明式的方式构建异步操作流水线,轻松地进行组合和转换。下面是一个使用 CompletableFuture
的示例:
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
// 异步执行任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
return "Result";
});
// 注册回调函数
future.thenAccept(result -> System.out.println("Async result: " + result));
// 等待任务完成
future.join();
}
}
在上述示例中,通过 CompletableFuture.supplyAsync
异步执行任务,使用 thenAccept
注册回调函数,实现了异步任务的执行和结果处理。
Phaser
是 Java 7 引入的同步辅助类,它允许线程在多阶段并发算法中协同工作。Phaser
提供了更灵活的同步机制,比传统的 CountDownLatch
和 CyclicBarrier
更强大。以下是一个简单的 Phaser
使用示例:
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
// 创建 Phaser,设置参与的线程数目
Phaser phaser = new Phaser(3);
// 启动三个线程
new Thread(() -> {
System.out.println("Thread 1 arrived");
phaser.arriveAndAwaitAdvance();
}).start();
new Thread(() -> {
System.out.println("Thread 2 arrived");
phaser.arriveAndAwaitAdvance();
}).start();
new Thread(() -> {
System.out.println("Thread 3 arrived");
phaser.arriveAndAwaitAdvance();
}).start();
}
}
在上述示例中,通过 Phaser
控制三个线程同时到达同一阶段,实现了更加灵活的线程同步。
LinkedTransferQueue
是 java.util.concurrent
包中的一个并发队列实现,它具有高性能和可伸缩性。相较于其他阻塞队列,LinkedTransferQueue
具有更好的性能特征,特别适用于高并发场景。以下是一个简单的 LinkedTransferQueue
使用示例:
import java.util.concurrent.LinkedTransferQueue;
public class LinkedTransferQueueExample {
public static void main(String[] args) {
// 创建 LinkedTransferQueue
LinkedTransferQueue<String> transferQueue = new LinkedTransferQueue<>();
// 生产者线程
new Thread(() -> {
try {
// 将元素传输给消费者
transferQueue.transfer("Message from producer");
System.out.println("Message sent by producer");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
// 接收生产者传输的元素
String message = transferQueue.take();
System.out.println("Message received by consumer: " + message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
在上述示例中,LinkedTransferQueue
实现了生产者与消费者之间的消息传输,具有更好的性能表现。
Actor 模型是一种并发计算的模型,其中的 Actor 是并发执行的基本单位。在 Akka 中,ActorSystem
是整个 Actor 模型的入口,ActorRef
用于在 Actor 之间传递消息。以下是一个简单的 Actor 示例:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class ActorExample {
public static void main(String[] args) {
// 创建 Actor 系统
ActorSystem system = ActorSystem.create("MySystem");
// 创建 Actor
ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myActor");
// 发送消息给 Actor
myActor.tell("Hello, Actor!", ActorRef.noSender());
}
// 定义一个简单的 Actor
static class MyActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, message -> {
System.out.println("Received message: " + message);
})
.build();
}
}
}
Akka 提供了强大的并发和分布式支持。Clustering
允许将多个 ActorSystem 组成集群,Sharding
用于将 Actor 分布到多个节点,而 Distributed Data
提供了分布式数据结构的实现。
Clustering
是 Akka 中用于构建集群的核心模块。通过 Cluster
模块,可以将多个运行在不同 JVM 中的 ActorSystem
组成一个分布式集群,实现节点之间的通信和协同工作。以下是一个简单的集群示例:
import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
public class ClusterExample {
public static void main(String[] args) {
// 创建 Actor 系统
ActorSystem system1 = ActorSystem.create("ClusterSystem");
ActorSystem system2 = ActorSystem.create("ClusterSystem");
// 将两个 ActorSystem 加入同一个集群
Cluster.get(system1).join(Cluster.get(system2).selfAddress());
// 创建一个运行在集群中的 Actor
system1.actorOf(Props.create(ClusterActor.class), "clusterActor");
}
// 集群中的 Actor
static class ClusterActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(message -> {
System.out.println("Received message: " + message + " by " + self().path());
})
.build();
}
}
}
上述示例中,ClusterActor
在两个不同的 ActorSystem 中运行,并通过 Cluster
模块加入了同一个集群。这样,它们可以相互通信,形成一个分布式集群。
Sharding
是 Akka 中用于分片管理的模块。它允许将大量的 Actor 实例分布到多个节点上,每个节点负责一部分数据。这有助于提高并发性能和分布式扩展性。以下是一个简单的 Sharding 示例:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
public class ShardingExample {
public static void main(String[] args) {
// 创建 Actor 系统
ActorSystem system = ActorSystem.create("ShardingSystem");
// 创建 Sharding 区域
ActorRef shardingRegion = ClusterSharding.get(system)
.start("MyShardingActor", Props.create(ShardingActor.class), ClusterShardingSettings.create(system), new ShardingMessageExtractor(), new ShardingMessageExtractor());
// 发送消息给 Sharding Actor
shardingRegion.tell(new ShardingMessageExtractor.ShardingMessage("shard-1", "Hello, Sharding Actor!"), ActorRef.noSender());
}
// Sharding Actor
static class ShardingActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ShardingMessageExtractor.ShardingMessage.class, message -> {
System.out.println("Received sharded message: " + message.getMessage());
})
.build();
}
}
// Sharding 消息提取器
static class ShardingMessageExtractor implements ShardRegion.MessageExtractor {
@Override
public Object entityMessage(Object message) {
return message;
}
@Override
public String entityId(Object message) {
if (message instanceof ShardingMessage) {
return ((ShardingMessage) message).getShardId();
}
return null;
}
@Override
public String shardId(Object message) {
if (message instanceof ShardingMessage) {
return ((ShardingMessage) message).getShardId();
}
return null;
}
static class ShardingMessage {
private final String shardId;
private final String message;
public ShardingMessage(String shardId, String message) {
this.shardId = shardId;
this.message = message;
}
public String getShardId() {
return shardId;
}
public String getMessage() {
return message;
}
}
}
}
在上述示例中,通过 ClusterSharding
模块,创建了一个名为 “MyShardingActor” 的 Sharding 区域。消息通过 ShardingMessageExtractor
进行分片,并由相应的 ShardingActor
处理。
Distributed Data
是 Akka 中用于处理分布式数据的模块。它提供了一系列的分布式数据结构,如 Replicator
、LWWMap
等,使得在分布式系统中更容易实现数据的一致性和复制。以下是一个简单的 Replicator
示例:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ddata.*;
public class DistributedDataExample {
public static void main(String[] args) {
// 创建 Actor 系统
ActorSystem system1 = ActorSystem.create("DistributedDataSystem");
ActorSystem system2 = ActorSystem.create("DistributedDataSystem");
// 加入同一个集群
Cluster.get(system1).join(Cluster.get(system2).selfAddress());
// 创建 Replicator
ActorRef replicator1 = DistributedData.get(system1).replicator();
ActorRef replicator2 = DistributedData.get(system2).replicator();
// 创建分布式 Map
ReplicatedDataKey<String> dataKey = ReplicatedDataKey.create("myData", Replicators.CausalDiamond);
// 在系统1中更新值
replicator1.tell(new Replicator.Update<>(dataKey, Replicators.writeLocal(), "key", new Replicator.UpdateData<>("value", Replicators.writeLocal())),
ActorRef.noSender());
// 在系统2中读取值
replicator2.tell(new Replicator.Get<>(dataKey, Replicators.readLocal()), ActorRef.noSender());
}
// Actor 处理分布式数据的更新和读取
static class DistributedDataActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Replicator.GetSuccess.class, success -> {
System.out.println("Read value: " + success.get(dataKey).get("key"));
})
.build();
}
}
}
在上述示例中,通过 DistributedData
模块创建了两个 ActorSystem,并在集群中加入了相同的地址。通过Replicator
进行分布式数据的复制和同步。在示例中,通过 ReplicatedDataKey
创建了一个名为 “myData” 的分布式 Map,并在系统1中更新了键值对 “key” 和 “value”,然后在系统2中读取了该值。
Akka Streams
是 Akka 中用于处理流数据的模块。它提供了一种声明式的方式来操作和处理数据流,适用于异步、高并发的场景。以下是一个简单的 Akka Streams
示例:
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
public class AkkaStreamsExample {
public static void main(String[] args) {
// 创建 Actor 系统
ActorSystem system = ActorSystem.create("AkkaStreamsSystem");
// 创建流执行环境
ActorMaterializer materializer = ActorMaterializer.create(system);
// 创建数据源
Source<Integer, ?> source = Source.from(Arrays.asList(1, 2, 3, 4, 5));
// 定义数据处理流程
source.map(value -> value * 2)
.filter(value -> value > 5)
.to(Sink.foreach(System.out::println))
.run(materializer);
}
}
在上述示例中,通过 Akka Streams
创建了一个数据源,并定义了一个数据处理流程,包括将每个元素乘以2、过滤掉小于等于5的元素,最后将结果打印出来。
Akka HTTP
是 Akka 中用于构建高性能、可伸缩的 HTTP 服务的模块。它提供了一套强大而灵活的 API,支持异步和流式处理。以下是一个简单的 Akka HTTP
服务示例:
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;
public class AkkaHttpExample extends AllDirectives {
public static void main(String[] args) {
// 创建 Actor 系统
ActorSystem system = ActorSystem.create("AkkaHttpSystem");
// 创建流执行环境
ActorMaterializer materializer = ActorMaterializer.create(system);
// 定义路由
Route route = path("hello", () ->
get(() ->
complete("Hello, Akka HTTP!"))
);
// 启动 HTTP 服务
Http.get(system).bindAndHandle(route.flow(system, materializer), ConnectHttp.toHost("localhost", 8080), materializer);
}
}
在上述示例中,通过 Akka HTTP
定义了一个简单的路由,当访问 “/hello” 路径时,返回 “Hello, Akka HTTP!”。
RxJava 是响应式编程库,基于观察者模式。Observable
代表一个可被观察的对象,而 Observer
则是观察者。以下是一个简单的 RxJava 示例:
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class RxJavaExample {
public static void main(String[] args) {
// 创建 Observable
Observable<String> observable = Observable.just("Hello", "RxJava");
// 创建 Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(String value) {
System.out.println("Received: " + value);
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed");
}
};
// 订阅 Observable
observable.subscribe(observer);
}
}
RxJava 提供了丰富的操作符,用于对发射的数据进行变换、过滤和合并等操作。以下是一些常用的操作符示例:
import io.reactivex.Observable;
import io.reactivex.functions.Function;
public class RxJavaOperatorsExample {
public static void main(String[] args) {
// 转换操作符:map
Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);
numbers.map(value -> value * 2)
.subscribe(System.out::println);
// 过滤操作符:filter
Observable<String> fruits = Observable.just("Apple", "Banana", "Orange", "Grape");
fruits.filter(fruit -> fruit.length() > 5)
.subscribe(System.out::println);
// 合并操作符:zip
Observable<Integer> integers = Observable.just(1, 2, 3);
Observable<String> strings = Observable.just("A", "B", "C");
Observable.zip(integers, strings, (num, str) -> num + str)
.subscribe(System.out::println);
}
}
在处理大量数据或者处理速度较快的情况下,为了避免产生过多的数据导致内存溢出,需要使用背压处理机制。RxJava 提供了 Flowable
类来支持背压处理。以下是一个简单的背压处理示例:
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaBackpressureExample {
public static void main(String[] args) throws InterruptedException {
Flowable<Integer> flowable = Flowable.create(emitter -> {
for (int i = 1; i <= 1000; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable.observeOn(Schedulers.io())
.subscribe(System.out::println);
Thread.sleep(1000); // 等待异步线程执行
}
}
RxJava 提供了不同的调度器(Scheduler)来控制任务在不同线程上的执行。例如,IoScheduler
用于执行 I/O 操作,ComputationScheduler
用于执行计算密集型任务。以下是一个简单的任务调度器示例:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaSchedulersExample {
public static void main(String[] args) throws InterruptedException {
Observable.just("Task 1", "Task 2", "Task 3")
.observeOn(Schedulers.io())
.map(task -> {
System.out.println("Executing " + task + " on thread " + Thread.currentThread().getName());
return task;
})
.subscribe();
Thread.sleep(1000); // 等待异步线程执行
}
}
在 RxJava 中,错误处理是一个重要的方面,可以通过 onError
回调来处理发生的异常。以下是一个简单的错误处理示例:
import io.reactivex.Observable;
public class RxJavaErrorHandlingExample {
public static void main(String[] args) {
Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);
numbers.map(value -> {
if (value == 3) {
throw new RuntimeException("Error at value 3");
}
return value;
}).subscribe(
System.out::println,
throwable -> System.err.println("Error: " + throwable.getMessage())
);
}
}
RxJava 提供了多种方式来实现异步和并行操作。以下是一个简单的异步与并行示例:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class RxJavaAsyncParallelExample {
public static void main(String[] args) throws InterruptedException {
Observable.just("Task 1", "Task 2", "Task 3")
.observeOn(Schedulers.io())
.map(task -> {
System.out.println("Executing " + task + " on thread " + Thread.currentThread().getName());
return task;
})
.subscribe();
Thread.sleep(1000); // 等待异步线程执行
}
}
RxJava 允许开发者扩展和自定义操作符,以满足特定的需求。以下是一个简单的自定义操作符示例:
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class RxJavaCustomOperatorExample {
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.lift(upperCaseOperator())
.subscribe(System.out::println);
}
private static ObservableOperator<String, Integer> upperCaseOperator() {
return observer -> new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
observer.onSubscribe(d);
}
@Override
public void onNext(Integer value) {
observer.onNext(String.valueOf(value).toUpperCase());
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
}
ForkJoin 框架采用工作窃取算法(Work-Stealing),使得任务可以在多个线程之间高效地分配和执行。工作窃取算法允许线程在执行完自己的任务后,主动去窃取其他线程的任务执行,从而实现任务的动态负载均衡。这样,当某个线程执行完自己的任务后,它可以帮助其他线程执行任务,提高整体的并发效率。
ForkJoinPool
是 ForkJoin 框架的核心类,负责管理工作线程池。它提供了一个通用的线程池,用于执行 ForkJoinTask
。在一般情况下,可以使用默认的无参构造函数创建一个 ForkJoinPool
,也可以通过构造函数指定并行度(Parallelism),即并发执行的线程数目。
下面是一个使用 ForkJoin 框架计算数组元素和的示例:
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample {
public static void main(String[] args) {
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 创建一个计算任务
SumTask task = new SumTask(array, 0, array.length);
// 提交任务并获取结果
int result = forkJoinPool.invoke(task);
System.out.println("Sum of array elements: " + result);
}
static class SumTask extends RecursiveTask<Integer> {
private final int[] array;
private final int start;
private final int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
// 如果任务足够小,直接计算结果
if (end - start <= 2) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 否则,拆分任务
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, middle);
SumTask rightTask = new SumTask(array, middle, end);
// 并行执行子任务
leftTask.fork();
rightTask.fork();
// 合并子任务的结果
return leftTask.join() + rightTask.join();
}
}
}
上述示例中,我们通过 ForkJoinPool
创建了一个工作线程池,然后定义了一个 SumTask
继承自 RecursiveTask
,用于计算数组元素的和。在 compute
方法中,我们判断任务是否足够小,如果是,则直接计算结果;否则,将任务拆分成两个子任务并并行执行。最后,合并子任务的结果得到最终的计算结果。
RecursiveTask
用于表示有返回值的任务,而 RecursiveAction
用于表示无返回值的任务。在上面的例子中,我们使用了 RecursiveTask
,因为我们希望计算出数组元素的和,并返回一个结果。
ForkJoinTask
是 ForkJoin 框架中所有任务的基类,它提供了一些用于管理任务执行的方法。在前面的例子中,RecursiveTask
继承了 ForkJoinTask
,并通过 fork
方法实现了任务的拆分与并行执行,通过 join
方法实现了子任务结果的合并。
这样,通过 ForkJoin 框架,我们能够方便地实现复杂的任务拆分与并行执行,从而充分利用多核处理器的性能。
在 ForkJoinTask
中,RecursiveTask
和 RecursiveAction
分别用于表示有返回值的任务和无返回值的任务。
RecursiveTask
是一个泛型类,用于表示有返回值的任务。通过继承 RecursiveTask
,可以实现自定义的有返回值的任务。以下是一个简单的示例,计算斐波那契数列的第 n 项:
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class FibonacciExample {
public static void main(String[] args) {
int n = 10;
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 创建一个计算任务
FibonacciTask task = new FibonacciTask(n);
// 提交任务并获取结果
int result = forkJoinPool.invoke(task);
System.out.println("Fibonacci number at position " + n + ": " + result);
}
static class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
public FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
FibonacciTask leftTask = new FibonacciTask(n - 1);
leftTask.fork();
FibonacciTask rightTask = new FibonacciTask(n - 2);
rightTask.fork();
return leftTask.join() + rightTask.join();
}
}
}
在上述示例中,通过继承 RecursiveTask
,实现了一个计算斐波那契数列第 n 项的任务。任务在计算过程中拆分为两个子任务,分别计算第 n - 1 和第 n - 2 项,然后合并子任务的结果得到最终结果。
RecursiveAction
是一个泛型类,用于表示无返回值的任务。通过继承 RecursiveAction
,可以实现自定义的无返回值的任务。以下是一个简单的示例,打印斐波那契数列的前 n 项:
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;
public class FibonacciPrintExample {
public static void main(String[] args) {
int n = 10;
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 创建一个打印任务
FibonacciPrintTask task = new FibonacciPrintTask(n);
// 提交任务
forkJoinPool.invoke(task);
}
static class FibonacciPrintTask extends RecursiveAction {
private final int n;
public FibonacciPrintTask(int n) {
this.n = n;
}
@Override
protected void compute() {
int[] fib = new int[n];
computeFibonacci(fib, n);
for (int i = 0; i < n; i++) {
System.out.print(fib[i] + " ");
}
}
private void computeFibonacci(int[] fib, int n) {
fib[0] = 0;
if (n > 1) {
fib[1] = 1;
computeFibonacci(fib, n, 2);
}
}
private void computeFibonacci(int[] fib, int n, int current) {
if (current < n) {
fib[current] = fib[current - 1] + fib[current - 2];
computeFibonacci(fib, n, current + 1);
}
}
}
}
在上述示例中,通过继承 RecursiveAction
,实现了一个打印斐波那契数列前 n 项的任务。任务在计算过程中递归调用自身,直到计算完成。由于该任务无返回值,因此 compute
方法不需要返回结果。
Disruptor 是一个专注于高性能、无锁并发的框架,主要应用于金融领域的低延迟系统。它的核心思想是通过环形缓冲区(RingBuffer)实现高效的事件发布与订阅,避免了传统锁机制可能带来的性能瓶颈。
RingBuffer
是 Disruptor 中的核心数据结构,采用环形缓冲区的形式存储事件。它通过使用预分配的数组,避免了链式结构的内存分配,减少了垃圾回收的压力。多个生产者可以同时向 RingBuffer 中发布事件,多个消费者可以同时订阅并处理事件。这种设计使得 Disruptor 能够以极低的延迟处理大量的事件。
Disruptor 基于生产者-消费者模式,通过无锁的方式实现了高效的事件处理。生产者负责向 RingBuffer 中发布事件,消费者负责订阅并处理事件。由于 RingBuffer 的环形结构,生产者和消费者之间不存在竞争关系,不需要加锁,从而避免了传统并发编程中锁带来的性能开销。
Disruptor 的高性能和低延迟特性使其在金融领域的高频交易系统中得到广泛应用。在这些系统中,对事件的处理速度要求极高,而 Disruptor 的设计理念正好满足了这些需求。通过有效地利用现代计算机硬件的特性,避免了传统锁机制可能引入的性能问题,使得 Disruptor 成为处理金融交易等对低延迟要求极高的场景的理想选择。
以下是一个简单的使用 Disruptor 的示例,演示了生产者和消费者之间的协作:
import com.lmax.disruptor.*;
import java.util.concurrent.Executors;
public class DisruptorExample {
public static void main(String[] args) {
// 创建 Disruptor 环境
Disruptor<Event> disruptor = new Disruptor<>(Event::new, 1024, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
// 设置事件处理器
disruptor.handleEventsWith(new EventHandler<Event>() {
@Override
public void onEvent(Event event, long sequence, boolean endOfBatch) {
// 处理事件
System.out.println("Event: " + event.getData() + " processed by " + Thread.currentThread().getName());
}
});
// 启动 Disruptor
disruptor.start();
// 创建生产者
RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
EventProducer producer = new EventProducer(ringBuffer);
// 发布事件
for (int i = 0; i < 10; i++) {
producer.publishEvent(i);
}
// 关闭 Disruptor
disruptor.shutdown();
}
static class Event {
private int data;
public int getData() {
return data;
}
public void setData(int data) {
this.data = data;
}
}
static class EventProducer {
private final RingBuffer<Event> ringBuffer;
public EventProducer(RingBuffer<Event> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void publishEvent(int data) {
// 获取下一个可用的序号
long sequence = ringBuffer.next();
try {
// 获取序号对应的事件对象
Event event = ringBuffer.get(sequence);
// 设置事件数据
event.setData(data);
} finally {
// 发布事件
ringBuffer.publish(sequence);
}
}
}
}
上述示例中,我们使用了 Disruptor 框架创建了一个环境,定义了一个事件类 Event
,并设置了事件处理器。然后创建了一个生产者 EventProducer
,通过调用 publishEvent
发布事件。通过 Disruptor 的内部机制,生产者和消费者之间的通信实现了高效的事件处理。在实际应用中,可以根据具体场景进一步定制事件处理逻辑。
ListenableFuture
是 Guava 提供的接口,扩展了 JDK 的 Future
接口,允许注册回调函数。这个接口的设计目的是为了在异步操作完成时执行特定操作,而不需要显式地等待异步操作的完成。以下是一个简单的使用示例:
import com.google.common.util.concurrent.*;
import java.util.concurrent.Executors;
public class ListenableFutureExample {
public static void main(String[] args) {
// 创建 ListeningExecutorService
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
// 提交异步任务
ListenableFuture<String> future = executorService.submit(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
return "Result";
});
// 注册回调函数
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("Success! Result: " + result);
}
@Override
public void onFailure(Throwable t) {
System.err.println("Failure: " + t.getMessage());
}
}, executorService);
// 关闭 executorService
executorService.shutdown();
}
}
在上述示例中,我们使用 Guava 的 ListeningExecutorService
包装了 JDK 的 ExecutorService
,并通过 Futures.addCallback
注册了回调函数。这样,当异步任务完成时,将自动执行注册的回调函数。
Guava 的 Futures
工具类提供了一系列用于处理 ListenableFuture
的静态方法。除了 addCallback
,还有其他一些方法,例如 transform
、transformAsync
等,用于对异步操作的结果进行转换或组合。这些方法的设计目的是为了简化异步编程的复杂性,使代码更加清晰。
import com.google.common.util.concurrent.*;
import java.util.concurrent.Executors;
public class FuturesExample {
public static void main(String[] args) {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
// 提交异步任务
ListenableFuture<Integer> future = executorService.submit(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
return 42;
});
// 使用 transform 转换结果
ListenableFuture<String> transformedFuture = Futures.transform(future, Object::toString, executorService);
// 注册回调函数
Futures.addCallback(transformedFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("Transformed Result: " + result);
}
@Override
public void onFailure(Throwable t) {
System.err.println("Failure: " + t.getMessage());
}
}, executorService);
// 关闭 executorService
executorService.shutdown();
}
}
在上述示例中,我们使用 Futures.transform
将异步任务的结果从整数转换为字符串。通过使用 Guava 的工具方法,我们能够更加方便地对异步操作的结果进行处理。
SettableFuture
是 Guava 提供的一个实现了 ListenableFuture
接口的类,可以手动设置异步任务的结果。它允许在异步任务的执行体中,根据实际情况设置成功或失败的结果,从而更加灵活地控制异步任务的执行。
以下是一个使用 SettableFuture
的示例:
import com.google.common.util.concurrent.*;
import java.util.concurrent.Executors;
public class SettableFutureExample {
public static void main(String[] args) {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
// 创建 SettableFuture
SettableFuture<String> settableFuture = SettableFuture.create();
// 手动设置异步任务的结果
executorService.submit(() -> {
try {
Thread.sleep(1000); // 模拟异步操作
settableFuture.set("Success Result");
} catch (InterruptedException e) {
settableFuture.setException(e);
}
});
// 注册回调函数
Futures.addCallback(settableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("Success! Result: " + result);
}
@Override
public void onFailure(Throwable t) {
System.err.println("Failure: " + t.getMessage());
}
}, executorService);
// 关闭 executorService
executorService.shutdown();
}
}
在上述示例中,我们使用 SettableFuture
创建了一个实现了 ListenableFuture
接口的对象,并在异步任务的执行体中手动设置了成功的结果。这样,我们就可以灵活地在异步操作中控制结果的设置。
Guava 的 ListeningExecutorService
接口是对 JDK 的 ExecutorService
的扩展,允许执行异步任务,并提供了一些用于处理 ListenableFuture
的方法。它提供了 submit
方法用于提交异步任务,以及 shutdown
方法用于关闭 executor。通过使用 ListeningExecutorService
,我们可以更方便地处理异步任务的结果。
以上是 Guava 并发库的一些基本用法和实例,Guava 提供了丰富的工具类和接口,能够简化并发编程的复杂性,提高代码的可读性和可维护性。在实际应用中,可以根据具体的需求选择合适的工具类和接口,以便更高效地处理并发操作。
RateLimiter
是 Guava 提供的一个用于令牌桶算法的实现类,用于控制某个资源访问的速度。通过 RateLimiter
,我们可以限制对资源的访问频率,以便更好地控制系统的并发性。
以下是一个使用 RateLimiter
的简单示例:
import com.google.common.util.concurrent.RateLimiter;
public class RateLimiterExample {
public static void main(String[] args) {
// 创建一个每秒发放两个令牌的 RateLimiter
RateLimiter rateLimiter = RateLimiter.create(2.0);
// 模拟请求
for (int i = 0; i < 10; i++) {
// 尝试获取令牌
double waitTime = rateLimiter.acquire();
// 执行业务逻辑
System.out.println("Request " + i + " served after waiting for " + waitTime + " seconds");
}
}
}
在上述示例中,我们创建了一个每秒发放两个令牌的 RateLimiter
,然后模拟了一系列请求。通过 acquire
方法,我们可以获取令牌,并在获取令牌的过程中进行阻塞,以控制请求的速率。
Cache
是 Guava 提供的缓存实现类,用于将键值对存储在内存中,以便更快地检索数据。Cache
提供了一系列的方法,用于将数据放入缓存、从缓存中获取数据以及清理缓存等操作。
以下是一个使用 Cache
的简单示例:
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
public class CacheExample {
public static void main(String[] args) {
// 创建一个最大容量为 100,过期时间为 5 分钟的缓存
Cache<String, String> cache = CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();
// 将数据放入缓存
cache.put("key1", "value1");
cache.put("key2", "value2");
// 从缓存中获取数据
String value1 = cache.getIfPresent("key1");
String value3 = cache.getOrDefault("key3", "default");
System.out.println("Value 1: " + value1);
System.out.println("Value 3: " + value3);
}
}
在上述示例中,我们使用 CacheBuilder
创建了一个最大容量为 100,过期时间为 5 分钟的缓存。然后,我们将数据放入缓存,并通过 getIfPresent
和 getOrDefault
方法从缓存中获取数据。
Guava 并发库中还有其他许多有用的类和接口,用于处理并发编程中的各种场景。在实际应用中,可以根据具体的需求选择合适的工具类和接口,以便更高效地处理并发操作。
通过学习本文提供的内容,读者将掌握Java中并发编程的核心概念和高级工具。了解多线程编程的基本原理、并发库的使用方法以及适用场景,将使开发者能够更加自信地构建高性能、可伸缩且稳定的应用程序。在竞争激烈的软件开发领域,具备并发编程的深入知识将成为区分优秀开发者的重要标志。