1.父pom锁定版本
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2023.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
子模块引入依赖
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
</dependency>
</dependencies>
1.数据流:数据的源头
2.变化传播:数据操作(中间操作)
3.异步编程模式:底层控制异步
数据流:每个元素从流的源头,开始源源不断,自己往下滚动
onNext():当某个元素到达后,可以自定义它的处理逻辑
onComplete():处理结束
onError():异常结束
一个数据库:元素(0-n)+信号(1: 正常/异常);
流经过运算符计算后得到一个新流
案例:
Mono: 0|1个元素的流
Flux: n个元素的流
flux
public static void main(String[] args) throws IOException {
//发布者发布数据流:源头
//1.多元素的流
Flux<Integer> just = Flux.just(1, 2, 3, 4, 5);
//流不消费就没用,消费就是订阅
//一个数据流,可以有很多消费者
just.subscribe(System.out::println);
just.subscribe(e -> System.out.println("new just:" + e));
//对于每个消费者来说,每个流都是一样的:广播模式
//每秒产生一个从0开始的递增数字
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));
flux.subscribe(System.out::println);
//流是异步的
System.in.read();
}
mono:只有一个元素
public static void main(String[] args) {
Mono<Integer> just = Mono.just(1);
just.subscribe(System.out::println);
//空流,但有一个信号,此时代表流完成的信号
Flux<Object> empty = Flux.empty();
}
doOnXxx:当流发生什么事情时,触发一个回调
系统调用提前定义好的钩子函数
定义一个空流,但有一个信号
此时代表流完成的信号
流不被消费,流不会触发完成回调
//空流,但有一个信号,此时代表流完成的信号
Flux<Object> empty = Flux.empty()
.doOnComplete(() -> {
System.out.println("stream finished");
});
empty.subscribe(System.out::println);
链式api中,下面的操作符,操作的是上面的流
自定义订阅者MySubscriber
public class MySubscriber extends BaseSubscriber<Integer> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("bind with subscription" + subscription);
//订阅者找发布者要一个元素(背压)
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("item arrive: value" + value);
if (value < 5) {
request(2);//继续要元素
if (value == 3) {
throw new RuntimeException("value == 3 error");
}
} else {
cancel();//取消订阅
}
}
@Override
protected void hookOnComplete() {
System.out.println("stream finish");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("stream error:" + throwable.getMessage());
}
@Override
protected void hookOnCancel() {
System.out.println("stream cancel");
}
}
测试
public static void main(String[] args) throws InterruptedException {
Flux<Integer> flux = Flux.range(1, 7)
.doOnComplete(() -> System.out.println("stream flux finished"))
.doOnCancel(() -> System.out.println("stream flux cancel"))
.doOnError(throwable -> System.out.println("stream err:" + throwable.getMessage()))
.doOnNext( item -> System.out.println("next item arrive:" + item));
flux.subscribe(new MySubscriber());
Thread.sleep(2000);
}
结果:
希望流中元素出问题时执行
后续不再处理
Flux.just(1,2,3,4,5,6,7,0,5,6)
.map(item -> 10 / item)//流会出问题
.doOnNext(item -> System.out.println("item arrive:" + item))//流中元素到达触发
.doOnError( exception -> System.out.println("stream error:" + exception.getMessage()))
.subscribe(System.out::println);
结果:
注意:doInXxx(),要感知某个流的事件,写在这个流的后面,新流的前面
doOnNext():数据到达时,触发
doOnEach():封装的更详细,元素(数据+信号)到达时会触发
信号:正常/异常(取消)
doOnNext():每个数据(流的数据)到达时触发
doOnEach():每个元素(流的数据和信号)到达时候触发
doOnRequest():消费者请求流元素的时候
doOnError():流发生错误时
doOnSubscribe():流被订阅时
doOnTerminal():发送取消/异常信号中断流
doOnCancel():流被取消
doOnDiscard():流中元素被忽略时
.log():日志
示例:
Flux.range(1,10)
//.log() //日志 1-7
.filter(item -> item > 3)
.log() //日志 4-7
.map(item -> "new stream:" + item)// new stream 4-7
.subscribe(System.out::println);
结果:
订阅流:没订阅之前流什么也不做
流的元素开始流动, 发生数据变化
响应式编程:数据流+ 变化传播(操作)
案例
流正常时:只传入正常消费者(只消费正常元素)
Flux<Integer> flux = Flux.range(1,10)
.map(item -> return item *2;);
flux.subscribe(System.out::println);
流出现异常时
Flux<Integer> flux = Flux.range(1,10)
.map(item -> {
if (item == 9) {
int i = 10 / ( 9 - item );
}
return item *2;
});
flux.subscribe(item -> {
System.out.println("stream is correct item: " + item);
}, error -> {
System.out.println("stream is error:" + error.getMessage());
});
流完成时,执行回调
Flux<Integer> flux = Flux.range(1,10)
.map(item -> item *2);
flux.subscribe(
item -> System.out.println("stream is correct item: " + item) ,
error -> System.out.println("stream is error:" + error.getMessage()),
() -> System.out.println("stream finished")
);
流的生命周期钩子可以传播给订阅者
doOnXxx:发生这个事件的时候产生一个回调,通知你(不能改变)
onXxx:发生这个事件以后,执行一个动作,这个动作可以改变元素/信号
例如:
Aop:普通通知(前置/后置/异常/返回)
环绕通知(ProceedingJoinPoint),可以干扰目标方法执行
Flux<Integer> flux = Flux.range(1,10)
.map(item -> item *2);
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("stream is subscribe");
//流被订阅时,找发布者要1个数据
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("next item:" + value);
//数据到达,持续要数据
request(2);
}
@Override
protected void hookOnComplete() {
System.out.println("stream finished");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("stream error:" + throwable);
}
@Override
protected void hookOnCancel() {
super.hookOnCancel();
}
});
取消流:
消费者调用cancel()方法可以取消订阅
@Override
protected void hookOnNext(Integer value) {
System.out.println("next item:" + value);
//数据到达,持续要数据
if (value == 5) {
this.cancel();
}
request(2);
}
自定义消费者建议直接编写BaseSubscriber的逻辑
buffer(2),指定缓冲区大小,也就是消费者一次可以最多拿到2个元素
凑满数批量发送给消费者
Flux.range(1,10)
.buffer(2)//缓冲区缓存2个元素
.subscribe(System.out::println);
结果
消费者每次request(1),拿到buffer大小个真正元素
request(n):找发布者请求n次数据
总共能得到n*bufferSize个数据
Flux.range(1,1000)
.log() //限流触发看上游是怎么限流过去
.limitRate(100)//一次预取100个元素
.subscribe();
从上游每次请求100个元素
75%预取策略:第一次抓100个数据,
如果75%的元素已经处理,继续抓取新的75%元素
编程方式创建序列
generate
同步环境下
sink:接收器
Flux.generate(() -> 0, //初始值
(state, sink) -> {
//0-10
if (state == 10 ) {
sink.complete();//发送完成
}
sink.next(state); //把元素传出去
return state + 1;
})
.log()
.subscribe();
create();异步多线程
创建自定义监听器MyListener
public class MyListener {
FluxSink<Object> sink;
public MyListener(FluxSink<Object> sink) {
this.sink = sink;
}
//用户登录,触发online监听
public void online(String username) {
System.out.println("user login:" + username);
sink.next(username);
}
}
发布流
Flux.create(fluxSink -> {
MyListener listener = new MyListener(fluxSink);
for (int i = 0; i < 10; i++) {
listener.online("张" + i);
}
})
.log()
.subscribe();
System.in.read();
结果:
自定义流中元素处理规则
返回自定义处理后的流
Flux.range(1,10)
.handle((value,sink) -> {
System.out.println("get value:" + value);
sink.next(value * 2); //可以向下发送数据的通道
})
.log()
.subscribe();
handle():处理后,流中可以有不同类型数据
map():处理后,流中只能有相同类型数据
响应式编程,默认还是用当前线程,生成整个流,发布流,流操作
Flux.range(1,10)
.handle((value,sink) -> {
sink.next(value * 2);
})
.log()
.subscribe();
流的发布和中间操作,默认使用当前线程
调度器(线程池)
publishOn():改变发布者所在线程
subscribeOn():改变订阅者所在线程池
Schedulers.immediate();//无执行上下文,在当前线程运行
Schedulers.single();//使用固定的一个单线程
Flux.range(1,10)
.publishOn(Schedulers.single())//在哪个线程池把这个流的数据和操作执行了
.log()
.handle((value,sink) -> {
sink.next(value * 2);
})
.subscribe();
单开一个新线程:
Schedulers.boundedElastic();//有界,弹性调度,不是无限扩充的线程池
线程池中有10*cpu核心个线程,队列默认10W
Schedulers.fromExector(new ThreadPoolExecutor())
Schedulers.parallel();//并发池
Scheduler schedulers =Schedulers.newParallel("parallel-scheduler",4);
Flux<String> flux = Flux.range(1, 10)
.map(item -> item + 10)
.publishOn(schedulers)
.map(item -> "value:" + item)
.log();
new Thread(() -> flux.subscribe(System.out::println)).start();
指定发布者线程池:
发布者没有执行线程池:
Scheduler schedulers =Schedulers.newParallel("parallel-scheduler",4);
Flux<String> flux = Flux.range(1, 10)
.map(item -> item + 10)
.log()
.publishOn(schedulers)
.map(item -> "value:" + item);
new Thread(() -> flux.subscribe(System.out::println)).start();
若没有线程,发布者用的线程就是消费者的线程
pom.xml引入单元测试类
filter:过滤出来满足条件的元素
Flux.range(1,10)
.filter(item -> item % 2 == 0)
.log()
.subscribe();
结果:
onSubscribe():流被订阅
request(unbounded):请求无限数据
onNext():每个元素到达
onComplete():流订阅结束
flatMap():扁平化
Flux.just("zhang san","li si")
.flatMap(item -> {
String[] split = item.split(" "); //2个人的名字,按照空格拆分,单独打印姓和名
return Flux.fromArray(split); //把数据包装成多元素流
})
.log()
.subscribe();
结果:
concatMap():一个元素可以变很多个
Flux.range(1,10)
.concatMap(item -> Flux.just(item * 2,0))
.log()
.subscribe();
结果:映射成很多个
conact():将很多流合并成一个
Flux.concat(Flux.range(1,10),Flux.range(10,20))
.log()
.subscribe();
合并后:
concatWith():老流连接一个新流,元素类型要一样
contactMap():对于元素的位置不限制
Flux.range(1,10)
.concatWith(Flux.range(20,25))
.log()
.subscribe();
transform():
自定义对流中元素的操作,把流编程新数据
有多个订阅者,各个订阅者之间不会共享外部变量值
AtomicInteger atomic = new AtomicInteger(0);
Flux<String> transform = Flux.just("a", "b", "c")
.transform(values -> {
if (atomic.incrementAndGet() == 1) {
//如果是第一次调用,老流中的所有元素转成大写
return values.map(String::toUpperCase);
} else {
//如果不是第一次调用,原封不动返回小写
return values;
}
});
transform.subscribe(item -> System.out.println("subscriber1:" + item));
transform.subscribe(item -> System.out.println("subscriber2:" + item));
结果:
无论多少个订阅者,transform只执行一次
transformDeferred():
有多个订阅者
多个订阅者之间共享外部变量值,每个订阅者执行一次
AtomicInteger atomic = new AtomicInteger(0);
Flux<String> transform = Flux.just("a", "b", "c")
.transformDeferred(values -> {
if (atomic.incrementAndGet() == 1) {
//如果是第一次调用,老流中的所有元素转成大写
return values.map(String::toUpperCase);
} else {
//如果不是第一次调用,原封不动返回小写
return values;
}
});
transform.subscribe(item -> System.out.println("subscriber1:" + item));
transform.subscribe(item -> System.out.println("subscriber2:" + item));
不同订阅者共享外部变量:
defaultIfEmpty()
如果发布者元素为空,指定默认值,否则用发布者的值
静态兜底数据
@Test
public void testEmpty() {
this.returnStr()
.defaultIfEmpty("is empty")
.subscribe(System.out::println);
}
private Mono<String> returnStr() {
return Mono.just("a");
//return Mono.empty();
}
Mono.just(null);流里面有一个null值元素
Mono.empty();流里面没有元素,只有结束信号
switchEmpty()
空转换
动态兜底方法
@Test
public void testSwitchEmpty() {
this.returnStr()
.switchIfEmpty(Mono.just("stream is empty"))
.subscribe(System.out::println);
}
merge()
mergeWith()
将多个流合并在一起,按照发布者顺序发布
contact():将流中元素追加/连接(a流中所有元素完成后,才进行b中流)
mergeSequential():按照哪个流先发元素排队
@Test
public void testMerge() throws IOException {
Flux.merge(
Flux.just(1,2,3).delayElements(Duration.ofSeconds(1)),
Flux.just('a','b','c').delayElements(Duration.ofSeconds(2))
)
.log()
.subscribe();
System.in.read();
}
结果:
zip()
将2个流中每个元素结成一对
无法结对的元素会被忽略
最多支持8个流压缩
@Test
public void testZip() {
Flux.just(1,2,3)
.zipWith(Flux.just(4,5,6))
.log()
.map(tuple -> {
Integer t1 = tuple.getT1(); //元组中的第一个元素
Integer t2 = tuple.getT2(); //元组中的第二个元素
return t1 + t2;
})
.subscribe(System.out::println);
}
结果:
onErrorReturn()
吃掉异常,消费者无异常感知
返回一个兜底默认值,
流正常完成
@Test
public void testError() {
Flux.just(1,2,0)
.map(item -> "100 / " + item + " = " + (100/item))
.onErrorReturn(ArithmeticException.class,"math exception")
.subscribe(System.out::println);
}
结果:
onErrorResume()
调用兜底方法
流正常完成
Flux.just(1,2,0)
.map(item -> "100 / " + item + " = " + (100/item))
.onErrorResume(err -> Mono.just("stream err:" + err.getMessage()))
.subscribe(System.out::println);
根据错误返回新值
@Test
public void testError() {
Flux.just(1,2,0)
.map(item -> "100 / " + item + " = " + (100/item))
.onErrorResume(this::returnByErrType)
.subscribe(System.out::println);
}
private Mono<String> returnByErrType (Throwable throwable) {
if (throwable instanceof NullPointerException) {
return Mono.just("value is required");
}else {
return Mono.just("system error:" + throwable.getMessage());
}
}
只需要写正确的业务代码,
所有的业务异常,全部抛出自定义异常,由全局异常处理器统一处理
流异常完成
1.吃掉异常,抛新异常,消费者有感知
2.流异常完成
3.抛新异常
Flux.just(1, 2, 0,5)
.map(item -> "100 / " + item + " = " + (100 / item))
.onErrorMap(err -> new BusinessException("value is invalid"))
.subscribe(System.out::println,
err -> System.out.println("err happen:" + err.getMessage()),
() -> System.out.println("stream finished"));
流异常完成,流中后续元素不执行
doOnError()
异常被捕获,做自己的事情
不影响异常继续顺着流水线传播
不吃掉异常,只在异常发生时做一件事,消费者有感知
@Test
public void testError() {
Flux.just(1, 2, 0,5)
.map(item -> "100 / " + item + " = " + (100 / item))
.doOnError(err -> {
System.out.println("err saved");
throw new BusinessException("err happen");
})
.subscribe(System.out::println,
err -> System.out.println("err happen:" + err.getMessage()),
() -> System.out.println("stream finished"));
}
文件读写/网络操作
doFinally()
这件事一定会做
Flux.just(1, 2, 0,5)
.map(item -> "100 / " + item + " = " + (100 / item))
.doFinally(signalType -> System.out.println("signalType :" + signalType))
.subscribe(System.out::println,
err -> System.out.println("err happen:" + err.getMessage()),
() -> System.out.println("stream finished"));
onErrorContinue()
流中后续元素,继续执行
@Test
public void testErrContinue() {
Flux.just(1,2,3,0,5)
.map(item -> 10 /item)
.onErrorContinue((err,value) -> {
System.out.println("err in stream:" + err.getMessage());
System.out.println("err value in stream:" + value);
//例如:记录日志
})
.subscribe(
item -> System.out.println("stream success:" + item),
err -> System.out.println("err happen:" + err.getMessage())
);
}
onErrorStop()
流为源源不断的流,出现错误时,取消继续发布
有多个消费者也不会继续往下
从源头中断
onErrorComplete()
一旦发生错误,整个流结束,但是正常结束,后续元素不执行
把错误信号,替换成正确结束信号
retry():将流从头到尾重新请求一次
@Test
public void testTimeoutAndRetry() throws Exception {
Flux.just(1,2,3)
.delayElements(Duration.ofSeconds(3))
.log()
.timeout(Duration.ofSeconds(2))
.retry(3)
.onErrorReturn(0)
.map(item -> item *2)
.subscribe(System.out::println);
System.in.read();
}
结果:流超时,重新获取流3次,仍旧超时,抛异常
Sinks:数据管道/接收器,所有数据顺着这个管道往下走
Sinks.many();//发送Flux数据
Sinks.one();//发送Mono数据
Sinks.many().unicast();//单播:这个管道只能绑定单个订阅者
Sinks.many().multicast();//多播:这个管道可以绑定多个订阅者
Sinks.many().replay();//重放:这个管道能重放元素
是否给后来的订阅者把之前的元素依旧发给它
从头消费还是从订阅的那一刻消费
只能有一个消费者
@Test
public void testSinks() throws Exception{
Sinks.Many<Object> many = Sinks.many()
.unicast()
.onBackpressureBuffer(new LinkedBlockingQueue<>(5));//背压队列,最多能放多少个元素
new Thread(() -> {
for (int i = 0; i < 10; i++) {
many.tryEmitNext(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
many.asFlux()
.subscribe(System.out::println);
System.in.read();
}
可以有多个消费者
存在问题:第二个消费者延迟到来,丢失前面元素
解决:replay()重放
@Test
public void testSinks() throws Exception{
Sinks.Many<Object> many = Sinks.many()
.multicast()
.onBackpressureBuffer();//背压队列,最多能放多少个元素
new Thread(() -> {
for (int i = 0; i < 10; i++) {
many.tryEmitNext(i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
many.asFlux()
.subscribe(item -> System.out.println("consumer 1 get item:" + item));
many.asFlux()
.subscribe(item -> System.out.println("consumer 2 get item:" + item));
System.in.read();
}
丢失元素:
默认订阅者从订阅的那一刻开始接元素
解决:后到达的订阅者,丢失之前元素
Sinks.Many<Object> many = Sinks.many()
.replay()
.all();
不再丢失元素:底层利用队列,缓存之前数据
range(),just() 默认没有缓存
@Test
public void testCache() throws Exception{
Flux<Integer> cache = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1))
.cache(3);//缓存之前3个元素
cache.subscribe();
new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
cache.subscribe(item -> System.out.println("subscriber1 get item:" + item));
}).start();
System.in.read();
}
结果:5秒后打印订阅者中元素,缓存3个
block():获取元素
@Test
public void testBlock() {
List<Integer> list = Flux.range(1, 10)
.map(item -> item * 9)
.collectList()
.block();
list.stream()
.forEach(System.out::println);
}
百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束
@Test
public void testParallelFlux() {
Flux.range(1,100)
.buffer(10)
.parallel(8)
.runOn(Schedulers.newParallel("stream"))
.log()
.subscribe(System.out::println);
}
收集成一个集合
Flux.range(1,100)
.buffer(10)
.parallel(8)
.runOn(Schedulers.newParallel("stream"))
.log()
.flatMap(list -> Flux.fromIterable(list))
.collectSortedList(Integer::compareTo)
.subscribe(System.out::println);
ThreadLocal在响应式编程中无法使用
响应式中,数据流期间共享数据,ContextApi
Context:读写
ContextView:只读
上下文数据传播
@Test
public void testContextApi() {
//支持context的中间操作api
Flux.just(1,2,3)
.transformDeferredContextual((value,context) -> {
System.out.println("get value:" + value);
System.out.println("context:" +context);
return value.map(item -> context.get("prefix") + String.valueOf(item));
})
//上游能拿到下游最近一次数据
.contextWrite(Context.of("prefix","myPrefix:")) //类似threadLocal,共享数据,上游的所有人能看到
//context由下游传到上游
.subscribe(System.out::println);
//响应式编程:dao(数据源)->service-controller;从下游反向传播
}
结果: