【Java并发】聊聊Future如何提升商品查询速度

发布时间:2024年01月24日

java中可以通过new thread、实现runnable来进行实现线程。但是唯一的缺点是没有返回值、以及抛出异常,而callable就可以解决这个问题。通过配合使用futuretask来进行使用。
并且Future提供了对任务的操作,取消,查询是否完成,获取结果。

Demo

        FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
            Thread.sleep(10000);
            System.out.println("调用三方翻译接口");
            return 1024;
        });

        new Thread(futureTask).start();

        Integer integer = futureTask.get();
//        Integer integer = futureTask.get(1, TimeUnit.SECONDS);

        while (true) {
            if (futureTask.isDone()) {
                System.out.println("完成任务");
                break;
            } else {
                System.out.println("执行中,稍等.");
            }
        }
        Integer x = futureTask.get();
        System.out.println(x);

在这里插入图片描述

实现原理

FutureTask核心代码

基本属性

     /* 
     *  Possible state transitions:
     * NEW -> COMPLETING -> NORMAL 任务正常执行,返回结果是正常的结果
     * NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是返回结果是异常
     * NEW -> CANCELLED  任务直接被取消的流程
     * NEW -> INTERRUPTING -> INTERRUPTED  
     */
     // 当前任务的状态
    private volatile int state;
    private static final int NEW          = 0; // 任务的初始化状态
    private static final int COMPLETING   = 1; // Callable的结果,正常封装给当前FutureTask
    private static final int NORMAL       = 2; // Normal任务正常结束
    private static final int EXCEPTIONAL  = 3; // 执行任务时,发生了异常
    private static final int CANCELLED    = 4; // 任务被取消了
    private static final int INTERRUPTING = 5; // 线程的中断状态,被设置为了ture 
    private static final int INTERRUPTED  = 6; // 线程被中断了

    // 当前要执行的任务
    private Callable<V> callable;
    // 存放任务返回结果的属性,也就是futureTask.get 需要获取的结果
    private Object outcome; 
    // 执行任务的线程
    private volatile Thread runner;
    // 单向链表,存放通过get方法挂起等待的线程
    private volatile WaitNode waiters;

任务

当线程start() 之后其实执行的就是call()方法。也就是通过futureTask的run方法执行的call()方法。

    // run方法的执行流程,最终会执行callable的call方法
    public void run() {
        // 保证任务的状态是NEW才执行,或者CAS将当前线程设置为runner.
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
        // 准备执行任务
            Callable<V> c = callable;
            // 任务不为null 并且state==new
            if (c != null && state == NEW) {
                V result; // 返回的结果
                boolean ran; // 任务执行是否正常结束
                try { 
                    // 调用callable的call方法。
                    result = c.call();
                    ran = true; // 正常结束 ran = true
                } catch (Throwable ex) {
                    result = null; // 异常结果为null 
                    ran = false; // 异常结束 ran = false
                    setException(ex); // 设置异常信息
                }
                if (ran) 
                    // 正常执行结束,设置返回结果
                    set(result);
            }
        } finally {
      		// 执行完毕 或者异常 ,都将runner设置为null 
            runner = null;
            // 拿到状态
            int s = state;
            // 如果中断 要做一些事情
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    //设置result值
    protected void set(V v) {
        // cas设置state为 new-completing
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v; // 返回结果给outcome
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
			// 下面说
            finishCompletion();
        }
    }

get

get方法获取返回结果,到挂起的位置

    public V get() throws InterruptedException, ExecutionException {
        //拿状态
        int s = state;
        //满足未完成状态 就需要等待
        if (s <= COMPLETING)
        // 挂起线程,等待拿结果。
            s = awaitDone(false, 0L);
        return report(s);
    }
    //线程要等待任务执行结束,等待任务执行的状态大于completing状态
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // dealline get() 就是0 ,如果是get(time,unit) 追加当前系统时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 构建waitNode 
        WaitNode q = null;
        boolean queued = false;
        //死循环
        for (;;) {
             //判断get线程是否中断了
            if (Thread.interrupted()) {
                //将当前节点从waiter中移除
                removeWaiter(q);
                //并且抛出中断异常
                throw new InterruptedException();
            }
			// 拿到现在任务的状态
            int s = state;
            // 判断任务是否执行完毕
            if (s > COMPLETING) {
                // q != null。表示设置过了 直接移除waitNode线程
                if (q != null)
                    q.thread = null;
                    //返回当前任务状态
                return s;
            }
            // 如果任务的状态处于completing 
            else if (s == COMPLETING) // cannot time out yet
               //让步
                Thread.yield();
            else if (q == null)
                // 线程状态还是new  call方法可能还没有执行 准备挂起线程
                // 封装waitNode存放当前线程
                q = new WaitNode();
            else if (!queued)
                // 如果waitNode还没有排在waiters中,就拍进来(头插法) cas修改
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
                // 如果get(time,unit)挂起线程方式
            else if (timed) {
                // 计算挂起时间
                nanos = deadline - System.nanoTime();
                // 挂起时间 是否小于等于0 
                if (nanos <= 0L) {
                    // 移除waiter 当前node
                    removeWaiter(q);
                    // 返回任务状态
                    return state;
                }
                //正常指定挂起时间即可
                LockSupport.parkNanos(this, nanos);
            }
            else
                //get()挂起线程的方式
                LockSupport.park(this);
        }
    }

   

finishCompletion唤醒线程

线程挂起后,如果任务执行完毕,由finishCompletioon唤醒线程

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // 拿到第一个阶段,cas的方式修改 将其设置为null 
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                   // 拿到q线程信息
                    Thread t = q.thread;
                    // 线程信息不为空
                    if (t != null) {
                       // 将waitNode的thread设置为null 
                        q.thread = null;
                        // 唤醒这个线程
                        LockSupport.unpark(t);
                    }
                    //往后遍历 接着唤醒
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    //执行next的waitNode 
                    q = next;
                }
                break;
            }
        }
 
 		//拓展方法 没任何实现 
        done();

		//任务执行完毕
        callable = null;        // to reduce footprint
    }

report

// 任务结束。
private V report(int s) throws ExecutionException {
    // 拿到结果
    Object x = outcome;
    // 判断是正常返回结束
    if (s == NORMAL)
        // 返回结果
        return (V)x;
    // 任务状态是大于取消
    if (s >= CANCELLED)
        // 甩异常。
        throw new CancellationException();
    // 扔异常。
    throw new ExecutionException((Throwable)x);
}

// 正常返回 report
// 异常返回 report
// 取消任务 report
// 中断任务 awaitDone

流程图

在这里插入图片描述

文章来源:https://blog.csdn.net/jia970426/article/details/135760070
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。