Java学习笔记-day06-响应式编程Reactor与Callback、CompletableFuture三种形式异步编码对比

发布时间:2024年01月10日

1. Reactor是什么

  • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono表示一个包含零或一个元素的异步序列)。
  • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。

2. Reactor、Callback、CompletableFuture三种形式异步编码对比

  • 编码简洁程度Reactor最优
  • Reactor线程利用率最高(因实现了Reactive Streams规范,拥有背压+事件驱动特性,此处暂不展开)

代码如下:

pom依赖

<dependencyManagement>
   <dependencies>
       <dependency>
           <groupId>io.projectreactor</groupId>
           <artifactId>reactor-bom</artifactId>
           <version>2023.0.0</version>
           <type>pom</type>
           <scope>import</scope>
       </dependency>
   </dependencies>
</dependencyManagement>

<dependencies>
   <dependency>
       <groupId>io.projectreactor</groupId>
       <artifactId>reactor-core</artifactId>
   </dependency>
</dependencies>

Callback回调地狱

interface FirstCallback {
    void onCompleteFirst(String result);

    void onErrorFirst(Exception e);
}

interface SecondCallback {
    void onCompleteSecond(String result);

    void onErrorSecond(Exception e);
}

interface ThirdCallback {
    void onCompleteThird(String result);

    void onErrorThird(Exception e);
}

class AsyncOperations {
    static void firstOperation(FirstCallback firstCallback) {
        new Thread(() -> {
            try {
                // 模拟异步操作
                Thread.sleep(2000);
                // 操作完成后调用回调函数
                firstCallback.onCompleteFirst("First operation completed");
            } catch (Exception e) {
                // 发生异常时调用错误回调
                firstCallback.onErrorFirst(e);
            }
        }).start();
    }

    static void secondOperation(String input, SecondCallback secondCallback) {
        new Thread(() -> {
            try {
                // 模拟异步操作
                Thread.sleep(2000);
                // 操作完成后调用回调函数
                secondCallback.onCompleteSecond("Second operation completed with input: " + input);
            } catch (Exception e) {
                // 发生异常时调用错误回调
                secondCallback.onErrorSecond(e);
            }
        }).start();
    }

    static void thirdOperation(String input, ThirdCallback thirdCallback) {
        new Thread(() -> {
            try {
                // 模拟异步操作
                Thread.sleep(2000);
                // 操作完成后调用回调函数
                thirdCallback.onCompleteThird("Third operation completed with input: " + input);
            } catch (Exception e) {
                // 发生异常时调用错误回调
                thirdCallback.onErrorThird(e);
            }
        }).start();
    }
}

public class CallbackHellExample {
    public static void main(String[] args) {
        AsyncOperations.firstOperation(new FirstCallback() {
            @Override
            public void onCompleteFirst(String result) {
                System.out.println("First Callback: " + result);

                // 第一次操作完成后调用第二次操作
                AsyncOperations.secondOperation(result, new SecondCallback() {
                    @Override
                    public void onCompleteSecond(String result) {
                        System.out.println("Second Callback: " + result);

                        // 第二次操作完成后调用第三次操作
                        AsyncOperations.thirdOperation(result, new ThirdCallback() {
                            @Override
                            public void onCompleteThird(String result) {
                                System.out.println("Third Callback: " + result);
                            }

                            @Override
                            public void onErrorThird(Exception e) {
                                System.out.println("Error in Third Callback: " + e.getMessage());
                            }
                        });
                    }

                    @Override
                    public void onErrorSecond(Exception e) {
                        System.out.println("Error in Second Callback: " + e.getMessage());
                    }
                });
            }

            @Override
            public void onErrorFirst(Exception e) {
                System.out.println("Error in First Callback: " + e.getMessage());
            }
        });

        // 主线程继续执行其他操作
        System.out.println("Main thread continues...");
    }
}

CompletableFuture优化Callback回调地狱

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<String> firstOperation = CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟异步操作
                Thread.sleep(2000);
                return "First operation completed";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        CompletableFuture<String> secondOperation = firstOperation.thenApplyAsync(result -> {
            System.out.println("First CompletableFuture: " + result);
            try {
                // 模拟异步操作
                Thread.sleep(2000);
                return "Second operation completed with input: " + result;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        CompletableFuture<String> thirdOperation = secondOperation.thenApplyAsync(result -> {
            System.out.println("Second CompletableFuture: " + result);
            try {
                // 模拟异步操作
                Thread.sleep(2000);
                return "Third operation completed with input: " + result;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        thirdOperation.whenComplete((result, throwable) -> {
            if (throwable == null) {
                System.out.println("Third CompletableFuture: " + result);
            } else {
                System.out.println("Error in CompletableFuture: " + throwable.getMessage());
            }
        });

        // 主线程继续执行其他操作
        System.out.println("Main thread continues...");

        // 等待所有操作完成
        CompletableFuture.allOf(firstOperation, secondOperation, thirdOperation).join();
    }
}

Reactor优化Callback回调地狱

public class ReactorOptimizedExample {
    public static void main(String[] args) {
        Mono.fromCallable(() -> {
                    // 模拟异步操作
                    Thread.sleep(2000);
                    return "First operation completed";
                })
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(result -> {
                    System.out.println("First Reactor: " + result);
                    return Mono.fromCallable(() -> {
                        // 模拟异步操作
                        Thread.sleep(2000);
                        return "Second operation completed with input: " + result;
                    }).subscribeOn(Schedulers.boundedElastic());
                })
                .flatMap(result -> {
                    System.out.println("Second Reactor: " + result);
                    return Mono.fromCallable(() -> {
                        // 模拟异步操作
                        Thread.sleep(2000);
                        return "Third operation completed with input: " + result;
                    }).subscribeOn(Schedulers.boundedElastic());
                })
                .doOnSuccess(result -> System.out.println("Third Reactor: " + result))
                .doOnError(error -> System.out.println("Error in Reactor: " + error.getMessage()))
                .block(); // 阻塞等待操作完成

        // 主线程继续执行其他操作
        System.out.println("Main thread continues...");
    }
}
文章来源:https://blog.csdn.net/Demo_00/article/details/135507437
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。