本章将从响应式编程的开始,从 stream 开始逐步递进,如对流式编程或响应式编程十分熟悉的可直接跳过对应小节。本章内容因目前使用有限,仅供参考,目前也不必花费过多时间在该章内容上,待未来使用时再深入研究即可。本章内容可作为 webflux
学习的前置知识。
最佳实战:凡是写for循环处理数据的统一全部用StreamAPI进行替换
Stream所有数据和操作被组合成流管道流管道组成:
流的创建:主要有集合的 of
方法,以及 Stream.of
中间操作:可通过查看对应 api 源码注释看具体是中间操作还是终止操作, intermediate operation
filter
:过滤map
:映射,一对一flatMap
:散列,一对多filter、
map、mapToInt、mapToLong、mapToDouble
flatMap、flatMapToInt、flatMapToLong、flatMapToDouble
mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、
parallel、unordered、onClose、sequential
distinct、sorted、peek、limit、skip、takeWhile、dropWhile
终止操作:terminal operation
,必须调用终止操作才会真正执行。
forEach、forEachOrdered、toArray、reduce、collect、toList、min、
max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator
一个使用示例:
public static void main(String[] args) {
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
List<Integer> list = stream.filter(i -> i % 2 == 0).peek(System.out::println).toList();
System.out.println(list);
}
流式编程就像制定工厂流水线,开发人员定制好流水线的原材料,各个加工步骤,最终产物。这条流水线完全制定好才会开始运行并产出。
Reactive Streams是JVM面向流的库的标准和规范,是 jdk9 中的 API,方便本地开发基于异步、消息驱动的全事件回调系统:响应式系统。
API Components:注意,使用这些组件是天然异步且由 ForkJoinPool
线程池启用执行,所以程序最后调用 System.in.read()
来控制主线程不完全结束
Publisher
:发布者接口;产生数据流Subscriber
:订阅者接口; 消费数据流Subscription
:订阅关系;订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅Processor
:处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许在数据流中进行转换、过滤和其他操作数据流向:Publisher
( dataBuffer) -> N 个 Processor
-> Subscriber
public static void main(String[] args) throws Exception {
// 1. 定义发布者:可发布消息
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()){
// 2. 定义订阅者:订阅者可订阅发布者发布的消息
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
// 保存绑定关系
private Flow.Subscription subscription;
// 绑定订阅消息时触发
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("订阅事件发生了");
subscription.request(1); // 背压模式,订阅者向发布者请求发布信息
System.out.println("订阅者线程:" + Thread.currentThread()); // Thread[ForkJoinPool.commonPool-worker-1,5,main]
}
@Override
public void onNext(String item) {
System.out.println("本轮:" + item);
subscription.request(1);
if (item.equals("原材料0")) {
// throw new RuntimeException("自控异常");
}
System.out.println("订阅者线程Next:" + Thread.currentThread()); // Thread[ForkJoinPool.commonPool-worker-1,5,main]
}
@Override
public void onError(Throwable throwable) {
System.out.println("异常了:" + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("完成了");
}
};
// 3. 发布者的订阅者列表中添加这名订阅者,后续发布信息会发送给订阅者
publisher.subscribe(subscriber);
// 4. 发布者发布消息
for (int i = 0; i < 10; i++) {
publisher.submit("原材料" + i);
}
System.out.println("主线程:" + Thread.currentThread()); // Thread[main,5,main]
publisher.close(); // 这样才会回调 onComplete 方法
System.in.read();
}
}
响应式编程:通常作为观察者模式的拓展,一般会使用线程池、DataBuffer
发布订阅模式:
响应式:
响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。
在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。
响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。
除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]
Java 提供了两种异步编程方式:
回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。
reactor是基于Reactive Streams的第四代响应式库规范,用于在JVM上构建非阻塞应用程序。
Flux[N]
和 Mono[0|1]
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 单元测试依赖 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.1</version>
<scope>test</scope>
</dependency>
</dependencies>
Flux
:N个元素的流Mono
:0 | 1 个元素的流响应式流:元素(内容) + 信号(完成/异常)
public class Main {
public static void main(String[] args) throws Exception {
Flux<Integer> just = Flux.just(1, 2, 3, 45, 0, 8);
just.subscribe(System.out::println); // 逐个遍历打印
just.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnNext(Integer value) {
System.out.println(String.valueOf(value) + Thread.currentThread()); // Thread[main,5,main]
}
});
System.out.println(Thread.currentThread()); // Thread[main,5,main]
}
}
该方法传入参数可自定义消费者或订阅者,订阅者一般可继承 BaseSubscriber
,另外可传三个参数分别定义信号感知回调,分别是正常流元素消费,感知异常,感知正常结束。
Flux.just(1, 2, 3, 45, 0, 8).subscribe(
v -> System.out.println("v=" + 10 / v), // 流元素消费
throwable -> System.out.println("throwable=" + throwable.getMessage()), // 感知异常结束
() -> System.out.println("感知流结束") // 感知正常结束
);
doOnXXX
和 BaseSubscriber
以及取消流,订阅者推荐直接继承 BaseSubscriber
可理解为该方法传入的即最终消费者或订阅者,而中间调用的一系列方法是中间处理过程
public static void main(String[] args) throws Exception {
Flux.just(1, 2, 3, 45, 0, 8).doOnNext(value -> {
System.out.println("一起玩" + value + Thread.currentThread());
}).subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("绑定成功" + Thread.currentThread());
request(1); // 向发布者请求 1 次数据,n 表示请求 n 次数据
// requestUnbounded(); // 请求无限次数据,用了该方法则 onNext 中无需再写 request(1)
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("当前数据:" + value + Thread.currentThread());
if (value == 45) {
cancel(); // 取消流
}
request(1); // 继续要数据
}
});
}
输出:
绑定成功Thread[main,5,main]
一起玩1Thread[main,5,main]
当前数据:1Thread[main,5,main]
一起玩2Thread[main,5,main]
当前数据:2Thread[main,5,main]
一起玩3Thread[main,5,main]
当前数据:3Thread[main,5,main]
一起玩45Thread[main,5,main]
当前数据:45Thread[main,5,main]
默认订阅者都是使用当前线程执行,而发布者默认与订阅者使用同一线程,可使用 publishOn
和 subscribeOn
指定对应 Scheduler
publishOn
:指定后续执行的线程,影响调用位置起到后续的线程subscribeOn
:指定源到后续的所有线程,但不会影响 publishOn
指定的逻辑@Test
public void testThread() throws IOException {
System.out.println(Thread.currentThread().getName());
Scheduler publisherScheduler = Schedulers.newParallel("Publisher");
Scheduler subscriberScheduler = Schedulers.newParallel("Subscriber");
Flux.range(1, 3)
.doOnNext(item -> System.out.println("Publisher Default:" + item + Thread.currentThread().getName()))
.publishOn(publisherScheduler)
.doOnNext(item -> System.out.println("Publisher publishOn:" + item + Thread.currentThread().getName()))
.subscribeOn(subscriberScheduler)
.doOnNext(item -> System.out.println("Publisher subscribeOn:" + item + Thread.currentThread().getName()))
.subscribe(item -> System.out.println("Subscriber default:" + item + Thread.currentThread().getName()));
System.in.read();
}
执行的输出结果是:
main
Publisher Default:1Subscriber-1
Publisher Default:2Subscriber-1
Publisher Default:3Subscriber-1
Publisher publishOn:1Publisher-2
Publisher subscribeOn:1Publisher-2
Subscriber default:1Publisher-2
Publisher publishOn:2Publisher-2
Publisher subscribeOn:2Publisher-2
Subscriber default:2Publisher-2
Publisher publishOn:3Publisher-2
Publisher subscribeOn:3Publisher-2
Subscriber default:3Publisher-2
还有延迟发布方法 delayElements()
,如下将以间隔 500 毫秒的时间逐个发送元素
@Test
public void testDelayElements() throws IOException {
Flux.range(0,5).delayElements(Duration.ofMillis(500)).log().subscribe();
System.in.read();
}
并发流多线程分批处理示例:将 1000 条数据,按每 10 个一组分片处理,用 4个线程跑该段逻辑,用 runOn
指定线程池
@Test
public void testParallel() throws IOException {
Flux.range(1,1000)
.buffer(10)
.parallel(4)
.runOn(Schedulers.newParallel("线程池"))
.log()
.flatMap(Flux::fromIterable)
.collectSortedList(Integer::compareTo)
.subscribe(v -> System.out.println("v=" + v));
System.in.read();
}
public class MainTest {
/**
* 日志显示:log(),下面为对应打印的解释
* onSubscribe:流被订阅
* request(unbounded):请求无限数据
* onNext(2): 每个数据到达
* onComplete:流结束
*/
@Test
public void testLog() {
Flux.just(1, 2, 3, 45, 0, 8)
// .log()
.filter(i -> i % 2 == 0)
.log()
.subscribe();
}
/**
* 同步环境 生成 0~10 的序列
* sink 是通道,sink.next(obj) 向下游发布 obj 对象
* sink.complete() 迭代完成
*/
@Test
public void testGenerate() {
Flux.generate( () -> 0, (state, sink) -> {
sink.next(state);
if (state == 10) sink.complete();
return state + 1;
}).log().subscribe();
}
/**
* 多线程环境 create:常用于监听事件,并将事件信息传入管道
* [ INFO] (main) onSubscribe(FluxCreate.BufferAsyncSink)
* [ INFO] (main) request(unbounded)
* 做家务
* [ INFO] (main) onNext(家务)
*/
@Test
public void testCreate() {
interface Listener {
void afterDoSomeThing(String event);
};
class DoSomeThing {
public void doSomeThing(String thing) {
System.out.println("做" + thing);
for (Listener listener : afterListenerList) {
listener.afterDoSomeThing(thing);
}
}
private final List<Listener> afterListenerList = new ArrayList<>();
public void register(Listener listener) {
afterListenerList.add(listener);
}
}
DoSomeThing doSomeThing = new DoSomeThing();
Flux.create(sink -> doSomeThing.register(sink::next)).log().subscribe();
doSomeThing.doSomeThing("家务");
}
/**
* 当不使用缓冲区时,每有 1 个元素便会直接发给订阅者
* buffer(n):缓冲区,凑够数量 n 再发送给订阅者,订阅者接收到的将是 n 个元素组成的 ArrayList 集合
* request(n)含义:找发布者请求 n 次数据,每次请求 bufferSize 个数据,总共能得到 n * bufferSize 个数据
* [ INFO] (main) onNext([1, 2, 3])
* [ INFO] (main) onNext([4, 5, 6])
* [ INFO] (main) onNext([7, 8, 9])
* [ INFO] (main) onNext([10])
*/
@Test
public void testBuffer() {
Flux.range(1, 10).buffer(3).log().subscribe();
}
/**
* 预请求
* limitRate(n):首次 request(n),请求了 75% * n(四舍五入) 次后直接请求 request(75% * n) ,且后续均 request(75% * n)
*/
@Test
public void testLimitRate() {
Flux.range(1, 10).log().limitRate(4).subscribe();
}
/**
* map:一一映射
*/
@Test
public void testMap() {
Flux.range(1, 10).map(value -> value + 1).log().subscribe();
}
/**
* handle:类似于 map 可用于实现一对一映射,但更加强大的是sink.next()可以传不同类型
*/
@Test
public void testHandle() {
Flux.range(1, 10)
.handle((value, sink) -> {
if (value % 2 == 0) sink.next(value);
else sink.next("字符串" + value);
}).log().subscribe();
}
/**
* 扁平化处理:{ "张 三", "李 四"} 变为 { "张", "三", "李", "四"}
*/
@Test
public void testFlatMap() {
Flux.just("张 三", "李 四")
.flatMap(item -> {
String[] strings = item.split(" ");
return Flux.fromArray(strings);
})
.log()
.subscribe();
}
/**
* 流连接
* concatWith:两个流元素类型要求一致
* concat:静态方法,元素类型无要求
* concatMap:将流中单个元素映射成其他流,再将所有流组合成一个流
*/
@Test
public void testConcat() {
Flux.just(1, 2).concatWith(Flux.just(3,4)).log().subscribe();
Flux.concat(Flux.just(1, 2), Flux.just("a",4)).log().subscribe();
Flux.just(1, 2).concatMap(i -> Flux.just("key" + i, "value" + i)).log().subscribe();
}
/**
* 把流变形成新数据
* transform:无状态转换; 原理,无论多少个订阅者,transform只执行一次
* transformDeferred:有状态转换; 每个订阅者transform都只执行一次
*/
@Test
public void testTransform() {
AtomicInteger atomicInteger = new AtomicInteger(1);
Flux<String> flux = Flux.just("a", "b", "c").transformDeferred(
items -> {
System.out.println("调用次数:" + atomicInteger.getAndIncrement());
return items.map(String::toUpperCase);
});
flux.subscribe(System.out::println);
flux.subscribe(System.out::println);
}
/**
* 空流是 Flux.empty(); Flux.just(null) 会报空指针异常
* switchIfEmpty:如果是空流则转换成其他流
* defaultIfEmpty:如果是空流则传入单个元素
*/
@Test
public void testEmpty() {
Flux.empty().switchIfEmpty(Flux.empty()).defaultIfEmpty("a").log().subscribe();
}
/**
* Flux.merge:按照流中每个元素发布的时间顺序合并流
* Flux.mergeSequential:按照流发布的时间顺序合并流,如流1中有多个元素且流1元素最先发布,则流1中元素会被合并到最开头
*/
@Test
public void testMerge() throws IOException {
Flux.merge(
Flux.range(0, 2).delayElements(Duration.ofMillis(300)),
Flux.range(5, 2).delayElements(Duration.ofMillis(100)),
Flux.range(10, 2).delayElements(Duration.ofMillis(200))
).log().subscribe();
System.in.read();
}
/**
* zip:将两个流压缩成元组,多余单个元素会被忽略
* Tuple:元组,[value1, value2]
*/
@Test
public void testZip() {
Flux.range(0, 2).zipWith(Flux.range(0,3)).log().subscribe();
}
/**
* 重试,会从头重试
*/
@Test
public void testRetry() throws IOException {
Flux.just(1)
.log()
.delayElements(Duration.ofSeconds(2))
.timeout(Duration.ofSeconds(1))
.retry(2)
.onErrorReturn(999)
.subscribe();
System.in.read();
}
@Test
public void testCache() throws IOException {
Flux<Integer> cache = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1))
.cache(2);// 缓存,不传参表示缓存所有元素
cache.log().subscribe(); // 会输出 1~10
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 缓存后用子线程输出缓存的3和4,之后和上面订阅者一样使用同一线程池输出
// 如果不使用 cache 缓存,则默认会每10秒逐个输出 1~10
cache.subscribe(item -> System.out.println("子线程输出:" + item));
}).start();
System.in.read();
}
/**
* 阻塞式 API
*/
@Test
public void testBlock() {
List<Integer> list = Flux.range(1, 1000).collectList().block();
System.out.println(list);
}
/**
* 响应式中的ThreadLocal,响应式编程中 ThreadLocal机制失效
*/
@Test
public void testContextAPI () {
Flux.just(1,2,3)
.transformDeferredContextual((flux,context)->{
System.out.println("flux = " + flux);
System.out.println("context = " + context);
return flux.map(i->i+"==>"+context.get("prefix"));
})
//上游能拿到下游的最近一次数据
.contextWrite(Context.of("prefix","哈哈"))
//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游
.subscribe(v-> System.out.println("v = " + v));
}
}
前面有介绍过在订阅者处可使用 subscribe()
第二个参数感知错误,这里介绍更多错误处理的 API ,主要为 onErrorXXX()
onErrorReturn :捕获异常,返回默认值
onErrorReturn(T fallbackValue)
:错误时返回值 fallbackValue
,且结束流传输,订阅者将无法感知到此次异常onErrorReturn(Class<E> type, T fallbackValue)
:指定返回的异常类型onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
:异常断言@Test
public void testError() {
Flux.just(1,0,2)
.map(item -> 2 / item)
.onErrorReturn(999)
.log() // 输出 onNext(2),onNext(999),onComplete()
.subscribe();
}
onErrorResume :捕获异常,执行兜底方法。兜底方法需返回一个流供后续继续处理,或者再抛出自定义异常(更推荐用 onErrorMap
)
@Test
public void testError() {
Flux.just(1,0,2)
.map(item -> 2 / item)
.onErrorResume(throwable -> {
System.out.println("异常:" + throwable.getMessage());
return Flux.just(9,0,9);
})
.log() // onNext(2),onNext(9),onNext(0),cancel()
.map(item -> 3 / item)
.onErrorResume(throwable -> Flux.error(new RuntimeException(throwable)))
.subscribe();
}
onErrorMap :捕获并包装成一个业务异常,并重新抛出
@Test
public void testError() {
Flux.just(1,0,2)
.map(item -> 2 / item)
.onErrorMap(throwable -> new RuntimeException(throwable))
.subscribe();
}
@Test
public void testError() {
Flux.just(1,0,2,3)
.log() // request(unbounded),onNext(1),onNext(0),request(1),onNext(2),onNext(3),onComplete()
.map(item -> 2 / item)
.doOnError(throwable -> {
System.out.println("可获取到异常" + throwable.getMessage());
})
.onErrorContinue((throwable, item) -> {
System.out.println("发生了异常:" + throwable.getMessage());
System.out.println("导致异常发生的值:" + throwable.getMessage());
})
.doFinally(signalType -> {
System.out.println("流信号类型" + signalType);
})
.subscribe();
}
@Test
public void testSinks() throws InterruptedException, IOException {
// Sinks.many(); //发送Flux数据
// Sinks.one(); //发送Mono数据
// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的
Sinks.many().unicast(); //单播: 这个管道只能绑定单个订阅者(消费者)
Sinks.many().multicast();//多播: 这个管道能绑定多个订阅者
Sinks.many().replay();//重放: 这个管道能重放元素。 是否给后来的订阅者把之前的元素依然发给它;
// 单播模式
Sinks.Many<Object> many = Sinks.many().unicast().onBackpressureBuffer(new PriorityQueue<>(2));
for (int i = 0; i < 5; i++) {
many.tryEmitNext(i); // 将元素放入管道
}
// 单播只能订阅一次,二次订阅会报错,如允许多个订阅者可用 多播模式
many.asFlux().subscribe(item -> System.out.println("单播模式:" + item));
// 重放模式:底层利用队列进行缓存之前数据
Sinks.Many<Object> limit = Sinks.many().replay().limit(2); // 缓存两个
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
limit.tryEmitNext(i); // 将元素放入管道
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
limit.asFlux().subscribe(item -> System.out.println("重放模式订阅1:" + item)); // 0、1、2、3、4
TimeUnit.SECONDS.sleep(4);
limit.asFlux().subscribe(item -> System.out.println("重放模式订阅2:" + item)); // 2、3、4
System.in.read();
}