响应式编程二Mono,Flux简单介绍

发布时间:2023年12月20日

什么是响应式编程

响应式编程是一种面向数据流和变化传播的声明式的编程范式

响应式编程的特点-事件驱动

基于观察者模式
在这里插入图片描述

响应式流JVM规范

1)处理任意数量元素
2)按序处理
3)异步传递元素
4)非阻塞的回压

回压处理策略

1)Missing,不做任何处理
2)Error,抛出错误
3)Buffer(默认策略),当下游没有来得及去处理一些数据的时候会放在buffer里面
4)Drop,下游没有准备好处理元素会直接丢弃
5)Latest,下游只接收最新的数据处理

响应流规范定义的接口类

1)Publisher,发布者
2)Subscriber,订阅者
3)Subscription,发布者和订阅者直接的关系,订阅多少数据
4)Processor (reactor 3.5取消)

Publisher接口

在这里插入图片描述
在这里插入图片描述

Subscriber接口

在这里插入图片描述
在这里插入图片描述

Subscription接口

在这里插入图片描述

Subscriber接口

在这里插入图片描述

Processor接口(代表处理状态)

在这里插入图片描述
###响应式编程接口分析
在这里插入图片描述
1)首先订阅者设置到发布者上面调用(subscribe(Subscriber))
2)然后会调用订阅者的onSubscribe方法
3)它里面传递一个Subscription对象,这个对象会请求订阅者知道我们要多少个元素
4)onNext一个一个去调用我们的订阅者
5)当发生异常的时候发布者给订阅者一个onError信号,当完成的时候就给它一个onComplete信号

Project Reactor框架简介

1)对响应式流规范的一种实现
2)Spring WebFlux默认的响应式框架
3)完全异步非阻塞,对背压的支持
4)提供两个异步序列API:Flux[N]和Mono[0|1]
5)提供对响应式流的操作

创建Flux序列和Mono序列

首先引入依赖

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <version>3.4.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    //1、响应式流参数直接列举出来
        Flux<String> just = Flux.just("hello","everybody");//发布者
        just.subscribe(System.out::println);//订阅

        // 2、从数组中创建流
        Flux<String> stringFlux = Flux.fromArray(new String[]{"hello","lihua","zhangli"});
        stringFlux.subscribe(System.out::println);
     
     // 3、实现Iterable接口的集合
        Flux<Integer> integerFlux = Flux.fromIterable(Arrays.asList(1,2,3,4,5,6,7));
        integerFlux.subscribe(System.out::println);

        // 4、第一个参数就是起点 第二个参数:表示序列中元素个数
        Flux<Integer> range = Flux.range(1000,5);
        //订阅以后打印
        range.subscribe(System.out::println);

Mono序列:

        //1、指定元素(只能是一个元素)
        Mono<String> just = Mono.just("hello");
        just.subscribe(System.out::println);

   // 2、空元素两种方式效果一样
        Mono<Object> objectMono = Mono.justOrEmpty(null);
        objectMono.subscribe(System.out::println);
        Mono<Object> objectMono1 = Mono.justOrEmpty(Optional.empty());
        objectMono1.subscribe(System.out::println);

   public static String httpRequest()throws IOException {
        URL url = new URL("http://www.baidu.com");
        URLConnection urlConnection = url.openConnection();
        urlConnection.connect();;
        InputStream inputStream = urlConnection.getInputStream();
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
        String temp = null;
        StringBuffer stringBuffer = new StringBuffer();
        while ((temp = reader.readLine()) != null){
            stringBuffer.append(temp).append("\r\n");
        }
        return stringBuffer.toString();
    }

    public static void main(String[] args) {
        // 1、发送http请求
//        Mono.fromCallable(() ->httpRequest())
//                .subscribe(System.out::println);
        // 2、使用Java8 方法引用
//        Mono.fromCallable(ReactorDemo3::httpRequest)
//                .subscribe(System.out::println);
        //3、上述代码不仅发送了http请求,还要处理onError
        Mono.fromCallable(ReactorDemo3::httpRequest)
                .subscribe(
                        item -> System.out.println(item),
                        ex -> System.err.println("请求异常:"+ ex.toString()),
                        () -> System.out.println("请求结束")
                        );
    }

