最近在学响应式编程,这里先记录下,响应式编程的一些基础内容
Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。
Reactive Streams:
java.util.concurrent.Flow
类,实现了Reactive Streams规范。Reactor:
map
、filter
、flatMap
等,使得开发者能够方便地进行数据流的转换和处理。WebFlux:
响应式编程:
综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。
在java.util.concurrent.Flow
类中,定义了Reactive Streams规范
interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
subscribe(Subscriber<? super T> subscriber)
: 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
onSubscribe(Subscription subscription)
: 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription
对象,以便后续请求数据和取消订阅。
onNext(T item)
: 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。
onError(Throwable throwable)
: 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。
onComplete()
: 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。
interface Subscription {
void request(long n);
void cancel();
}
request(long n)
: 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。
cancel()
: 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Processor
接口是 Subscriber
和 Publisher
的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。
Subscriber
部分的方法:onSubscribe(Subscription subscription)
, onNext(T item)
, onError(Throwable throwable)
, onComplete()
。
Publisher
部分的方法:subscribe(Subscriber<? super R> subscriber)
。表示 Processor
可以被其他订阅者订阅。
泛型T即为数据流
这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。
在 Reactive Streams 中,Publisher
、Subscriber
、Subscription
和 Processor
之间的协作流程如下:
有时间再补流程图
Publisher(发布者):
Publisher
是异步产生数据流的组件,它通过 subscribe
方法允许订阅者订阅。subscribe
方法会接收一个 Subscriber
对象作为参数。Publisher
有新数据准备好时,通过调用订阅者的 onNext
方法将数据推送给订阅者。interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Subscriber(订阅者):
Subscriber
是数据流的消费者,通过实现 Subscriber
接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n)
请求一定数量的数据,处理数据时通过 onNext
方法接收元素。subscription.cancel()
来取消订阅。interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
Subscription(订阅):
Subscription
表示订阅关系,它在 onSubscribe
方法中被传递给订阅者。通过 Subscription
,订阅者可以请求数据和取消订阅。request(long n)
方法请求处理 n 个元素,通过 cancel()
方法取消订阅。interface Subscription {
void request(long n);
void cancel();
}
Processor(处理器):
Processor
是一个同时实现了 Publisher
和 Subscriber
接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。Processor
既能接收数据,也能发布数据。它将 onNext
、onError
和 onComplete
方法委托给下游的订阅者,并将数据推送给上游的发布者。interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext
方法接收元素,订阅者通过 request
方法请求处理一定数量的元素,同时可以通过 cancel
方法取消订阅。Processor
则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。
自己实现了一个,参考了SubmissionPublisher
- 同步实现的
- 功能不完善
- 有bug
class MyPublisher implements Flow.Publisher<String>{
MySubscription<String> subscription;
public int request ;
public void publish(String item){
subscription.items.add(item);
while (true) {
if (request > 0) {
for (int i = 0; i < request; i++) {
if (!subscription.items.isEmpty()) {
try {
Object o = subscription.items.get(subscription.items.size() - 1);
subscription.subscriber.onNext(o.toString());
subscription.items.remove(o);
}catch (Exception e){
subscription.subscriber.onError(e);
return;
}
}
}
}
if (subscription.items.isEmpty()) {
break;
}
}
}
@Override
public void subscribe(Flow.Subscriber<? super String> subscriber) {
System.out.println("第一步:绑定订阅者" );
MySubscription<String> subscription = new MySubscription<>(subscriber,this);
this.subscription = subscription;
subscriber.onSubscribe(subscription);
}
}
class MySubscriber implements Flow.Subscriber<String>{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("第二步:接收Subscription" );
this.subscription = subscription;
// 请求订阅者处理的元素数量
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("第四步:推送数据" );
System.out.println("MySubscriber 消费了item = " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("出异常了 = " + throwable);
}
@Override
public void onComplete() {
}
}
class MySubscription<T> implements Flow.Subscription{
final Flow.Subscriber<? super T> subscriber;
final MyPublisher publisher;
List items = new ArrayList();
public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {
this.subscriber = subscriber;
this.publisher = publisher;
}
@Override
public void request(long n) {
this.publisher.request++;
System.out.println("第三步:拉取请求" );
}
@Override
public void cancel() {
}
}
public class FlowDemo {
public static void main(String[] args) {
MyPublisher myPublisher = new MyPublisher();
MySubscriber mySubscriber = new MySubscriber();
myPublisher.subscribe(mySubscriber);
myPublisher.publish("111");
myPublisher.publish("222");
myPublisher.publish(null);
}
}
class SimplePublisher implements Flow.Publisher<Integer> {
private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
public void publishItems() {
for (int i = 1; i <= 5; i++) {
publisher.submit(i);
}
// 发布者完成发布
publisher.close();
}
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
publisher.subscribe(subscriber);
}
}
class SimpleSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// 请求订阅者处理的元素数量
subscription.request(1);
}
@Override
public void onNext(Integer item) {
System.out.println("Received item: " + item);
// 处理完一个元素后请求下一个
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error occurred: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Processing completed.");
}
}
public class ReactiveStreamsExample {
public static void main(String[] args) throws InterruptedException {
// 创建发布者和订阅者
SimplePublisher simplePublisher = new SimplePublisher();
SimpleSubscriber simpleSubscriber = new SimpleSubscriber();
// 订阅者订阅发布者
simplePublisher.subscribe(simpleSubscriber);
// 发布者发布数据
simplePublisher.publishItems();
// 睡一觉,确保数据处理完成
Thread.sleep(3000);
}
}