Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:
pom依赖
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> </dependencies>
elapsed
方法进行时间测量elapsed
方法可以用于测量元素发射之间的时间间隔,返回包含时间间隔和元素的元组。
import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorElapsedExample {
public static void main(String[] args) throws InterruptedException {
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofSeconds(1));
source.elapsed()
.subscribe(tuple -> {
long elapsedTime = tuple.getT1();
int value = tuple.getT2();
System.out.println("Elapsed Time: " + elapsedTime + "ms, Value: " + value);
});
Thread.sleep(23333);
}
}
cache
方法进行结果缓存cache
方法可以用于缓存结果,避免多次计算相同的数据流。
import reactor.core.publisher.Flux;
public class ReactorCacheExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 3)
.log() //日志
.cache();
source.subscribe(System.out::println); // 输出: 1, 2, 3
source.subscribe(System.out::println); // 输出: 1, 2, 3 直接从缓存中取,日志中显示,未调用request、onNext等方法
}
}
reduce
方法进行聚合操作reduce
方法用于对数据流中的元素进行聚合操作,返回一个包含最终结果的 Mono
。
import reactor.core.publisher.Flux;
public class ReactorReduceExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 5);
source.reduce(Integer::sum)
.subscribe(result -> System.out.println("Sum: " + result)); // 输出: Sum: 15
}
}
interval
方法进行周期性操作interval
方法可以用于创建一个周期性的数据流,用于执行定时任务。
import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorIntervalExample {
public static void main(String[] args) throws InterruptedException {
Flux.interval(Duration.ofSeconds(1))
.take(5) // 限制产生的元素数量
.subscribe(System.out::println);
Thread.sleep(233333);
}
}
onErrorContinue
方法进行错误处理onErrorContinue
方法允许在发生错误时继续处理数据流,并提供一个处理函数,用于处理错误。
import reactor.core.publisher.Flux;
public class ReactorOnErrorContinueExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);
// 在发生除零错误时继续处理数据流
source.map(x -> 10 / x)
.onErrorContinue((error, value) -> {
//10/0触发的异常会在最后打印
System.err.println("Error: " + error.getMessage() + ", Value: " + value);
})
.subscribe(System.out::println);
}
}
materialize
方法进行错误通知materialize
方法用于将正常元素和错误信息封装为通知对象,使得错误信息也成为数据流的一部分。
import reactor.core.publisher.Flux;
public class ReactorMaterializeExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 0, 4, 5);
// 将正常元素和错误信息封装为通知对象
source.map(x -> 10 / x)
.materialize()
.subscribe(System.out::println);
}
}
expand
方法进行递归操作expand
方法用于对数据流进行递归操作,产生新的元素并加入数据流。
import reactor.core.publisher.Flux;
public class ReactorExpandExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.just(1, 2, 3);
// 对数据流进行递归操作,每个元素产生两个新元素
source.expand(value -> Flux.just(value * 2, value * 3))
.take(22) // 限制产生的元素数量
.subscribe(System.out::println);
//原始 新元素 ->新元素 ->新元素...
//1 2 3 -> 2 3 4 6 6 9 ->4 6 6 9 8 12 12 18 12 18 18 27 -> 8 ...
}
}
checkpoint
方法进行调试checkpoint
方法用于在操作链中设置断点,以便在调试时更容易定位问题。
import reactor.core.publisher.Flux;
public class ReactorCheckpointExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 5);
// 在操作链中设置断点
source.checkpoint("Initial Source")
.map(x -> x * 2)
.checkpoint("Mapped Source")
.subscribe(System.out::println);
}
}
groupBy
方法进行分组操作groupBy
方法用于将数据流中的元素进行分组,返回一个 GroupedFlux
。
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
public class ReactorGroupByExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 10);
// 将数据流中的元素按奇偶分组
Flux<GroupedFlux<String, Integer>> groupedFlux = source.groupBy(value -> value % 2 == 0 ? "Even" : "Odd");
groupedFlux.subscribe(group -> {
String key = group.key();
group.subscribe(value -> System.out.println(key + ": " + value));
});
}
}
concatMap
方法进行顺序操作concatMap
方法用于对数据流中的元素进行顺序操作,并保持元素的相对顺序。
import reactor.core.publisher.Flux;
public class ReactorConcatMapExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 3);
// 对每个元素进行异步操作,保持相对顺序
source.concatMap(value -> Flux.just(value * 2).log())
.subscribe(System.out::println);
}
}
block
方法获取结果在某些情况下,可以使用 block
方法来阻塞等待数据流的完成,并获取最终结果。
import reactor.core.publisher.Flux;
public class ReactorBlockExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 3);
// 阻塞等待数据流的完成,并获取最终结果
Integer result = source.reduce((x, y) -> x + y).block();
System.out.println("Sum: " + result); // 输出: Sum: 6
}
}
doFinally
方法进行清理操作doFinally
方法用于在数据流完成时执行清理操作,无论是正常完成还是发生错误。
import reactor.core.publisher.Flux;
public class ReactorDoFinallyExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 3);
source
.doFinally(signalType -> System.out.println("Finally: " + signalType))
.subscribe(System.out::println);
}
}
log
方法进行日志记录log
方法用于在操作链中添加日志记录,以便更好地了解数据流的处理过程。
import reactor.core.publisher.Flux;
public class ReactorLogExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 3);
source.log()
.subscribe(System.out::println);
}
}
create
方法创建自定义 Publishercreate
方法用于创建自定义的 Flux
或 Mono
,通过编程方式发射元素和控制订阅。
import reactor.core.publisher.Flux;
public class ReactorCreate2Example {
public static void main(String[] args) {
Flux<Integer> customFlux = Flux.create(emitter -> {
for (int i = 1; i <= 5; i++) {
emitter.next(i);
}
emitter.complete();
});
customFlux.subscribe(System.out::println);
}
}
sample
方法进行采样操作sample
方法用于在固定的时间间隔内从数据流中采样元素。
import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorSampleExample {
public static void main(String[] args) throws InterruptedException {
Flux<Integer> source = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)); // 模拟延迟;
// 在2秒钟采样一个元素
source.sample(Duration.ofSeconds(2)) //数据源1秒一个,采用2秒一次。会漏掉部分数据
.subscribe(System.out::println);
// 阻塞主线程,让采样执行完
Thread.sleep(233333);
}
}
limitRate
方法进行限流limitRate
方法用于限制数据流的速率,防止快速生产者导致的资源耗尽。
import reactor.core.publisher.Flux;
public class ReactorLimitRateExample {
public static void main(String[] args) {
Flux<Integer> source = Flux.range(1, 1000).log();
// 限制数据流的速率为每秒产生100个元素
source.limitRate(100) //一次预取100个元素; 第一次 request(100),以后request(75) (100*75=75)
.subscribe(
data -> {
// 模拟慢速消费者
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(data);
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Done")
);
}
}
学习打卡day08:响应式编程Reactor API大全(中)