Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:
pom依赖
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> </dependencies>
Mono
: 用于表示包含零个或一个元素的异步序列。Flux
: 用于表示包含零个或多个元素的异步序列。import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
public class ReactorCreateExample {
public static void main(String[] args) {
// 创建包含单个元素的 Mono
Mono<String> mono = Mono.just("Hello, Reactor!");
// 创建包含多个元素的 Flux
Flux<Integer> flux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5});
mono.subscribe(System.out::println); // 输出: Hello, Reactor!
flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5
}
}
使用转换操作符对数据流进行转换或处理。
import reactor.core.publisher.Flux;
public class ReactorTransformExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 5);
// 对每个元素进行平方操作
Flux<Integer> squared = source.map(x -> x * x);
squared.subscribe(System.out::println); // 输出: 1, 4, 9, 16, 25
}
}
使用过滤操作符筛选数据流中的元素。
import reactor.core.publisher.Flux;
public class ReactorFilterExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 5);
// 筛选偶数
Flux<Integer> evenNumbers = source.filter(x -> x % 2 == 0);
evenNumbers.subscribe(System.out::println); // 输出: 2, 4
}
}
使用组合操作符组合多个数据流。
import reactor.core.publisher.Flux;
public class ReactorCombineExample {
public static void main(String[] args) {
Flux<Integer> source1 = Flux.range(1, 3);
Flux<Integer> source2 = Flux.range(4, 3);
// 合并两个数据流
Flux<Integer> merged = Flux.concat(source1, source2);
merged.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5, 6
}
}
这些只是 Reactor API 的一小部分示例。Reactor 提供了丰富的操作符和方法,用于处理复杂的异步数据流。开发人员可以根据具体需求选择适当的操作符进行组合,以构建出符合业务逻辑的异步处理链。
Reactor 提供了多种处理错误的方式,例如使用 onErrorResume
, onErrorReturn
, doOnError
等方法。
import reactor.core.publisher.Flux;
public class ReactorErrorHandlingExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);
// 处理除零异常并提供默认值
Flux<Integer> result = source.map(x -> 10 / x)
.onErrorResume(ex -> Flux.just(-1));
result.subscribe(System.out::println); // 输出: 10, 5, -1
}
}
Reactor 提供了背压处理的支持,允许生产者和消费者之间实现合理的数据流控制。使用 onBackpressureBuffer
或者其他背压操作符可以处理高速生产者和慢速消费者之间的数据流。
import reactor.core.publisher.Flux;
public class ReactorBackpressureExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 1000);
// 设置缓冲区大小
Flux<Integer> buffered = source.onBackpressureBuffer(10);
buffered.subscribe(
data -> {
// 模拟慢速消费者
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(data);
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Done")
);
}
}
Reactor 还提供了 WebFlux 模块,用于处理响应式的 Web 请求。以下是一个简单的示例:
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class WebFluxController {
@GetMapping("/hello")
public Mono<ResponseEntity<String>> hello() {
return Mono.just(ResponseEntity.ok("Hello, Reactor WebFlux!"));
}
}
Reactor 中有一些核心概念,了解这些概念有助于更好地使用 Reactor API。
Publisher(发布者): 代表一个生产数据的源头,通常是 Mono
或 Flux
。
Subscriber(订阅者): 用于消费数据流的组件。通过 subscribe
方法订阅 Publisher
。
Subscription(订阅): 代表 Subscriber
和 Publisher
之间的连接。Subscriber
可以使用 Subscription
来请求数据,取消订阅等。
Processor(处理器): 既是 Publisher
又是 Subscriber
,用于在两者之间进行转换和处理。
public class ReactorCoreConceptsExample {
public static void main(String[] args) {
// 创建发布者
Flux<Integer> source = Flux.range(1, 5);
// 创建处理器,并进行数据处理
UnicastProcessor<Integer> processor = UnicastProcessor.create();
source.map(value -> value * 2) // Example: doubling the values
.subscribe(processor);
// 创建订阅者
CustomSubscriber<Integer> subscriber = new CustomSubscriber<>();
// 订阅并处理数据
processor.subscribe(subscriber);
}
// 自定义订阅者
static class CustomSubscriber<T> extends BaseSubscriber<T> {
@Override
protected void hookOnNext(T value) {
System.out.println("Processed Value: " + value);
}
@Override
protected void hookOnError(Throwable throwable) {
System.err.println("Error: " + throwable);
}
@Override
protected void hookOnComplete() {
System.out.println("Done");
}
}
}
UnicastProcessor.create()
已弃用,可以使用Sinks.many().unicast().onBackpressureBuffer()
Reactor 提供了多种调度器,用于控制异步操作的执行线程。例如,Schedulers.boundedElastic()
创建了一个弹性线程池,可以根据需要动态调整线程数。
public class ReactorSchedulersExample {
public static void main(String[] args) {
Flux.range(1, 5)
.publishOn(Schedulers.boundedElastic()) // 在弹性线程池上发布
.map(x -> x * x)
.subscribeOn(Schedulers.parallel()) // 在并行线程池上订阅
.subscribe(System.out::println);
}
}
使用 zip
操作符可以组合多个 Mono
或 Flux
,将它们的元素进行组合。
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorZipExample {
public static void main(String[] args) {
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("Reactor");
// 将两个 Mono 合并为一个 Flux
Flux<String> result = Flux.zip(mono1, mono2)
.map(tuple -> tuple.getT1() + " " + tuple.getT2());
result.subscribe(System.out::println); // 输出: Hello Reactor
}
}
使用 timeout
操作符可以在指定的时间内等待数据流产生结果,如果超时,则触发错误。
public class ReactorTimeoutExample {
public static void main(String[] args) throws InterruptedException {
Flux<Integer> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(2)); // 模拟延迟
// 在指定时间内等待数据流产生结果,否则触发超时
source.timeout(Duration.ofSeconds(1))
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Done")
);
//睡一会,等待任务执行完成
Thread.sleep(3333);
}
}
使用 parallel
操作符可以将一个数据流并行处理,提高处理速度。
public class ReactorParallelExample {
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.parallel()
.runOn(Schedulers.parallel())
.map(x -> x * x)
.sequential()
.subscribe(System.out::println);
//睡一会,等待任务执行完成
Thread.sleep(1111);
}
}
Reactor 与 Java Stream 可以方便地进行集成。
import reactor.core.publisher.Flux;
import java.util.stream.Stream;
public class ReactorJavaStreamIntegrationExample {
public static void main(String[] args) {
Flux<Integer> flux = Flux.fromStream(Stream.of(1, 2, 3, 4, 5));
flux.subscribe(System.out::println); // 输出: 1, 2, 3, 4, 5
}
}
Reactor 提供了条件操作符,例如 switchIfEmpty
和 filter
,用于根据条件处理数据流。
public class ReactorConditionalOperatorsExample {
public static void main(String[] args) {
Flux<Integer> empty = Flux.range(1, 0);
Flux<Integer> source = Flux.range(1, 5);
// 如果数据流为空,则切换到另一个数据流
empty.switchIfEmpty(Flux.range(6, 3))
.subscribe(System.out::println); // 输出: 6,7,8
// 使用 filter 过滤元素
source.filter(x -> x % 2 == 0)
.subscribe(System.out::println); // 输出: 2, 4
}
}
代码需要写在test测试目录下!!!
Reactor 提供了 StepVerifier
类,用于测试异步操作的行为。
public class ReactorTestingExample {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 5);
// 使用 StepVerifier 验证数据流的行为
StepVerifier.create(flux)
.expectNext(1, 1, 3, 4, 5)//正确顺序应该是12345
.expectComplete()
.verify();
}
}
Reactor 提供了 retryWhen
方法,结合 Backoff
操作符,用于在发生错误时进行重试。
public class ReactorRetryExample {
public static void main(String[] args) throws InterruptedException {
Mono<Object> source = Mono.fromCallable(() -> {
throw new RuntimeException("Simulated error");
})
//最大重试次数为3次,初始重试间隔为1秒,并且采用指数回退策略,直到达到最大的回退时间(这里是5秒)。
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)));
source.subscribe(
data -> System.out.println("Received: " + data),
error -> System.err.println("Error: " + error.getMessage())
);
//得多睡会儿,让它跑完最大重试时间
Thread.sleep(999999);
}
}
Reactor 提供了 Context
类,用于在操作链中传递上下文信息。这对于在异步操作中共享信息非常有用。
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
public class ReactorContextExample {
public static void main(String[] args) {
Mono<String> mono = Mono.deferContextual(contextView ->
Mono.just("Hello, " + contextView.get("user")));
String result = mono.contextWrite(Context.of("user", "John")).block();
System.out.println(result); // 输出: Hello, John
}
}
doOn
方法进行副作用处理doOn
系列方法允许在数据流的不同生命周期阶段执行副作用操作,如日志记录、统计等。
import reactor.core.publisher.Flux;
public class ReactorDoOnExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 5);
source
.doOnNext(value -> System.out.println("Processing element: " + value))
.doOnComplete(() -> System.out.println("Processing complete"))
.subscribe(System.out::println);
}
}
transform
方法进行操作链重用transform
方法允许对操作链进行重用,将一系列操作组合为一个新的 Function
。
import reactor.core.publisher.Flux;
import java.util.function.Function;
public class ReactorTransformExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 5);
// 定义一个操作链
Function<Flux<Integer>, Flux<Integer>> customTransform = flux ->
flux.filter(x -> x % 2 == 0)
.map(x -> x * 2);
// 使用 transform 应用自定义操作链
source.transform(customTransform)
.subscribe(System.out::println); // 输出: 4, 8
}
}