响应式常用API

发布时间:2024年01月22日

Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

<dependencyManagement>
	<dependencies>
		<dependency>
   		<groupId>io.projectreactor</groupId>
			<artifactId>reactor-bom</artifactId>
			<version>2023.0.0</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>
		
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
	</dependency>
			
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-test</artifactId>
		<scope>test</scope>
	</dependency>
			
	<dependency>
		<groupId>org.junit.jupiter</groupId>
		<artifactId>junit-jupiter</artifactId>
		<version>5.7.2</version>
		<scope>test</scope>
	</dependency>
</dependencies>
22. 使用 Reactor 的 elapsed 方法进行时间测量

elapsed 方法可以用于测量元素发射之间的时间间隔,返回包含时间间隔和元素的元组。

import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorElapsedExample {
    
    public static void main(String[] args) throws InterruptedException {
        Flux < Integer > source = Flux.just(1, 2, 3, 4, 5).delayElements(Duration.ofSeconds(1));
        source.elapsed().subscribe(tuple - > {
            long elapsedTime = tuple.getT1();int value = tuple.getT2();System.out.println("Elapsed Time: " + elapsedTime + "ms, Value: " + value);
        });
        Thread.sleep(23333);
    }
}
23. 使用 Reactor 的 cache 方法进行结果缓存

cache 方法可以用于缓存结果,避免多次计算相同的数据流。

import reactor.core.publisher.Flux;
public class ReactorCacheExample {

    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 3)
            .log()
            .cache();
        source.subscribe(System.out::println);
        // 输出: 1, 2, 3
        source.subscribe(System.out::println);
        // 输出: 1, 2, 3 直接从缓存中取,日志中显示,未调用request、onNext等方法
    }
}
24. 使用 Reactor 的 reduce 方法进行聚合操作

reduce 方法用于对数据流中的元素进行聚合操作,返回一个包含最终结果的 Mono

import reactor.core.publisher.Flux;
public class ReactorReduceExample {

    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 5);
        // 输出: Sum: 15
        source.reduce(Integer::sum).subscribe(result - > System.out.println("Sum: " + result));
    }
}
25. 使用 Reactor 的 interval 方法进行周期性操作

interval 方法可以用于创建一个周期性的数据流,用于执行定时任务。

import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorIntervalExample {

    public static void main(String[] args) throws InterruptedException {
        Flux.interval(Duration.ofSeconds(1))
            // 限制产生的元素数量
            .take(5)
            .subscribe(System.out::println);
        Thread.sleep(233333);
    }
}
26. 使用 Reactor 的 onErrorContinue 方法进行错误处理

onErrorContinue 方法允许在发生错误时继续处理数据流,并提供一个处理函数,用于处理错误。

import reactor.core.publisher.Flux;
public class ReactorOnErrorContinueExample {

    public static void main(String[] args) {
        Flux < Integer > source = Flux.just(1, 2, 0, 4, 5);
        // 在发生除零错误时继续处理数据流
        source.map(x - > 10 / x)
            .onErrorContinue((error, value) - > {
                //10/0触发的异常会在最后打印
                System.err.println("Error: " + error.getMessage() + ", Value: " + value);
            })
            .subscribe(System.out::println);
    }
}
28. 使用 Reactor 的 materialize 方法进行错误通知

materialize 方法用于将正常元素和错误信息封装为通知对象,使得错误信息也成为数据流的一部分。

import reactor.core.publisher.Flux;
public class ReactorMaterializeExample {

    public static void main(String[] args) {
        Flux < Integer > source = Flux.just(1, 2, 0, 4, 5);
        // 将正常元素和错误信息封装为通知对象
        source.map(x - > 10 / x)
            .materialize()
            .subscribe(System.out::println);
    }
}
29. 使用 Reactor 的 expand 方法进行递归操作

expand 方法用于对数据流进行递归操作,产生新的元素并加入数据流。

import reactor.core.publisher.Flux;
public class ReactorExpandExample {

    public static void main(String[] args) {
        Flux < Integer > source = Flux.just(1, 2, 3);
        // 对数据流进行递归操作,每个元素产生两个新元素
        source.expand(value - > Flux.just(value * 2, value * 3))
            //限制产生的元素数量
            .take(22)
            .subscribe(System.out::println);
        //原始   新元素 ->新元素 ->新元素...//1 2 3   -> 2 3  4 6  6 9 ->4 6  6 9     8 12  12 18    12 18   18 27 -> 8 ...
    }
}
30. 使用 Reactor 的 checkpoint 方法进行调试

