内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

如何結(jié)合線程池理解FutureTask及Future源碼

本篇內(nèi)容介紹了“如何結(jié)合線程池理解FutureTask及Future源碼”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

創(chuàng)新互聯(lián)是專業(yè)的宕昌網(wǎng)站建設(shè)公司,宕昌接單;提供成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行宕昌網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

1.FutureTask繼承體系結(jié)構(gòu)

如何結(jié)合線程池理解FutureTask及Future源碼

2.從簡單示例開始分析

多線程Runnable和Callable接口這里就不多說了,Callable有返回值,Runnable無返回值。

public class FutureTaskTest {    public static void main(String[] args) {        ExecutorService executor = null;        try {            //線程池提交Runnable接口任務(wù)            executor.execute(new MyRunnable());            //線程池提交Callable接口任務(wù)            executor = Executors.newFixedThreadPool(2);            Future f = executor.submit(new MyCallLable<Integer>());            System.out.println(f.get());            //單線程方式            FutureTask ft = new FutureTask(new MyCallLable<Integer>());            Thread t = new Thread(ft);            t.start();            System.out.println(ft.get());        } catch (InterruptedException e) {            e.printStackTrace();        } catch (ExecutionException e) {            e.printStackTrace();        } finally {            if (executor != null) {                executor.shutdown();            }        }    }    static class MyCallLable<Integer> implements Callable {        @Override        public Object call() throws Exception {            return 1;        }    }    static class MyRunnable implements  Runnable {        @Override        public void run() {            System.out.println(2);        }    }}

3.以線程池提交方式分析FutureTask源碼

3.1.Executors.newFixedThreadPool分析

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());}

該方法創(chuàng)建了一個(gè)核心線程和最大線程數(shù)一樣的線程池,使用LinkedBlockingQueue這種無界隊(duì)列存儲(chǔ)多余的任務(wù),也就是說,如果我們使用這種jdk自帶的線程提交任務(wù)的時(shí)候,由于隊(duì)列是無界的,當(dāng)任務(wù)達(dá)到一定數(shù)量會(huì)造成內(nèi)存溢出。這里不再分析ThreadPoolExecutor代碼,有興趣的可以看我的另一篇博文專門分析ThreadPoolExecutor源碼的。該方法返回一個(gè)ExecutorService。

ThreadPoolExecutor繼承體系如下圖:

如何結(jié)合線程池理解FutureTask及Future源碼

3.2.ExecutorService.submit方法分析

該方法實(shí)際調(diào)用的是實(shí)現(xiàn)類AbstractExecutorService.submit方法

public <T> Future<T> submit(Callable<T> task) {    if (task == null) throw new NullPointerException();    RunnableFuture<T> ftask = newTaskFor(task);    execute(ftask);    return ftask;}

這里的newTaskFor方法就會(huì)將Callable任務(wù)傳遞到FutureTask類中,并封裝到其Callable屬性中

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {    return new FutureTask<T>(callable);}

3.3.FutureTask屬性分析

