java多线程在项目中的实际应用

发布时间:2024年01月18日

实际项目中,多线程都是结合线程池使用的。
多线程实际应用有两种情况,一种是异步任务执行,不需要返回值,另一种是异步任务的查询,需要返回值。

1、批量执行异步任务

public static void main(String[] args) throws ExecutionException, InterruptedException {
        Integer count = 10;
        // 批量创建时采用多线程
        ExecutorService executorService = new ThreadPoolExecutor(count, count,
                10L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                new ThreadFactoryBuilder()
                        .setNameFormat("excute-pool-%d")
                        .build(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch countDownLatch = new CountDownLatch(count);

        for (int i = 1; i <= count; i++) {
            Runnable runnable = () -> {
                try {
                    Thread.sleep(5000);
                    //执行任务逻辑代码
                    System.out.println("任务:" + Thread.currentThread().getName());
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            };
            executorService.submit(runnable);
        }
        try {
            //任务执行完,才继续往下走,如果不用管任务是否执行完,把该代码注释掉即可
            //具体要结合业务场景使用
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
        executorService.shutdown();
        System.out.println(".......");
    }

2、批量异步查询-CompletableFuture的使用

CompletableFuture是java8推出的一个非常简便的多线程写法,在上面 "1、批量执行异步任务"中也可以用CompletableFuture来写,而且更简便。

2.1 几种创建方式

1、runAsync()是没返回结果的,supplyAsync()可以指定返回结果。
2、使用没有指定Executor(线程池)的方法时,内部使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
3、如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable runnable = () -> System.out.println("无返回结果异步任务");
        CompletableFuture.runAsync(runnable);

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("有返回值的异步任务1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello World";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("有返回值的异步任务2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello World";
        });
        //get方法会阻塞主线程,直到线程池中的线程执行完
        future1.get();
        future2.get();
        //项目中一般会采用这个写法,作用实际是和get方法一样的
        CompletableFuture.allOf(future1,future2).join();
        System.out.println("......");
    }

2.2 线程执行结果处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

1、Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
2、方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
3、这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable runnable = () -> System.out.println("无返回结果异步任务");
        CompletableFuture.runAsync(runnable);

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("有返回值的异步任务1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello World";
        });
        //等待任务执行完成
        future1.join();
        // 任务完成或异常方法完成时执行该方法
        // 如果出现了异常,任务结果为null
        future1.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String t, Throwable action) {
                System.out.println(t + " 执行完成!");
            }
        });
        System.out.println("......");
    }

2.3 线程执行结果转换

将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

2.3.1 thenApply

thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的Future对象。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            return "Hello World";
        }).thenApply(str-> str+"-haha");
        System.out.println(future1);

2.3.2 thenCompose

thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
CompletableFuture<Integer> future = CompletableFuture
    .supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int number = new Random().nextInt(30);
            System.out.println("第一次运算:" + number);
            return number;
        }
    })
    .thenCompose(new Function<Integer, CompletionStage<Integer>>() {
        @Override
        public CompletionStage<Integer> apply(Integer param) {
            return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = param * 2;
                    System.out.println("第二次运算:" + number);
                    return number;
                }
            });
        }
    });

thenApply 和 thenCompose的区别:
1、thenApply转换的是泛型中的类型,返回的是同一个CompletableFuture;
2、thenCompose将内部的CompletableFuture调用展开来并使用上一个CompletableFutre调用的结果在下一步的CompletableFuture调用中进行运算,是生成一个新的CompletableFuture。

https://blog.csdn.net/sermonlizhi/article/details/123356877

文章来源:https://blog.csdn.net/weixin_41919486/article/details/135669505
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。