checkpoint 方法用于在操作链中设置断点,以便在调试时更容易定位问题。

import reactor.core.publisher.Flux;
public class ReactorCheckpointExample {
    
    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 5);
        // 在操作链中设置断点
        source.checkpoint("Initial Source")
            .map(x - > x * 2)
            .checkpoint("Mapped Source")
            .subscribe(System.out::println);
    }
}
  • 好像没啥用
31. 使用 Reactor 的 groupBy 方法进行分组操作

groupBy 方法用于将数据流中的元素进行分组,返回一个 GroupedFlux

import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
public class ReactorGroupByExample {

    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 10);
        // 将数据流中的元素按奇偶分组
        Flux < GroupedFlux < String, Integer >> groupedFlux = source.groupBy(value - > value % 2 == 0 ? "Even" : "Odd");
        groupedFlux.subscribe(group - > {
            String key = group.key();
            group.subscribe(value - > System.out.println(key + ": " + value));
        });
    }
}
32. 使用 Reactor 的 concatMap 方法进行顺序操作

concatMap 方法用于对数据流中的元素进行顺序操作,并保持元素的相对顺序。

import reactor.core.publisher.Flux;
public class ReactorConcatMapExample {
    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 3);
        // 对每个元素进行异步操作,保持相对顺序
        source.concatMap(value - > Flux.just(value * 2).log()).subscribe(System.out::println);
    }
}
33. 使用 Reactor 的 block 方法获取结果

在某些情况下,可以使用 block 方法来阻塞等待数据流的完成,并获取最终结果。

import reactor.core.publisher.Flux;
public class ReactorBlockExample {

    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 3);
        // 阻塞等待数据流的完成,并获取最终结果
        Integer result = source.reduce((x, y) - > x + y).block();
        System.out.println("Sum: " + result); // 输出: Sum: 6
    }
}
35. 使用 Reactor 的 doFinally 方法进行清理操作

doFinally 方法用于在数据流完成时执行清理操作,无论是正常完成还是发生错误。

import reactor.core.publisher.Flux;
public class ReactorDoFinallyExample {
    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 3);
        source.doFinally(signalType - > System.out.println("Finally: " + signalType)).subscribe(System.out::println);
    }
}
36. 使用 Reactor 的 log 方法进行日志记录

log 方法用于在操作链中添加日志记录,以便更好地了解数据流的处理过程。

import reactor.core.publisher.Flux;
public class ReactorLogExample {
    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 3);
        source.log().subscribe(System.out::println);
    }
}
37. 使用 Reactor 的 create 方法创建自定义 Publisher

create 方法用于创建自定义的 FluxMono,通过编程方式发射元素和控制订阅。

import reactor.core.publisher.Flux;
public class ReactorCreate2Example {
    public static void main(String[] args) {
        Flux < Integer > customFlux = Flux.create(emitter - > {
            for(int i = 1; i <= 5; i++) {
                emitter.next(i);
            }
            emitter.complete();
        });
        customFlux.subscribe(System.out::println);
    }
}
38. 使用 Reactor 的 sample 方法进行采样操作

sample 方法用于在固定的时间间隔内从数据流中采样元素。

import reactor.core.publisher.Flux;
import java.time.Duration;
public class ReactorSampleExample {
    public static void main(String[] args) throws InterruptedException {
        // 模拟延迟;
        Flux < Integer > source = Flux.range(1, 10).delayElements(Duration.ofSeconds(1));
        // 在2秒钟采样一个元素
        source.sample(Duration.ofSeconds(2))
            //数据源1秒一个,采用2秒一次。会漏掉部分数据
            .subscribe(System.out::println);
        // 阻塞主线程,让采样执行完
        Thread.sleep(233333);
    }
}
41. 使用 Reactor 的 limitRate 方法进行限流

limitRate 方法用于限制数据流的速率,防止快速生产者导致的资源耗尽。

import reactor.core.publisher.Flux;
public class ReactorLimitRateExample {
    public static void main(String[] args) {
        Flux < Integer > source = Flux.range(1, 1000).log();
        // 限制数据流的速率为每秒产生100个元素
        source.limitRate(100)
            //一次预取100个元素; 第一次 request(100),以后request(75) (100*75=75)
            .subscribe(data - > {
                // 模拟慢速消费者
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(data);
            }, error - > System.err.println("Error: " + error), () - > System.out.println("Done"));
    }
}
文章来源:https://blog.csdn.net/qq_37165235/article/details/135741547
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。