Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:
pom依赖
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> </dependencies>
then
方法进行后续操作then
方法用于在当前数据流完成后执行后续操作。
import reactor.core.publisher.Flux;
public class ReactorThenExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 3);
// 在当前数据流完成后执行后续操作
source.then(Mono.fromRunnable(() -> System.out.println("Done")))
.subscribe();
}
}
publishOn
方法进行线程切换publishOn
方法用于切换数据流的发布线程,从而改变元素处理的线程。
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ReactorPublishOnExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 3);
// 将数据流的发布线程切换到另一个线程池
source.publishOn(Schedulers.elastic())
.map(value -> value * 2)
.subscribe(System.out::println);
}
}
subscribeOn
方法进行订阅线程切换subscribeOn
方法用于切换数据流的订阅线程,影响整个数据流的执行线程。
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ReactorSubscribeOnExample {
public static void main(String[] args) throws InterruptedException {
Flux<Integer> source = Flux.range(1, 3).log();
// 将数据流的订阅线程切换到另一个线程池 另一个线程:parallel-1
source.subscribeOn(Schedulers.parallel())
.map(value -> value * 2)
.subscribe(System.out::println);
Thread.sleep(23333);
}
}
delayElements
方法进行元素延迟delayElements
方法用于延迟数据流中元素的发送。
import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorDelayElementsExample {
public static void main(String[] args) throws InterruptedException {
Flux<Integer> source = Flux.range(1, 3);
// 延迟每个元素的发送
source.delayElements(Duration.ofSeconds(1))
.subscribe(System.out::println);
Thread.sleep(23333);
}
}
concatWith
方法进行数据流连接concatWith
方法用于将两个数据流连接在一起,保持顺序。
import reactor.core.publisher.Flux;
public class ReactorConcatWithExample {
public static void main(String[] args) {
Flux<Integer> source1 = Flux.just(1, 2, 3);
Flux<Integer> source2 = Flux.just(4, 5, 6);
// 将两个数据流连接在一起,保持顺序
source1.concatWith(source2)
.subscribe(System.out::println);
}
}
merge
方法进行多数据流合并merge
方法用于将多个数据流合并成一个数据流,并发执行。
import reactor.core.publisher.Flux;
public class ReactorMergeExample {
public static void main(String[] args) {
Flux<Integer> source1 = Flux.just(1, 2, 3);
Flux<Integer> source2 = Flux.just(4, 5, 6);
// 将两个数据流合并成一个数据流
Flux<Integer> mergedFlux = Flux.merge(source1, source2);
mergedFlux.subscribe(System.out::println);
}
}
和
merge的比较concatWith
: 这个方法会按照合并的顺序执行 Flux
。它会等待第一个 Flux
完成(包括完成信号或错误信号),然后再开始下一个 Flux
。merge
: 这个方法会并发执行所有的 Flux
,它不会等待前一个 Flux
完成。因此,元素的顺序可能是交错的。concatWith
: 它接受一个单独的 Flux
作为参数,将这个 Flux
追加到当前 Flux
的末尾。merge
: 它接受可变参数,可以传入多个 Flux
,并同时合并它们。public class FluxConcatWithMergeExample {
public static void main(String[] args) throws InterruptedException {
Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
Flux<Integer> flux3 = Flux.just(7, 8, 9).delayElements(Duration.ofMillis(75));
// 使用 concatWith 方法,按顺序执行
flux1.concatWith(flux2)
.concatWith(flux3)
.subscribe(v ->{
System.out.println("concatWith = " + v);
});
// 使用 merge 方法,并发执行
Flux.merge(flux1, flux2, flux3)
.subscribe(v ->{
System.out.println("merge = " + v);
});
Thread.sleep(22333);
}
}
mergeSequential
方法进行多数据流合并mergeSequential
方法用于按顺序合并多个数据流,保持各个数据流的元素顺序。
import reactor.core.publisher.Flux;
public class ReactorMergeSequentialExample {
public static void main(String[] args) {
Flux<Integer> source1 = Flux.just(1, 2, 3);
Flux<Integer> source2 = Flux.just(4, 5, 6);
// 按顺序合并两个数据流
Flux<Integer> mergedFlux = Flux.mergeSequential(source1, source2);
mergedFlux.subscribe(System.out::println);
}
}
combineLatest
方法进行多数据流合并combineLatest
方法用于合并多个数据流的最新元素。
import reactor.core.publisher.Flux;
public class ReactorCombineLatestExample {
public static void main(String[] args) {
Flux<Integer> source1 = Flux.just(1, 2, 3);
Flux<Integer> source2 = Flux.just(4, 5, 6);
// 合并两个数据流的最新元素
Flux<Integer> combinedFlux = Flux.combineLatest(source1, source2, (a, b) -> a + b);
combinedFlux.subscribe(System.out::println);
}
}
doOnNext
方法进行每个元素的附加操作doOnNext
方法用于在每个元素发出时执行附加操作,例如日志记录、统计等。
import reactor.core.publisher.Flux;
public class ReactorDoOnNextExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 3);
// 在每个元素发出时执行附加操作
source.doOnNext(value -> System.out.println("Processing: " + value))
.subscribe(System.out::println);
}
}
fromCallable
方法创建带有返回值的 MonofromCallable
方法用于创建一个 Mono
,其值由提供的 Callable
对象返回。
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
public class ReactorFromCallableExample {
public static void main(String[] args) {
// 创建带有返回值的 Mono
Mono<String> resultMono = Mono.fromCallable(() -> {
// 执行一些计算
return "Result";
});
resultMono.subscribe(System.out::println);
}
}
using
方法进行资源管理using
方法用于在数据流的生命周期内管理资源,例如打开和关闭文件、网络连接等。
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorUsingExample {
public static void main(String[] args) {
// 使用 using 方法管理资源
Flux<String> resultFlux = Flux.using(
() -> getResource(), // 打开资源
resource -> getData(resource), // 使用资源获取数据流
resource -> releaseResource(resource) // 关闭资源
);
resultFlux.subscribe(System.out::println);
}
private static Mono<String> getResource() {
System.out.println("Opening resource");
return Mono.just("Resource");
}
private static Flux<String> getData(Mono resource) {
System.out.println("Getting data from resource: " + resource);
return Flux.just("Data1", "Data2", "Data3");
}
private static Mono<Void> releaseResource(Mono resource) {
System.out.println("Releasing resource: " + resource);
return Mono.empty();
}
}
scan
方法进行累积操作scan
方法用于对数据流中的元素进行累积操作,并生成一个新的数据流。
import reactor.core.publisher.Flux;
public class ReactorScanExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
// 对数据流中的元素进行累积操作
source.scan(0, (acc, value) -> acc + value)
.subscribe(System.out::println);
}
}
takeWhile
方法进行条件性的元素获取takeWhile
方法用于根据指定的条件获取数据流中的元素,直到条件不满足。
import reactor.core.publisher.Flux;
public class ReactorTakeWhileExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
// 根据条件获取元素,直到条件不满足
source.takeWhile(value -> value < 4)
.subscribe(System.out::println);
}
}
thenMany
方法进行串联操作thenMany
方法用于在当前数据流完成后执行另一个数据流,将它们串联起来。
import reactor.core.publisher.Flux;
public class ReactorThenManyExample {
public static void main(String[] args) {
Flux<Integer> source1 = Flux.just(1, 2, 3);
Flux<Integer> source2 = Flux.just(4, 5, 6);
// 在当前数据流完成后执行另一个数据流
source1.thenMany(source2)
.subscribe(System.out::println);
}
}
ignoreElements
方法忽略所有元素ignoreElements
方法用于忽略数据流中的所有元素,只关注完成信号或错误信号。
import reactor.core.publisher.Flux;
public class ReactorIgnoreElementsExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 3);
// 忽略所有元素,只关注完成信号
source.ignoreElements()
.doOnTerminate(() -> System.out.println("Completed"))
.subscribe();
}
}
在 Reactor 中,Sink
是一个用于手动推送元素(signals)到 Subscriber
的接口。它允许你在创建 Flux 或 Mono 的过程中手动控制元素的生成。Reactor 提供了两种 Sink
:FluxSink
用于创建 Flux,MonoSink
用于创建 Mono。
import reactor.core.publisher.Flux;
public class FluxSinkExample {
public static void main(String[] args) {
Flux.create(fluxSink -> {
for (int i = 0; i < 5; i++) {
fluxSink.next(i); // 发送元素
}
fluxSink.complete(); // 发送完成信号
})
.subscribe(System.out::println);
}
}
import reactor.core.publisher.Flux;
public class FluxSinkErrorExample {
public static void main(String[] args) {
Flux.create(fluxSink -> {
for (int i = 0; i < 5; i++) {
fluxSink.next(i); // 发送元素
}
fluxSink.error(new RuntimeException("Simulated error")); // 发送错误信号
})
.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
}
}
import reactor.core.publisher.Mono;
public class MonoSinkExample {
public static void main(String[] args) {
Mono.create(monoSink -> {
monoSink.success("Hello, Mono!"); // 发送元素
})
.subscribe(System.out::println);
}
}
import reactor.core.publisher.Mono;
public class MonoSinkErrorExample {
public static void main(String[] args) {
Mono.create(monoSink -> {
monoSink.error(new RuntimeException("Simulated error")); // 发送错误信号
})
.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
}
}
FluxSink
进行背压控制在 Reactor 中,FluxSink
也提供了一些方法用于实现背压控制,以避免在高速生产者和低速消费者之间的元素溢出。
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
public class FluxSinkBackpressureExample {
public static void main(String[] args) {
Flux.create(fluxSink -> {
for (int i = 0; i < 1000; i++) {
fluxSink.next(i);
}
}, FluxSink.OverflowStrategy.BUFFER) // 指定背压策略
.onBackpressureBuffer(10, buffer -> System.err.println("Buffer overflow! Discarding: " + buffer))
.subscribe(value -> {
// 模拟慢速消费者
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(value);
});
}
}
在上述例子中,通过指定 FluxSink.OverflowStrategy.BUFFER
背压策略,当消费者无法跟上生产者的速度时,缓冲区将被用来存储元素。使用 onBackpressureBuffer
方法可以在溢出时执行自定义的操作。
FluxSink
进行手动请求FluxSink
也提供了 request
方法,允许消费者手动请求元素。
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
public class FluxSinkManualRequestExample {
public static void main(String[] args) {
Flux.create(fluxSink -> {
for (int i = 0; i < 100; i++) {
fluxSink.next(i);
if (i % 10 == 0 && fluxSink.requestedFromDownstream() == 0) {
// 当请求的元素达到 0 时,等待下游再次请求
while (fluxSink.requestedFromDownstream() == 0) {
// 等待下游请求
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
fluxSink.complete();
})
.subscribe(System.out::println);
}
}
在这个例子中,当消费者请求的元素达到 0 时,生产者会等待下游再次请求。这种手动控制请求的方式可以更灵活地处理背压。
Hooks
进行全局错误处理Hooks
是 Reactor 提供的一组钩子,可以用于全局错误处理,捕获整个流的错误。
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
public class ReactorHooksErrorHandlingExample {
public static void main(String[] args) {
// 使用 Hooks 进行全局错误处理
Hooks.onOperatorError((error, reference) -> {
System.err.println("Global Error Handling: " + error.getMessage());
return error;
});
Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);
// 流中的错误将被全局处理
source.map(x -> 10 / x)
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.err.println("Subscriber Error: " + error.getMessage())
);
}
}
在这个例子中,我们使用 Hooks.onOperatorError
来设置全局错误处理,当流中发生错误时,会调用全局错误处理的回调方法。这可以用于捕获整个流的错误,而不是每个 subscribe
中单独处理。
ConnectableFlux
进行热序列ConnectableFlux
是 Reactor 提供的一种特殊类型的 Flux,它允许在订阅之前预热(开始生成元素),并在多个订阅者之间共享相同的序列。
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorConnectableFluxExample {
public static void main(String[] args) {
ConnectableFlux<Integer> connectableFlux = Flux.range(1, 3)
.delayElements(Duration.ofSeconds(1))
.publish(); // 将普通的 Flux 转换为 ConnectableFlux
connectableFlux.connect(); // 开始生成元素
// 第一个订阅者
connectableFlux.subscribe(data -> System.out.println("Subscriber 1: " + data));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 第二个订阅者,共享相同的序列
connectableFlux.subscribe(data -> System.out.println("Subscriber 2: " + data));
// 结果:
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 2: 2
// Subscriber 1: 3
// Subscriber 2: 3
}
}
在这个例子中,我们使用 publish
方法将普通的 Flux 转换为 ConnectableFlux
,通过 connect
方法开始生成元素。第一个订阅者在元素生成过程中订阅,然后等待了 2 秒后,第二个订阅者也开始订阅,两者共享相同的序列。这种方式可以用于创建热序列,使得订阅者能够共享相同的元素序列。
Flux.defer
实现延迟订阅Flux.defer
允许你在每次订阅时创建一个新的 Flux,从而实现延迟订阅。这对于需要在每次订阅时执行一些逻辑的场景非常有用。
import reactor.core.publisher.Flux;
public class ReactorFluxDeferExample {
public static void main(String[] args) {
Flux<Integer> deferredFlux = Flux.defer(() -> {
// 在每次订阅时创建新的 Flux
System.out.println("Creating new Flux");
return Flux.just(1, 2, 3);
});
// 第一个订阅
deferredFlux.subscribe(data -> System.out.println("Subscriber 1: " + data));
// 第二个订阅
deferredFlux.subscribe(data -> System.out.println("Subscriber 2: " + data));
// 结果:
// Creating new Flux
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 1: 3
// Creating new Flux
// Subscriber 2: 1
// Subscriber 2: 2
// Subscriber 2: 3
}
}
在这个例子中,Flux.defer
中的 lambda 表达式将在每次订阅时执行,因此每个订阅都会创建一个新的 Flux。这对于那些需要在每次订阅时重新生成数据的情况非常有用。
Flux.handle
处理元素和错误Flux.handle
方法用于处理元素和错误,通过提供一个 BiConsumer
处理每个元素,并通过提供一个 BiConsumer
处理错误。
import reactor.core.publisher.Flux;
public class ReactorFluxHandleExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);
// 处理元素和错误
Flux<Integer> handledFlux = source.handle((value, sink) -> {
if (value != 0) {
sink.next(value); // 处理元素
} else {
sink.error(new RuntimeException("Cannot divide by zero")); // 处理错误
}
});
handledFlux.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
}
}
在这个例子中,我们使用 Flux.handle
处理每个元素,如果元素不为零,则将其发送到下游;如果元素为零,则通过 sink.error
处理错误。这可以用于处理元素和错误的场景。
Mono.handle
处理元素和错误Mono.handle
方法与 Flux.handle
类似,用于处理单个元素和错误。
import reactor.core.publisher.Mono;
public class ReactorMonoHandleExample {
public static void main(String[] args) {
Mono<Integer> source = Mono.just(10);
// 处理元素和错误
Mono<Integer> handledMono = source.handle((value, sink) -> {
if (value > 0) {
sink.next(value); // 处理元素
} else {
sink.error(new RuntimeException("Invalid value")); // 处理错误
}
});
handledMono.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage())
);
}
}
在这个例子中,我们使用 Mono.handle
处理单个元素,如果元素为正数,则发送到下游;如果元素不为正数,则通过 sink.error
处理错误。这可以用于处理单个元素和错误的场景。
太太太多了,到此为止吧~~~~
学习打卡day08:响应式编程Reactor API大全(下)