Java学习笔记-day06-响应式编程Reactor API大全(上)

发布时间:2024年01月10日

Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

   <dependencyManagement>
       <dependencies>
           <dependency>
               <groupId>io.projectreactor</groupId>
               <artifactId>reactor-bom</artifactId>
               <version>2023.0.0</version>
               <type>pom</type>
               <scope>import</scope>
           </dependency>
       </dependencies>
   </dependencyManagement>


   <dependencies>
       <dependency>
           <groupId>io.projectreactor</groupId>
           <artifactId>reactor-core</artifactId>
       </dependency>
       <dependency>
           <groupId>io.projectreactor</groupId>
           <artifactId>reactor-test</artifactId>
           <scope>test</scope>
       </dependency>
       <dependency>
           <groupId>org.junit.jupiter</groupId>
           <artifactId>junit-jupiter</artifactId>
           <version>5.7.2</version>
           <scope>test</scope>
       </dependency>

   </dependencies>

1. 创建 Mono 和 Flux

  • Mono: 用于表示包含零个或一个元素的异步序列。
  • Flux: 用于表示包含零个或多个元素的异步序列。
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;

public class ReactorCreateExample {
    public static void main(String[] args) {
        // 创建包含单个元素的 Mono
        Mono<String> mono = Mono.just("Hello, Reactor!");

        // 创建包含多个元素的 Flux
        Flux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});

        mono.subscribe(System.out::println); // 输出: Hello, Reactor!
        flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5
    }
}

2. 转换操作符

使用转换操作符对数据流进行转换或处理。

import reactor.core.publisher.Flux;

public class ReactorTransformExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 5);

        // 对每个元素进行平方操作
        Flux<Integer> squared = source.map(x -> x * x);

        squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25
    }
}

3. 过滤操作符

使用过滤操作符筛选数据流中的元素。

import reactor.core.publisher.Flux;

public class ReactorFilterExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 5);

        // 筛选偶数
        Flux<Integer> evenNumbers = source.filter(x -> x % 2 == 0);

        evenNumbers.subscribe(System.out::println); // 输出: 2, 4
    }
}

4. 组合操作符

使用组合操作符组合多个数据流。

import reactor.core.publisher.Flux;

public class ReactorCombineExample {
    public static void main(String[] args) {
        Flux<Integer> source1 = Flux.range(1, 3);
        Flux<Integer> source2 = Flux.range(4, 3);

        // 合并两个数据流
        Flux<Integer> merged = Flux.concat(source1, source2);

        merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6
    }
}

这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法,用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合,以构建出符合业务逻辑的异步处理链。

5. 错误处理

Reactor 提供了多种处理错误的方式,例如使用 onErrorResume, onErrorReturn, doOnError 等方法。

import reactor.core.publisher.Flux;

public class ReactorErrorHandlingExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);

        // 处理除零异常并提供默认值
        Flux<Integer> result = source.map(x -> 10 / x)
                .onErrorResume(ex -> Flux.just(-1));

        result.subscribe(System.out::println); // 输出: 10, 5, -1
    }
}

6. 背压处理

Reactor 提供了背压处理的支持,允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer 或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。

import reactor.core.publisher.Flux;

public class ReactorBackpressureExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 1000);

        // 设置缓冲区大小
        Flux<Integer> buffered = source.onBackpressureBuffer(10);

        buffered.subscribe(
                data -> {
                    // 模拟慢速消费者
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(data);
                },
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Done")
        );
    }
}
  • TODO:未能实现没有背压和有背压的对比

7. 使用 Reactor WebFlux 处理 Web 请求

Reactor 还提供了 WebFlux 模块,用于处理响应式的 Web 请求。以下是一个简单的示例:

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class WebFluxController {

    @GetMapping("/hello")
    public Mono<ResponseEntity<String>> hello() {
        return Mono.just(ResponseEntity.ok("Hello, Reactor WebFlux!"));
    }
}

8. Reactor 核心概念

Reactor 中有一些核心概念,了解这些概念有助于更好地使用 Reactor API。

  • Publisher(发布者): 代表一个生产数据的源头,通常是 MonoFlux

  • Subscriber(订阅者): 用于消费数据流的组件。通过 subscribe 方法订阅 Publisher

  • Subscription(订阅): 代表 SubscriberPublisher 之间的连接。Subscriber 可以使用 Subscription 来请求数据,取消订阅等。

  • Processor(处理器): 既是 Publisher 又是 Subscriber,用于在两者之间进行转换和处理。

public class ReactorCoreConceptsExample {
    public static void main(String[] args) {
        // 创建发布者
        Flux<Integer> source = Flux.range(1, 5);

        // 创建处理器,并进行数据处理
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        source.map(value -> value * 2)  // Example: doubling the values
                .subscribe(processor);

        // 创建订阅者
        CustomSubscriber<Integer> subscriber = new CustomSubscriber<>();

        // 订阅并处理数据
        processor.subscribe(subscriber);
    }

    // 自定义订阅者
    static class CustomSubscriber<T> extends BaseSubscriber<T> {
        @Override
        protected void hookOnNext(T value) {
            System.out.println("Processed Value: " + value);
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            System.err.println("Error: " + throwable);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("Done");
        }
    }
}
  • UnicastProcessor.create()已弃用,可以使用Sinks.many().unicast().onBackpressureBuffer()

9. Reactor 调度器

Reactor 提供了多种调度器,用于控制异步操作的执行线程。例如,Schedulers.boundedElastic() 创建了一个弹性线程池,可以根据需要动态调整线程数。

