本篇內(nèi)容介紹了“如何結(jié)合線程池理解FutureTask及Future源碼”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
創(chuàng)新互聯(lián)是專業(yè)的宕昌網(wǎng)站建設(shè)公司,宕昌接單;提供成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行宕昌網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!
多線程Runnable和Callable接口這里就不多說了,Callable有返回值,Runnable無返回值。
public class FutureTaskTest { public static void main(String[] args) { ExecutorService executor = null; try { //線程池提交Runnable接口任務(wù) executor.execute(new MyRunnable()); //線程池提交Callable接口任務(wù) executor = Executors.newFixedThreadPool(2); Future f = executor.submit(new MyCallLable<Integer>()); System.out.println(f.get()); //單線程方式 FutureTask ft = new FutureTask(new MyCallLable<Integer>()); Thread t = new Thread(ft); t.start(); System.out.println(ft.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { if (executor != null) { executor.shutdown(); } } } static class MyCallLable<Integer> implements Callable { @Override public Object call() throws Exception { return 1; } } static class MyRunnable implements Runnable { @Override public void run() { System.out.println(2); } }}
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}
該方法創(chuàng)建了一個(gè)核心線程和最大線程數(shù)一樣的線程池,使用LinkedBlockingQueue這種無界隊(duì)列存儲(chǔ)多余的任務(wù),也就是說,如果我們使用這種jdk自帶的線程提交任務(wù)的時(shí)候,由于隊(duì)列是無界的,當(dāng)任務(wù)達(dá)到一定數(shù)量會(huì)造成內(nèi)存溢出。這里不再分析ThreadPoolExecutor代碼,有興趣的可以看我的另一篇博文專門分析ThreadPoolExecutor源碼的。該方法返回一個(gè)ExecutorService。
ThreadPoolExecutor繼承體系如下圖:
該方法實(shí)際調(diào)用的是實(shí)現(xiàn)類AbstractExecutorService.submit方法
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}
這里的newTaskFor方法就會(huì)將Callable任務(wù)傳遞到FutureTask類中,并封裝到其Callable屬性中
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}
/* 線程狀態(tài)可能的轉(zhuǎn)換: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED *///當(dāng)前任務(wù)狀態(tài)private volatile int state;//新創(chuàng)建private static final int NEW = 0;//即將結(jié)束,但還沒有結(jié)束private static final int COMPLETING = 1;//正常結(jié)束private static final int NORMAL = 2;//異常狀態(tài):Callable接口的Call方法中具體業(yè)務(wù)邏輯出現(xiàn)異常private static final int EXCEPTIONAL = 3;//任務(wù)被取消private static final int CANCELLED = 4;//任務(wù)處于中斷中private static final int INTERRUPTING = 5;//任務(wù)被中斷private static final int INTERRUPTED = 6;//任務(wù)提交傳入的Callable,用來調(diào)用call方法private Callable<V> callable;//Call方法返回值//1.如果任務(wù)正常結(jié)束,返回call方法的返回值//2.如果call方法發(fā)生異常,返回具體的異常信息private Object outcome;//當(dāng)前執(zhí)行的線程private volatile Thread runner;//一個(gè)棧結(jié)構(gòu)的數(shù)據(jù)類型,存儲(chǔ)被get方法阻塞的線程的引用private volatile WaitNode waiters;
public FutureTask(Callable<V> callable) { //外部需要傳入Callable接口的實(shí)現(xiàn) if (callable == null) throw new NullPointerException(); this.callable = callable; //將線程狀態(tài)設(shè)置為先創(chuàng)建 this.state = NEW;}
從示例的線程池提交Calllable接口的案例中一步步分析:1.executor.submit(new MyCallLable<Integer>())方法提交一個(gè)Callable實(shí)現(xiàn);2.第一步實(shí)際會(huì)調(diào)用AbstractExecutorService.submit方法;3.AbstractExecutorService.submit內(nèi)部調(diào)用newTaskFor方法生成一個(gè)FutureTask對象,并將MyCallLable任務(wù)封裝到其Calllable屬性中;4.AbstractExecutorService.submit方法內(nèi)部調(diào)用ThreadPoolExecutor.execute方法提交FutureTask對象到線程池;5-6-7-8.實(shí)際就是線程池提交一個(gè)任務(wù)的執(zhí)行過程,具體源碼可以看我的另一篇博客,這里比較復(fù)雜,概況的說了下;9-10.線程池execute實(shí)際會(huì)執(zhí)行FutureTask的run方法,在run方法中調(diào)用Callable.call,這就是線程池提交Callable執(zhí)行的流程;
public void run() { //條件1:當(dāng)前任務(wù)狀態(tài)不是新建狀態(tài) //條件2:當(dāng)前線程不是FutureTask持有的線程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //退出執(zhí)行 return; try { //當(dāng)前FutureTask持有的callable Callable<V> c = callable; //條件1:當(dāng)前提交的Callable不能為空 //條件2:當(dāng)前線程任務(wù)狀態(tài)為新創(chuàng)建 if (c != null && state == NEW) { //Callable的返回值 V result; //任務(wù)是否成功執(zhí)行 boolean ran; try { //調(diào)用用戶自定義call方法的邏輯 result = c.call(); //任務(wù)成功執(zhí)行 ran = true; } catch (Throwable ex) { //發(fā)生異常 result = null; ran = false; setException(ex); } //任務(wù)成功執(zhí)行設(shè)置返回值 if (ran) set(result); } } finally { //run方法結(jié)束持有線程設(shè)置為空,help gc //這里可能正常執(zhí)行完run方法也可能出現(xiàn)異常退出 runner = null; //當(dāng)前任務(wù)執(zhí)行狀態(tài) int s = state; //如果處于中斷的狀態(tài),包含中斷中和已中斷,釋放cpu資源 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}
該方法設(shè)置任務(wù)成功執(zhí)行后的執(zhí)行結(jié)果狀態(tài)和返回值,將返回值封裝到outcome屬性中,由于get方法是阻塞的,還需要喚醒阻塞的線程。
protected void set(V v) { //將狀態(tài)從新建設(shè)置為結(jié)束中 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //返回值賦值 outcome = v; //設(shè)置任務(wù)狀態(tài)為正常結(jié)束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); //喚醒被get方法阻塞的線程 finishCompletion(); }}
在分析finishCompletion方法前,先介紹下WaitNode類。為什么會(huì)有這個(gè)類?我們知道FutureTask.get方法是阻塞的,如果我們在一個(gè)線程內(nèi)多次調(diào)用get方法,這個(gè)從理論上考慮其實(shí)不需要WaitNode的;如果我們又多次創(chuàng)建了線程在其他線程內(nèi)部調(diào)用get方法呢?由于FutureTask.get方法內(nèi)部會(huì)調(diào)用LockSupport.park(Thread)或LockSupport.parkNanos阻塞線程,所以就需要喚醒;而LockSupport.unpark(Thread)解除線程阻塞也需要指定線程,所以這里就需要一種數(shù)據(jù)結(jié)構(gòu)來存儲(chǔ)當(dāng)前線程的引用了。這里就設(shè)計(jì)了WaitNode這個(gè)類,它是一個(gè)單鏈表,而且采用的是頭插法,在遍歷的時(shí)候也是從前往后遍歷的,這就是一個(gè)典型的棧的結(jié)構(gòu),先進(jìn)后出,后進(jìn)先出。這里為什么又是一個(gè)單鏈表結(jié)構(gòu)呢?這是為了方便在任務(wù)結(jié)束的時(shí)候遍歷。
static final class WaitNode { //當(dāng)前線程的引用 volatile Thread thread; //指向下一個(gè)節(jié)點(diǎn) volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); }}
用于喚醒被get方法阻塞的線程
private void finishCompletion() { // assert state > COMPLETING; //從頭開始遍歷 for (WaitNode q; (q = waiters) != null;) { //使用cas方式設(shè)置當(dāng)前waiters為空,防止外部線程調(diào)用cancel導(dǎo)致finishCompletion該方法被調(diào)用 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //獲取當(dāng)前WaitNode對應(yīng)的線程 Thread t = q.thread; if (t != null) { q.thread = null; //help gc //喚醒當(dāng)前節(jié)點(diǎn)對應(yīng)的線程 LockSupport.unpark(t); } //獲取當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn) WaitNode next = q.next; if (next == null) break; q.next = null;//help gc //將q指向下要給節(jié)點(diǎn) q = next; } break; } } done(); //將callable置為空,help gc callable = null; }
該方法將返回值設(shè)置為拋出的異常,將任務(wù)狀態(tài)設(shè)置為EXCEPTIONAL狀態(tài),并調(diào)用finishCompletion方法喚醒被get阻塞的線程。
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}
3.5.9.FutureTask.handlePossibleCancellationInterrupt方法分析
private void handlePossibleCancellationInterrupt(int s) { //如果任務(wù)狀態(tài)處于中斷中,釋放cpu資源 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt}
兩個(gè)方法區(qū)別不大,唯一的區(qū)別是阻塞線程的時(shí)候使用的LockSupport.parkNanos(this, nanos)和LockSupport.park(this),當(dāng)有時(shí)間條件的時(shí)候LockSupport.parkNanos(this, nanos)會(huì)在指定時(shí)間內(nèi)結(jié)束后自動(dòng)喚醒線程。
這里講講sleep和LockSupport.parkNanos區(qū)別:sleep在指定時(shí)間到期后會(huì)判斷中斷狀態(tài),根據(jù)中斷狀態(tài)來判斷是否需要拋出異常,而LockSupport.parkNanos不會(huì)根據(jù)中斷狀態(tài)做出響應(yīng)。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s);}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; //unit.toNanos(timeout)將指定時(shí)間格式轉(zhuǎn)化為對應(yīng)的毫微秒 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s);}
t.interrupted()也是可以喚醒被LockSupport.park()阻塞的線程的
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? S ystem.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //自旋 for (;;) { //條件成立說明當(dāng)前線程是被其他線程調(diào)用t.interrupted()這種中斷方式喚醒 if (Thread.interrupted()) { //從隊(duì)列中移除線程被中斷的節(jié)點(diǎn) removeWaiter(q); throw new InterruptedException(); } int s = state; //(4).s>COMPLETING成立,說明當(dāng)前任務(wù)已經(jīng)執(zhí)行完,結(jié)果可能有好有壞 if (s > COMPLETING) { if (q != null) q.thread = null; //返回當(dāng)前任務(wù)狀態(tài) return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); //(1).第一次自旋,q=null,創(chuàng)建當(dāng)前線程對應(yīng)的WaitNode對象 else if (q == null) q = new WaitNode(); //(2).第二次自旋,queued為false,q.next = waiters采用頭插法將當(dāng)前節(jié)點(diǎn)入棧 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //(3).第三次自旋,會(huì)走到這里,將線程阻塞,等待后續(xù)喚醒后繼續(xù)自旋調(diào)用,也可能因?yàn)槌瑫r(shí)后自動(dòng)喚醒 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //從隊(duì)列中移除get超時(shí)的節(jié)點(diǎn) removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }}
每次調(diào)用get方法都會(huì)將線程封裝成WaitNode入棧,當(dāng)調(diào)用get方法的線程由于被中斷喚醒或者超時(shí)自動(dòng)喚醒的都需要從隊(duì)列中移除, 并重新組裝棧結(jié)構(gòu)。
一張圖概況該方法做的事情:
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } }}
將返回值封裝到outcome屬性中返回,可能是正常的值也可能是一個(gè)異常信息
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x);}
public boolean cancel(boolean mayInterruptIfRunning) { //條件1:說明當(dāng)前任務(wù)處于運(yùn)行中 //條件2:任務(wù)狀態(tài)修改 //條件1和條件2成立則執(zhí)行下面cancel的核心處理邏輯,否則返回false代表取消失敗 //可能會(huì)有多個(gè)線程調(diào)用cancel方法導(dǎo)致cancel失敗的情況 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception //mayInterruptIfRunning是否中斷線程 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) //中斷線程 t.interrupt(); } finally { // final state //設(shè)置任務(wù)為中斷狀態(tài) UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //喚醒所有g(shù)et阻塞的線程 finishCompletion(); } return true;}
“如何結(jié)合線程池理解FutureTask及Future源碼”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
新聞標(biāo)題:如何結(jié)合線程池理解FutureTask及Future源碼
文章源于:http://m.rwnh.cn/article30/gspsso.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)、用戶體驗(yàn)、關(guān)鍵詞優(yōu)化、動(dòng)態(tài)網(wǎng)站、響應(yīng)式網(wǎng)站、電子商務(wù)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)