Flux和Mono使用from(Publisher p) 工厂方法

 public static void main(String[] args) {
        Flux.from((Publisher<String>) s -> {
            for (int i = 0; i < 10; i++) {
                s.onNext("hello " + i);
            }
            //完成信号
            s.onComplete();
        }).subscribe(
                //订阅打印
                System.out::println,
                //异常
                System.err::println,
                //处理结束打印
                () -> System.out.println("处理结束")
        );
    }

使用defer工厂创建序列(在订阅时决定其行为

   static boolean isValidValue(String value){
        System.out.println("调用了 isValidValue的方法");
        return  false;
    }
    static String getData(String value){
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "eche:" + value;
    }
    static Mono<String> requestData(String value){
        return isValidValue(value) ? Mono.fromCallable(() -> getData(value)):
                Mono.error(new RuntimeException("isvalid value"));
    }
    static Mono<String> requestDeferData(String value){
        return Mono.defer(()->isValidValue(value) ? Mono.fromCallable(()->getData(value)):
                Mono.error(new RuntimeException()));
    }

    public static void main(String[] args) {
       requestData("zhangsan").subscribe();
       //此处使用subscribe
        requestDeferData("zhangsan").subscribe();
    }
订阅响应式流

有如下订阅方式:
在这里插入图片描述

第一种方式:复作用方式的流处理
第二种方式:增加订阅者处理
第三种方式:我们出了异常如何处理
第四种方式:我们出了异常或者完成的时候应该怎么处理
第五种方式:订阅者和提供者它之间的一些关系
第六种方式:自定义subscribe

      //1、subscribe
        Flux.range(100,10)
                .filter(num -> num % 3 == 0)
                .map(num -> "hello test " + num)
                .doOnNext(System.out::println)
                .subscribe();

        //2、增加订阅者
        Flux.range(100,10)
                .filter( num -> num % 2 == 0)
                .subscribe(System.out::println);

       // 3、增加对异常的处理
        Flux.from(subscirber -> {
            for(int  i =0;i< 5;i++){
                subscirber.onNext(i);
            }
            subscirber.onError(new Exception("测试数据异常"));
        }).subscribe(
                num -> System.out.println(num),
                ex -> System.err.println("异常情况:" +ex)
        );

        //4、 完成事件的处理
        Flux.from((Publisher<Integer>) s -> {
            for(int i = 0 ;i < 10;i++){
                s.onNext(i);
            }
            s.onComplete();
        }).subscribe(
                item -> System.out.println("onNext :"+ item),
                ex -> System.out.println("异常情况:" +ex),
                () -> System.out.println("处理完毕")
        );


       //4、 完成事件的处理
        Flux.from((Publisher<Integer>) s -> {
            for(int i = 0 ;i < 10;i++){
                s.onNext(i);
            }
            s.onComplete();
        }).subscribe(
                item -> System.out.println("onNext :"+ item),
                ex -> System.out.println("异常情况:" +ex),
                () -> System.out.println("处理完毕")
        );

  // 5、手动控制订阅
        Flux.range(1,100)
                .subscribe(
                        data -> System.out.println("onNext:" + data),
                        ex -> System.err.println("异常信息:" +ex),
                        ()-> System.out.println("onComplete"),
                        subscription -> {
                            subscription.request(5);
                            subscription.cancel();
                        }
                );


    //6、实现自定义订阅者
        Subscriber<String> subscriber = new Subscriber<String>() {
            volatile Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                System.out.println("initial request for 1 element");
                //订阅一个元素
                subscription.request(1);
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:" + s);
                System.out.println("requesting 1 more element" );
                //再请求一个元素
                subscription.request(1);

            }

            @Override
            public void onError(Throwable t) {
                System.err.println("出现异常:" +t);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        Flux<String> stream = Flux.just("hello", "everyone", "!");
        stream.subscribe(subscriber);

映射响应式流元素

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

    // map映射
        Flux.range(1,10)
                .map(item -> "hello test" + item)
                .subscribe(System.out::println);

   // index索引
        Flux.range(0,10)
                .map(item -> "hello test " + item)
                .index()
                .subscribe(item ->{
                    // 二元组第一个元素 ,编号 0开始
                    Long t1 = item.getT1();
                    // 二元组第二个元素,也就是具体值
                    String t2 = item.getT2();
                    System.out.println(t1 + ":" + t2);

                },
                        System.err::println,
                        () -> System.out.println("流已经处理完毕"));

	//时间戳
       Flux.range(0,10)
                .map(item -> "hello test " + item)
                .timestamp()
                .subscribe(item ->{
                            // 二元组第一个元素 ,编号 0开始
                            Long t1 = item.getT1();
                            // 二元组第二个元素,也就是具体值
                            String t2 = item.getT2();
                            System.out.println(t1 + ":" + t2);

                        },
                        System.err::println,
                        () -> System.out.println("流已经处理完毕"));

过滤响应式流

Project Reactor 包含用于过滤元素的各种操作符。

  1. filter 操作符仅传递满足条件的元素。
  2. take(n) 操作符限制所获取的元素,该方法忽略除前 n 个元素之外的所有元素。
  3. takeLast 仅返回流的最后一个元素。
  4. takeUntil(Predicate) 传递一个元素直到满足某个条件。
  5. elementAt(n) 只可用于获取序列的第 n 个元素。
  6. takeUntilOther(Publisher) 或 skipUntilOther(Publisher) 操作符,可以跳过或获取一个元素,直到某些消息从另一个流到达。
   public static void main(String[] args) throws InterruptedException {
        Flux.interval(Duration.ofMillis(500))
                .map(item -> "test " + item)
                .doOnNext(System.out::println)
                //三秒之前的数都不要
                .skipUntilOther(Mono.just("start").delayElement(Duration.ofSeconds(3)))
                //拿去截至条件
                .takeUntilOther(Mono.just("end").delayElement(Duration.ofSeconds(6)))
                .subscribe(
                        item -> System.out.println("onNext: " + item),
                        ex -> System.err.println("onError: " + ex),
                        () -> System.out.println("onCompleted")
                );
        Thread.sleep(10*1000);
    }
收集响应式流
  • 收集到List
  • 使用 collectMap 操作符的映射( Map<K,T> );
  • 使用 collectMultimap 操作符的多映射( Map<K,Collection<T>> );
  • Flux.collect(Collector) 操作符收集到任何实现了 java.util.stream.Collector 的数据结构。
  • Flux 和 Mono 都有 repeat() 方法和 repeat(times) 方法,这两种方法可以针对传入序列进=行循环操作。
  • defaultIfEmpty(T) 是另一个简洁的方法,它能为空的 Flux 或 Mono 提供默认值。
  • Flux.distinct() 仅传递之前未在流中遇到过的元素。
  • Flux.distinctUntilChanged() 操作符没有此限制,可用于无限流以删除出现在不间断行中的重复项。
   Flux.just(1,2,36,4,25,6,7)
                //CollectionSoredList 默认是升序
                .collectSortedList(Comparator.reverseOrder())
                .subscribe(System.out::println);

        Flux.just(1,2,3,4,5,6)
                .collectMap(item -> "key:num" + item)
                .subscribe(System.out::println);
        Flux.just(1,2,3,4,5,6)
                .collectMap(
                        item -> "key:" + item,
                        item -> "value:" + item
                ).subscribe(System.out::println);

        Flux.just(1,2,3,4,5,6)
                .collectMap(
                        integer -> "key:" + integer,
                        integer -> "value:" + integer,
                        ()->{
                            Map<String,String> map = new HashMap<>();
                            for(int i =7 ;i <10;i++){
                                map.put("key:" + i ,"value:" + i);
                            }
                            return map;

                        }
                ).subscribe(System.out::println);


     Flux.just(1,2,3,4,5)
                .collectMultimap(
                        item -> "key:" + item,
                        item ->{
                            List<String> values = new ArrayList<>();
                            for(int i =0 ;i < item ;i ++){
                                values.add("value:" + i);
                            }
                            return values;
                        }
                ).subscribe(System.out::println);
        Flux.just(1,2,3,4,5)
                .collectMultimap(
                        item -> "key:" + item,
                        item ->{
                            List<String> values = new ArrayList<>();
                            for(int i =0 ;i < item ;i ++){
                                values.add("value:" + i);
                            }
                            return values;
                        },// 扩充
                        () ->{
                            Map map = new HashMap<String,List>();
                            List<String> list = new ArrayList<>();
                            for(int i = 0 ;i <3;i ++){
                                list.clear();
                                for(int j = 0 ;j <i;j++){
                                    list.add("ele:" +j);
                                }
                                map.put(i + ":key",list);
                            }
                            return map;
                        }
                ).subscribe(System.out::println);



	        Flux.just(1,2,3)
                .repeat(3)// 实际上是打印四次,一次原始的,三次重复的
                .subscribe(System.out::println);
        Flux.empty().defaultIfEmpty("hello test").subscribe(System.out::println);


        Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2)
                .distinctUntilChanged()
                .subscribe(System.out::print);
        System.out.println();
        System.out.println("===============");
        Flux.just(1,1,1,2,2,2,3,3,3,1,1,1,2,2,2)
                .distinct()
                .subscribe(System.out::print);


裁剪流中的元素
  1. 统计流中元素的数量;
  2. 检查所有元素是否具有 Flux.all(Predicate) 所需的属性;
  3. 使用 Flux.any(Predicate) 操作符检查是否至少有一个元素具有所需属性;
  4. 使用 hasElements 操作符检查流中是否包含多个元素;
  5. 使用 hasElement 操作符检查流中是否包含某个所需的元素。短路逻辑,在元素与值匹配时立即返回true。
  6. any 操作符不仅可以检查元素的相等性,还可以通过提供自定义 Predicate 实例来检查任何其他属性。

检查序列中是否包含偶数

Flux.just(1,2,3,4,5,6)
        .any(item -> item % 2 == 0)
        .subscribe(System.out::println);

我们可以检查一下它的整个运行过程:增加副作用

Flux.just(1,2,3,4,5,6)
        .doOnNext(item -> System.out.println(item))
        .any(item -> item % 2 == 0)
        .subscribe(System.out::println);

Flux 类能使用自定义逻辑来裁剪序列(也称为折叠)。 reduce 操作符通常需要一个初始值和一个函数,而该函数会将前一步的结果与当前步的元素组合在一起。
将1-5的元素求和:

   // reduce
        // 1,2,3,4,5
        // 0 1 3 6 10
        Flux.range(1,5)
                .reduce(0,(item1,item2)->{
                    System.out.println("item1:" + item1);
                    System.out.println("item2:" + item2);
                    return item1 + item2;
                }).subscribe(System.out::println);

Flux.scan()操作符在进行聚合时,可以向下游发送中间结果。
scan 操作符对 1 到 5 之间的整数求和:

Flux.range(1,5)
        .scan(0,(num1,num2) ->{
            System.out.println("num1:" + num1);
            System.out.println("num2:" + num2);
            return num1 + num2;
        })
        .subscribe(System.out::println);

scan 操作符对于许多需要获取处理中事件的相关信息的应用程序有用

例如,我们可以计算流上的移动平均值:

int arrLength = 5;
Flux.just(1,2,3,4,5,6)
        .index()
        .scan(new int[arrLength],(arr,entry) ->{
            arr[(int) (entry.getT1() % arrLength)] = entry.getT2();
            return arr;
}).skip(arrLength)// 当窗口数组被灌满之后,开始计算平均值,因此要跳过没有灌满的情况
        .map(array -> Arrays.stream(array).sum() * 1.0 /arrLength)
        .subscribe(System.out::println);

Mono 和 Flux 流有 then、thenMany 和 thenEmpty 操作符,它们在上游流完成时完成。
上游流完成处理后,这些操作符可用于触发新流,订阅是对于新流的。

Flux.just(1,2,3,4)
        .doOnNext(item -> System.out.println("副作用:" + item
        ))
        .thenMany(Flux.just(5,6,7))
        .subscribe(System.out::println);
文章来源:https://blog.csdn.net/lsdstone/article/details/134983206
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。