响应式编程是一种面向数据流和变化传播的声明式的编程范式
基于观察者模式
1)处理任意数量元素
2)按序处理
3)异步传递元素
4)非阻塞的回压
1)Missing,不做任何处理
2)Error,抛出错误
3)Buffer(默认策略),当下游没有来得及去处理一些数据的时候会放在buffer里面
4)Drop,下游没有准备好处理元素会直接丢弃
5)Latest,下游只接收最新的数据处理
1)Publisher,发布者
2)Subscriber,订阅者
3)Subscription,发布者和订阅者直接的关系,订阅多少数据
4)Processor (reactor 3.5取消)
###响应式编程接口分析
1)首先订阅者设置到发布者上面调用(subscribe(Subscriber))
2)然后会调用订阅者的onSubscribe方法
3)它里面传递一个Subscription对象,这个对象会请求订阅者知道我们要多少个元素
4)onNext一个一个去调用我们的订阅者
5)当发生异常的时候发布者给订阅者一个onError信号,当完成的时候就给它一个onComplete信号
1)对响应式流规范的一种实现
2)Spring WebFlux默认的响应式框架
3)完全异步非阻塞,对背压的支持
4)提供两个异步序列API:Flux[N]和Mono[0|1]
5)提供对响应式流的操作
首先引入依赖
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.0</version>
<scope>test</scope>
</dependency>
</dependencies>
//1、响应式流参数直接列举出来
Flux<String> just = Flux.just("hello","everybody");//发布者
just.subscribe(System.out::println);//订阅
// 2、从数组中创建流
Flux<String> stringFlux = Flux.fromArray(new String[]{"hello","lihua","zhangli"});
stringFlux.subscribe(System.out::println);
// 3、实现Iterable接口的集合
Flux<Integer> integerFlux = Flux.fromIterable(Arrays.asList(1,2,3,4,5,6,7));
integerFlux.subscribe(System.out::println);
// 4、第一个参数就是起点 第二个参数:表示序列中元素个数
Flux<Integer> range = Flux.range(1000,5);
//订阅以后打印
range.subscribe(System.out::println);
Mono序列:
//1、指定元素(只能是一个元素)
Mono<String> just = Mono.just("hello");
just.subscribe(System.out::println);
// 2、空元素两种方式效果一样
Mono<Object> objectMono = Mono.justOrEmpty(null);
objectMono.subscribe(System.out::println);
Mono<Object> objectMono1 = Mono.justOrEmpty(Optional.empty());
objectMono1.subscribe(System.out::println);
public static String httpRequest()throws IOException {
URL url = new URL("http://www.baidu.com");
URLConnection urlConnection = url.openConnection();
urlConnection.connect();;
InputStream inputStream = urlConnection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String temp = null;
StringBuffer stringBuffer = new StringBuffer();
while ((temp = reader.readLine()) != null){
stringBuffer.append(temp).append("\r\n");
}
return stringBuffer.toString();
}
public static void main(String[] args) {
// 1、发送http请求
// Mono.fromCallable(() ->httpRequest())
// .subscribe(System.out::println);
// 2、使用Java8 方法引用
// Mono.fromCallable(ReactorDemo3::httpRequest)
// .subscribe(System.out::println);
//3、上述代码不仅发送了http请求,还要处理onError
Mono.fromCallable(ReactorDemo3::httpRequest)
.subscribe(
item -> System.out.println(item),
ex -> System.err.println("请求异常:"+ ex.toString()),
() -> System.out.println("请求结束")
);
}
Flux和Mono使用from(Publisher p) 工厂方法
public static void main(String[] args) {
Flux.from((Publisher<String>) s -> {
for (int i = 0; i < 10; i++) {
s.onNext("hello " + i);
}
//完成信号
s.onComplete();
}).subscribe(
//订阅打印
System.out::println,
//异常
System.err::println,
//处理结束打印
() -> System.out.println("处理结束")
);
}
使用defer工厂创建序列(在订阅时决定其行为)
static boolean isValidValue(String value){
System.out.println("调用了 isValidValue的方法");
return false;
}
static String getData(String value){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "eche:" + value;
}
static Mono<String> requestData(String value){
return isValidValue(value) ? Mono.fromCallable(() -> getData(value)):
Mono.error(new RuntimeException("isvalid value"));
}
static Mono<String> requestDeferData(String value){
return Mono.defer(()->isValidValue(value) ? Mono.fromCallable(()->getData(value)):
Mono.error(new RuntimeException()));
}
public static void main(String[] args) {
requestData("zhangsan").subscribe();
//此处使用subscribe
requestDeferData("zhangsan").subscribe();
}
有如下订阅方式:
第一种方式:复作用方式的流处理
第二种方式:增加订阅者处理
第三种方式:我们出了异常如何处理
第四种方式:我们出了异常或者完成的时候应该怎么处理
第五种方式:订阅者和提供者它之间的一些关系
第六种方式:自定义subscribe
//1、subscribe
Flux.range(100,10)
.filter(num -> num % 3 == 0)
.map(num -> "hello test " + num)
.doOnNext(System.out::println)
.subscribe();
//2、增加订阅者
Flux.range(100,10)
.filter( num -> num % 2 == 0)
.subscribe(System.out::println);
// 3、增加对异常的处理
Flux.from(subscirber -> {
for(int i =0;i< 5;i++){
subscirber.onNext(i);
}
subscirber.onError(new Exception("测试数据异常"));
}).subscribe(
num -> System.out.println(num),
ex -> System.err.println("异常情况:" +ex)
);
//4、 完成事件的处理
Flux.from((Publisher<Integer>) s -> {
for(int i = 0 ;i < 10;i++){
s.onNext(i);
}
s.onComplete();
}).subscribe(
item -> System.out.println("onNext :"+ item),
ex -> System.out.println("异常情况:" +ex),
() -> System.out.println("处理完毕")
);
//4、 完成事件的处理
Flux.from((Publisher<Integer>) s -> {
for(int i = 0 ;i < 10;i++){
s.onNext(i);
}
s.onComplete();
}).subscribe(
item -> System.out.println("onNext :"+ item),
ex -> System.out.println("异常情况:" +ex),
() -> System.out.println("处理完毕")
);
// 5、手动控制订阅
Flux.range(1,100)
.subscribe(
data -> System.out.println("onNext:" + data),
ex -> System.err.println("异常信息:" +ex),
()-> System.out.println("onComplete"),
subscription -> {
subscription.request(5);
subscription.cancel();
}
);
//6、实现自定义订阅者
Subscriber<String> subscriber = new Subscriber<String>() {
volatile Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
System.out.println("initial request for 1 element");
//订阅一个元素
subscription.request(1);
}
@Override
public void onNext(String s) {
System.out.println("onNext:" + s);
System.out.println("requesting 1 more element" );
//再请求一个元素
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.err.println("出现异常:" +t);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
Flux<String> stream = Flux.just("hello", "everyone", "!");
stream.subscribe(subscriber);
// map映射
Flux.range(1,10)
.map(item -> "hello test" + item)
.subscribe(System.out::println);
// index索引
Flux.range(0,10)
.map(item -> "hello test " + item)
.index()
.subscribe(item ->{
// 二元组第一个元素 ,编号 0开始
Long t1 = item.getT1();
// 二元组第二个元素,也就是具体值
String t2 = item.getT2();
System.out.println(t1 + ":" + t2);
},
System.err::println,
() -> System.out.println("流已经处理完毕"));
//时间戳
Flux.range(0,10)
.map(item -> "hello test " + item)
.timestamp()
.subscribe(item ->{
// 二元组第一个元素 ,编号 0开始
Long t1 = item.getT1();
// 二元组第二个元素,也就是具体值
String t2 = item.getT2();
System.out.println(t1 + ":" + t2);
},
System.err::println,
() -> System.out.println("流已经处理完毕"));
Project Reactor 包含用于过滤元素的各种操作符。
public static void main(String[] args) throws InterruptedException {
Flux.interval(Duration.ofMillis(500))
.map(item -> "test " + item)
.doOnNext(System.out::println)
//三秒之前的数都不要
.skipUntilOther(Mono.just("start").delayElement(Duration.ofSeconds(3)))
//拿去截至条件
.takeUntilOther(Mono.just("end").delayElement(Duration.ofSeconds(6)))
.subscribe(
item -> System.out.println("onNext: " + item),
ex -> System.err.println("onError: " + ex),
() -> System.out.println("onCompleted")
);
Thread.sleep(10*1000);
}
Flux.just(1,2,36,4,25,6,7)
//CollectionSoredList 默认是升序
.collectSortedList(Comparator.reverseOrder())
.subscribe(System.out::println);
Flux.just(1,2,3,4,5,6)
.collectMap(item -> "key:num" + item)
.subscribe(System.out::println);
Flux.just(1,2,3,4,5,6)
.collectMap(
item -> "key:" + item,
item -> "value:" + item
).subscribe(System.out::println);
Flux.just(1,2,3,4,5,6)
.collectMap(
integer -> "key:" + integer,
integer -> "value:" + integer,
()->{
Map<String,String> map = new HashMap<>();
for(int i =7 ;i <10;i++){
map.put("key:" + i ,"value:" + i);
}
return map;
}
).subscribe(System.out::println);
Flux.just(1,2,3,4,5)
.collectMultimap(
item -> "key:" + item,
item ->{
List<String> values = new ArrayList<>();
for(int i =0 ;i < item ;i ++){
values.add("value:" + i);
}
return values;
}
).subscribe(System.out::println);
Flux.just(1,2,3,4,5)
.collectMultimap(
item -> "key:" + item,
item ->{
List<String> values = new ArrayList<>();
for(int i =0 ;i < item ;i ++){
values.add("value:" + i);
}
return values;
},// 扩充
() ->{
Map map = new HashMap<String,List>();
List<String> list = new ArrayList<>();
for(int i = 0 ;i <3;i ++){
list.clear();
for(int j = 0 ;j <i;j++){
list.add("ele:" +j);
}
map.put(i + ":key",list);
}
return map;
}
).subscribe(System.out::println);
Flux.just(1,2,3)
.repeat(3)// 实际上是打印四次,一次原始的,三次重复的
.subscribe(System.out::println);
Flux.empty().defaultIfEmpty("hello test").subscribe(System.out::println);
Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2)
.distinctUntilChanged()
.subscribe(System.out::print);
System.out.println();
System.out.println("===============");
Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2)
.distinct()
.subscribe(System.out::print);
检查序列中是否包含偶数
Flux.just(1,2,3,4,5,6)
.any(item -> item % 2 == 0)
.subscribe(System.out::println);
我们可以检查一下它的整个运行过程:增加副作用
Flux.just(1,2,3,4,5,6)
.doOnNext(item -> System.out.println(item))
.any(item -> item % 2 == 0)
.subscribe(System.out::println);
Flux 类能使用自定义逻辑来裁剪序列(也称为折叠)。 reduce 操作符通常需要一个初始值和一个函数,而该函数会将前一步的结果与当前步的元素组合在一起。
将1-5的元素求和:
// reduce
// 1,2,3,4,5
// 0 1 3 6 10
Flux.range(1,5)
.reduce(0,(item1,item2)->{
System.out.println("item1:" + item1);
System.out.println("item2:" + item2);
return item1 + item2;
}).subscribe(System.out::println);
Flux.scan()操作符在进行聚合时,可以向下游发送中间结果。
scan 操作符对 1 到 5 之间的整数求和:
Flux.range(1,5)
.scan(0,(num1,num2) ->{
System.out.println("num1:" + num1);
System.out.println("num2:" + num2);
return num1 + num2;
})
.subscribe(System.out::println);
scan 操作符对于许多需要获取处理中事件的相关信息的应用程序有用
例如,我们可以计算流上的移动平均值:
int arrLength = 5;
Flux.just(1,2,3,4,5,6)
.index()
.scan(new int[arrLength],(arr,entry) ->{
arr[(int) (entry.getT1() % arrLength)] = entry.getT2();
return arr;
}).skip(arrLength)// 当窗口数组被灌满之后,开始计算平均值,因此要跳过没有灌满的情况
.map(array -> Arrays.stream(array).sum() * 1.0 /arrLength)
.subscribe(System.out::println);
Mono 和 Flux 流有 then、thenMany 和 thenEmpty 操作符,它们在上游流完成时完成。
上游流完成处理后,这些操作符可用于触发新流,订阅是对于新流的。
Flux.just(1,2,3,4)
.doOnNext(item -> System.out.println("副作用:" + item
))
.thenMany(Flux.just(5,6,7))
.subscribe(System.out::println);