public class ReactorSchedulersExample {
    public static void main(String[] args) {
        Flux.range(1, 5)
                .publishOn(Schedulers.boundedElastic())  // 在弹性线程池上发布
                .map(x -> x * x)
                .subscribeOn(Schedulers.parallel())  // 在并行线程池上订阅
                .subscribe(System.out::println);
    }
}
  • 经测试,大概率只使用了一个线程

11. 组合多个 Mono 或 Flux

使用 zip 操作符可以组合多个 MonoFlux,将它们的元素进行组合。

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorZipExample {
    public static void main(String[] args) {
        Mono<String> mono1 = Mono.just("Hello");
        Mono<String> mono2 = Mono.just("Reactor");

        // 将两个 Mono 合并为一个 Flux
        Flux<String> result = Flux.zip(mono1, mono2)
                .map(tuple -> tuple.getT1() + " " + tuple.getT2());

        result.subscribe(System.out::println); // 输出: Hello Reactor
    }
}

12. 超时操作

使用 timeout 操作符可以在指定的时间内等待数据流产生结果,如果超时,则触发错误。

public class ReactorTimeoutExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Integer> source = Flux.just(1, 2, 3)
                .delayElements(Duration.ofSeconds(2)); // 模拟延迟

        // 在指定时间内等待数据流产生结果,否则触发超时
        source.timeout(Duration.ofSeconds(1))
                .subscribe(
                        data -> System.out.println("Received: " + data),
                        error -> System.err.println("Error: " + error),
                        () -> System.out.println("Done")
                );

        //睡一会,等待任务执行完成
        Thread.sleep(3333);
    }
}

13. 并行操作

使用 parallel 操作符可以将一个数据流并行处理,提高处理速度。

public class ReactorParallelExample {
    public static void main(String[] args) throws InterruptedException {
        Flux.range(1, 10)
                .parallel()
                .runOn(Schedulers.parallel())
                .map(x -> x * x)
                .sequential()
                .subscribe(System.out::println);
        //睡一会,等待任务执行完成
        Thread.sleep(1111);
    }
}

14. 与 Java Stream 集成

Reactor 与 Java Stream 可以方便地进行集成。

import reactor.core.publisher.Flux;
import java.util.stream.Stream;

public class ReactorJavaStreamIntegrationExample {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.fromStream(Stream.of(1, 2, 3, 4, 5));

        flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5
    }
}

15. 使用 Mono 和 Flux 进行条件操作

Reactor 提供了条件操作符,例如 switchIfEmptyfilter,用于根据条件处理数据流。

public class ReactorConditionalOperatorsExample {
    public static void main(String[] args) {
        Flux<Integer> empty = Flux.range(1, 0);
        Flux<Integer> source = Flux.range(1, 5);

        // 如果数据流为空,则切换到另一个数据流
        empty.switchIfEmpty(Flux.range(6, 3))
                .subscribe(System.out::println); // 输出: 6,7,8

        // 使用 filter 过滤元素
        source.filter(x -> x % 2 == 0)
                .subscribe(System.out::println); // 输出: 2, 4
    }
}

16. 使用 Reactor StepVerifier 进行测试

代码需要写在test测试目录下!!!

Reactor 提供了 StepVerifier 类,用于测试异步操作的行为。

public class ReactorTestingExample {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.range(1, 5);

        // 使用 StepVerifier 验证数据流的行为
        StepVerifier.create(flux)
                .expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345
                .expectComplete()
                .verify();
    }
}

17. 使用 Mono 和 Flux 进行重试

Reactor 提供了 retryWhen 方法,结合 Backoff 操作符,用于在发生错误时进行重试。

public class ReactorRetryExample {
    public static void main(String[] args) throws InterruptedException {
        Mono<Object> source = Mono.fromCallable(() -> {
                    throw new RuntimeException("Simulated error");
                })
                //最大重试次数为3次,初始重试间隔为1秒,并且采用指数回退策略,直到达到最大的回退时间(这里是5秒)。
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));

        source.subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.err.println("Error: " + error.getMessage())
        );
        
        //得多睡会儿,让它跑完最大重试时间
        Thread.sleep(999999);
    }
}

19. 使用 Reactor Context 进行上下文传递

Reactor 提供了 Context 类,用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ReactorContextExample {
    public static void main(String[] args) {
        Mono<String> mono = Mono.deferContextual(contextView ->
                Mono.just("Hello, " + contextView.get("user")));

        String result = mono.contextWrite(Context.of("user", "John")).block();
        System.out.println(result); // 输出: Hello, John
    }
}

20. 使用 Reactor 的 doOn 方法进行副作用处理

doOn 系列方法允许在数据流的不同生命周期阶段执行副作用操作,如日志记录、统计等。

import reactor.core.publisher.Flux;

public class ReactorDoOnExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 5);

        source
                .doOnNext(value -> System.out.println("Processing element: " + value))
                .doOnComplete(() -> System.out.println("Processing complete"))
                .subscribe(System.out::println);
    }
}

21. 使用 Reactor 的 transform 方法进行操作链重用

transform 方法允许对操作链进行重用,将一系列操作组合为一个新的 Function

import reactor.core.publisher.Flux;

import java.util.function.Function;

public class ReactorTransformExample {
    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 5);

        // 定义一个操作链
        Function<Flux<Integer>, Flux<Integer>> customTransform = flux ->
                flux.filter(x -> x % 2 == 0)
                        .map(x -> x * 2);

        // 使用 transform 应用自定义操作链
        source.transform(customTransform)
                .subscribe(System.out::println); // 输出: 4, 8
    }
}
文章来源:https://blog.csdn.net/Demo_00/article/details/135511437
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。