    /* 線程狀態(tài)可能的轉(zhuǎn)換:     * NEW -> COMPLETING -> NORMAL     * NEW -> COMPLETING -> EXCEPTIONAL     * NEW -> CANCELLED     * NEW -> INTERRUPTING -> INTERRUPTED     *///當(dāng)前任務(wù)狀態(tài)private volatile int state;//新創(chuàng)建private static final int NEW          = 0;//即將結(jié)束,但還沒有結(jié)束private static final int COMPLETING   = 1;//正常結(jié)束private static final int NORMAL       = 2;//異常狀態(tài):Callable接口的Call方法中具體業(yè)務(wù)邏輯出現(xiàn)異常private static final int EXCEPTIONAL  = 3;//任務(wù)被取消private static final int CANCELLED    = 4;//任務(wù)處于中斷中private static final int INTERRUPTING = 5;//任務(wù)被中斷private static final int INTERRUPTED  = 6;//任務(wù)提交傳入的Callable,用來調(diào)用call方法private Callable<V> callable;//Call方法返回值//1.如果任務(wù)正常結(jié)束,返回call方法的返回值//2.如果call方法發(fā)生異常,返回具體的異常信息private Object outcome;//當(dāng)前執(zhí)行的線程private volatile Thread runner;//一個(gè)棧結(jié)構(gòu)的數(shù)據(jù)類型,存儲(chǔ)被get方法阻塞的線程的引用private volatile WaitNode waiters;

3.4.FutureTask構(gòu)造方法分析

public FutureTask(Callable<V> callable) {    //外部需要傳入Callable接口的實(shí)現(xiàn)    if (callable == null)        throw new NullPointerException();    this.callable = callable;    //將線程狀態(tài)設(shè)置為先創(chuàng)建    this.state = NEW;}

3.5.FutureTask執(zhí)行過程分析

3.5.1.使用線程池提交Callable接口情況下調(diào)用過程分析

如何結(jié)合線程池理解FutureTask及Future源碼

從示例的線程池提交Calllable接口的案例中一步步分析:1.executor.submit(new MyCallLable<Integer>())方法提交一個(gè)Callable實(shí)現(xiàn);2.第一步實(shí)際會(huì)調(diào)用AbstractExecutorService.submit方法;3.AbstractExecutorService.submit內(nèi)部調(diào)用newTaskFor方法生成一個(gè)FutureTask對象,并將MyCallLable任務(wù)封裝到其Calllable屬性中;4.AbstractExecutorService.submit方法內(nèi)部調(diào)用ThreadPoolExecutor.execute方法提交FutureTask對象到線程池;5-6-7-8.實(shí)際就是線程池提交一個(gè)任務(wù)的執(zhí)行過程,具體源碼可以看我的另一篇博客,這里比較復(fù)雜,概況的說了下;9-10.線程池execute實(shí)際會(huì)執(zhí)行FutureTask的run方法,在run方法中調(diào)用Callable.call,這就是線程池提交Callable執(zhí)行的流程;

3.5.2.FutureTask.run方法分析

public void run() {    //條件1:當(dāng)前任務(wù)狀態(tài)不是新建狀態(tài)    //條件2:當(dāng)前線程不是FutureTask持有的線程    if (state != NEW ||        !UNSAFE.compareAndSwapObject(this, runnerOffset,                                     null, Thread.currentThread()))        //退出執(zhí)行        return;    try {        //當(dāng)前FutureTask持有的callable        Callable<V> c = callable;        //條件1:當(dāng)前提交的Callable不能為空        //條件2:當(dāng)前線程任務(wù)狀態(tài)為新創(chuàng)建        if (c != null && state == NEW) {            //Callable的返回值            V result;            //任務(wù)是否成功執(zhí)行            boolean ran;            try {                //調(diào)用用戶自定義call方法的邏輯                result = c.call();                //任務(wù)成功執(zhí)行                ran = true;            } catch (Throwable ex) {                //發(fā)生異常                result = null;                ran = false;                setException(ex);            }            //任務(wù)成功執(zhí)行設(shè)置返回值            if (ran)                set(result);        }    } finally {        //run方法結(jié)束持有線程設(shè)置為空,help gc        //這里可能正常執(zhí)行完run方法也可能出現(xiàn)異常退出        runner = null;        //當(dāng)前任務(wù)執(zhí)行狀態(tài)        int s = state;        //如果處于中斷的狀態(tài),包含中斷中和已中斷,釋放cpu資源        if (s >= INTERRUPTING)            handlePossibleCancellationInterrupt(s);    }}

3.5.3.FutureTask.set方法分析

該方法設(shè)置任務(wù)成功執(zhí)行后的執(zhí)行結(jié)果狀態(tài)和返回值,將返回值封裝到outcome屬性中,由于get方法是阻塞的,還需要喚醒阻塞的線程。

protected void set(V v) {    //將狀態(tài)從新建設(shè)置為結(jié)束中    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {        //返回值賦值        outcome = v;        //設(shè)置任務(wù)狀態(tài)為正常結(jié)束        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);         //喚醒被get方法阻塞的線程        finishCompletion();    }}

3.5.6.FutureTask靜態(tài)內(nèi)部類WaitNode分析

在分析finishCompletion方法前,先介紹下WaitNode類。為什么會(huì)有這個(gè)類?我們知道FutureTask.get方法是阻塞的,如果我們在一個(gè)線程內(nèi)多次調(diào)用get方法,這個(gè)從理論上考慮其實(shí)不需要WaitNode的;如果我們又多次創(chuàng)建了線程在其他線程內(nèi)部調(diào)用get方法呢?由于FutureTask.get方法內(nèi)部會(huì)調(diào)用LockSupport.park(Thread)或LockSupport.parkNanos阻塞線程,所以就需要喚醒;而LockSupport.unpark(Thread)解除線程阻塞也需要指定線程,所以這里就需要一種數(shù)據(jù)結(jié)構(gòu)來存儲(chǔ)當(dāng)前線程的引用了。這里就設(shè)計(jì)了WaitNode這個(gè)類,它是一個(gè)單鏈表,而且采用的是頭插法,在遍歷的時(shí)候也是從前往后遍歷的,這就是一個(gè)典型的棧的結(jié)構(gòu),先進(jìn)后出,后進(jìn)先出。這里為什么又是一個(gè)單鏈表結(jié)構(gòu)呢?這是為了方便在任務(wù)結(jié)束的時(shí)候遍歷。

static final class WaitNode {    //當(dāng)前線程的引用    volatile Thread thread;    //指向下一個(gè)節(jié)點(diǎn)    volatile WaitNode next;    WaitNode() { thread = Thread.currentThread(); }}

3.5.7.FutureTask.finishCompletion方法分析

用于喚醒被get方法阻塞的線程

private void finishCompletion() {    // assert state > COMPLETING;    //從頭開始遍歷    for (WaitNode q; (q = waiters) != null;) {        //使用cas方式設(shè)置當(dāng)前waiters為空,防止外部線程調(diào)用cancel導(dǎo)致finishCompletion該方法被調(diào)用        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {            for (;;) {                //獲取當(dāng)前WaitNode對應(yīng)的線程                Thread t = q.thread;                if (t != null) {                    q.thread = null; //help gc                    //喚醒當(dāng)前節(jié)點(diǎn)對應(yīng)的線程                    LockSupport.unpark(t);                }                //獲取當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)                WaitNode next = q.next;                if (next == null)                    break;                q.next = null;//help gc                //將q指向下要給節(jié)點(diǎn)                q = next;            }            break;        }    }    done();    //將callable置為空,help gc    callable = null; }

3.5.8.FutureTask.setException方法分析

該方法將返回值設(shè)置為拋出的異常,將任務(wù)狀態(tài)設(shè)置為EXCEPTIONAL狀態(tài),并調(diào)用finishCompletion方法喚醒被get阻塞的線程。

protected void setException(Throwable t) {    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {        outcome = t;        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state        finishCompletion();    }}

3.5.9.FutureTask.handlePossibleCancellationInterrupt方法分析

private void handlePossibleCancellationInterrupt(int s) {    //如果任務(wù)狀態(tài)處于中斷中,釋放cpu資源    if (s == INTERRUPTING)        while (state == INTERRUPTING)            Thread.yield(); // wait out pending interrupt}

3.5.9.FutureTask.get和FutureTask.get(long timeout, TimeUnit unit)方法分析

兩個(gè)方法區(qū)別不大,唯一的區(qū)別是阻塞線程的時(shí)候使用的LockSupport.parkNanos(this, nanos)和LockSupport.park(this),當(dāng)有時(shí)間條件的時(shí)候LockSupport.parkNanos(this, nanos)會(huì)在指定時(shí)間內(nèi)結(jié)束后自動(dòng)喚醒線程。

這里講講sleep和LockSupport.parkNanos區(qū)別:sleep在指定時(shí)間到期后會(huì)判斷中斷狀態(tài),根據(jù)中斷狀態(tài)來判斷是否需要拋出異常,而LockSupport.parkNanos不會(huì)根據(jù)中斷狀態(tài)做出響應(yīng)。
public V get() throws InterruptedException, ExecutionException {    int s = state;    if (s <= COMPLETING)        s = awaitDone(false, 0L);    return report(s);}public V get(long timeout, TimeUnit unit)    throws InterruptedException, ExecutionException, TimeoutException {    if (unit == null)        throw new NullPointerException();    int s = state;    //unit.toNanos(timeout)將指定時(shí)間格式轉(zhuǎn)化為對應(yīng)的毫微秒    if (s <= COMPLETING &&        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)        throw new TimeoutException();    return report(s);}

3.6.0.FutureTask.awaitDone方法分析

t.interrupted()也是可以喚醒被LockSupport.park()阻塞的線程的
private int awaitDone(boolean timed, long nanos)    throws InterruptedException {    final long deadline = timed ? S        ystem.nanoTime() + nanos : 0L;    WaitNode q = null;    boolean queued = false;    //自旋    for (;;) {        //條件成立說明當(dāng)前線程是被其他線程調(diào)用t.interrupted()這種中斷方式喚醒        if (Thread.interrupted()) {            //從隊(duì)列中移除線程被中斷的節(jié)點(diǎn)            removeWaiter(q);            throw new InterruptedException();        }                int s = state;        //(4).s>COMPLETING成立,說明當(dāng)前任務(wù)已經(jīng)執(zhí)行完,結(jié)果可能有好有壞        if (s > COMPLETING) {            if (q != null)                q.thread = null;            //返回當(dāng)前任務(wù)狀態(tài)            return s;        }        else if (s == COMPLETING) // cannot time out yet            Thread.yield();        //(1).第一次自旋,q=null,創(chuàng)建當(dāng)前線程對應(yīng)的WaitNode對象        else if (q == null)            q = new WaitNode();        //(2).第二次自旋,queued為false,q.next = waiters采用頭插法將當(dāng)前節(jié)點(diǎn)入棧        else if (!queued)            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                 q.next = waiters, q);        //(3).第三次自旋,會(huì)走到這里,將線程阻塞,等待后續(xù)喚醒后繼續(xù)自旋調(diào)用,也可能因?yàn)槌瑫r(shí)后自動(dòng)喚醒        else if (timed) {            nanos = deadline - System.nanoTime();            if (nanos <= 0L) {                //從隊(duì)列中移除get超時(shí)的節(jié)點(diǎn)                removeWaiter(q);                return state;            }            LockSupport.parkNanos(this, nanos);        }        else            LockSupport.park(this);    }}

3.6.1.FutureTask.removeWaiter方法分析

每次調(diào)用get方法都會(huì)將線程封裝成WaitNode入棧,當(dāng)調(diào)用get方法的線程由于被中斷喚醒或者超時(shí)自動(dòng)喚醒的都需要從隊(duì)列中移除, 并重新組裝棧結(jié)構(gòu)。

一張圖概況該方法做的事情:

如何結(jié)合線程池理解FutureTask及Future源碼

private void removeWaiter(WaitNode node) {    if (node != null) {        node.thread = null;        retry:        for (;;) {          // restart on removeWaiter race            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {                s = q.next;                if (q.thread != null)                    pred = q;                else if (pred != null) {                    pred.next = s;                    if (pred.thread == null) // check for race                        continue retry;                }                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,                                                      q, s))                    continue retry;            }            break;        }    }}

3.6.2.FutureTask.report方法分析

將返回值封裝到outcome屬性中返回,可能是正常的值也可能是一個(gè)異常信息

private V report(int s) throws ExecutionException {    Object x = outcome;    if (s == NORMAL)        return (V)x;    if (s >= CANCELLED)        throw new CancellationException();    throw new ExecutionException((Throwable)x);}

3.6.3.FutureTask.cancel方法分析

public boolean cancel(boolean mayInterruptIfRunning) {    //條件1:說明當(dāng)前任務(wù)處于運(yùn)行中    //條件2:任務(wù)狀態(tài)修改    //條件1和條件2成立則執(zhí)行下面cancel的核心處理邏輯,否則返回false代表取消失敗    //可能會(huì)有多個(gè)線程調(diào)用cancel方法導(dǎo)致cancel失敗的情況    if (!(state == NEW &&          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))        return false;    try {    // in case call to interrupt throws exception        //mayInterruptIfRunning是否中斷線程        if (mayInterruptIfRunning) {            try {                Thread t = runner;                if (t != null)                    //中斷線程                    t.interrupt();            } finally { // final state                //設(shè)置任務(wù)為中斷狀態(tài)                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);            }        }    } finally {        //喚醒所有g(shù)et阻塞的線程        finishCompletion();    }    return true;}

“如何結(jié)合線程池理解FutureTask及Future源碼”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

新聞標(biāo)題:如何結(jié)合線程池理解FutureTask及Future源碼
文章源于:http://m.rwnh.cn/article30/gspsso.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)用戶體驗(yàn)、關(guān)鍵詞優(yōu)化動(dòng)態(tài)網(wǎng)站、響應(yīng)式網(wǎng)站電子商務(wù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)
兰溪市| 正安县| 黑龙江省| 西乌| 嵊泗县| 象山县| 仙游县| 于田县| 长丰县| 云霄县| 仪征市| 武定县| 左云县| 咸宁市| 交口县| 阳信县| 凌源市| 宜州市| 石屏县| 万宁市| 南郑县| 三穗县| 甘泉县| 柘城县| 仁化县| 杭锦后旗| 丹阳市| 文化| 醴陵市| 南城县| 原阳县| 锡林浩特市| 海伦市| 福清市| 巢湖市| 十堰市| 太保市| 电白县| 凌海市| 和静县| 乃东县|