答: ① sleep()方法給其他線程運行機會時不考慮線程的優(yōu)先級,因此會給低優(yōu)先級的線程以運行的機會;yield()方法只會給相同優(yōu)先級或更高優(yōu)先級的線程以運行的機會; ② 線程執(zhí)行sleep()方法后轉(zhuǎn)入阻塞(blocked)狀態(tài),而執(zhí)行yield()方法后轉(zhuǎn)入就緒(ready)狀態(tài); ③ sleep()方法聲明拋出InterruptedException,而yield()方法沒有聲明任何異常; ④ sleep()方法比yield()方法(跟操作系統(tǒng)CPU調(diào)度相關)具有更好的可移植性。
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名與空間、虛擬空間、營銷軟件、網(wǎng)站建設、仁布網(wǎng)站維護、網(wǎng)站推廣。
答:
wait():使一個線程處于等待(阻塞)狀態(tài),并且釋放所持有的對象的鎖;
sleep():使一個正在運行的線程處于睡眠狀態(tài),是一個靜態(tài)方法,調(diào)用此方法要處理InterruptedException異常;
notify():喚醒一個處于等待狀態(tài)的線程,當然在調(diào)用此方法的時候,并不能確切的喚醒某一個等待狀態(tài)的線程,而是由JVM確定喚醒哪個線程,而且與優(yōu)先級無關;
notityAll():喚醒所有處于等待狀態(tài)的線程,該方法并不是將對象的鎖給所有線程,而是讓它們競爭,只有獲得鎖的線程才能進入就緒狀態(tài);
答:如果系統(tǒng)中存在臨界資源(資源數(shù)量少于競爭資源的線程數(shù)量的資源),例如正在寫的數(shù)據(jù)以后可能被另一個線程讀到,或者正在讀的數(shù)據(jù)可能已經(jīng)被另一個線程寫過了,那么這些數(shù)據(jù)就必須進行同步存?。〝?shù)據(jù)庫操作中的排他鎖就是最好的例子)。當應用程序在對象上調(diào)用了一個需要花費很長時間來執(zhí)行的方法,并且不希望讓程序等待方法的返回時,就應該使用異步編程,在很多情況下采用異步途徑往往更有效率。事實上,所謂的同步就是指阻塞式操作,而異步就是非阻塞式操作。
當run() 或者 call() 方法執(zhí)行完的時候線程會自動結(jié)束,如果要手動結(jié)束一個線程,你可以用volatile 布爾變量來退出run()方法的循環(huán)或者是取消任務來中斷線程。
使用自定義的標志位決定線程的執(zhí)行情況
public?class?SafeStopThread?implements?Runnable{?? ???private?volatile?boolean?stop=false;//此變量必須加上volatile?? ???int?a=0;?? ???@Override?? ????public?void?run()?{?? ????????//?TODO?Auto-generated?method?stub?? ????????while(!stop){?? ???????????????synchronized?("")?{?? ????????????????????a++;?? ????????????????????try?{?? ????????????????????????Thread.sleep(100);?? ????????????????????}?catch?(Exception?e)?{?? ????????????????????????//?TODO:?handle?exception?? ????????????????????}?? ????????????????????a--;?? ????????????????????String?tn=Thread.currentThread().getName();?? ????????????????????System.out.println(tn+":a="+a);?? ????????????????}?? ????????}?? ??????//線程終止?? ?????public?void?terminate(){?? ?????????stop=true;?? ??????}?? ??public?static?void?main(String[]?args)?{?? ???????SafeStopThread?t=new?SafeStopThread();?? ???????Thread?t1=new?Thread(t);?? ???????t1.start();?? ???????for(int?i=0;i<5;i++){??? ???????????new?Thread(t).start();?? ???????}?? ?????t.terminate();?? ???}?? }
在語言層面有兩種方式。java.lang.Thread 類的實例就是一個線程但是它需要調(diào)用java.lang.Runnable接口來執(zhí)行,由于線程類本身就是調(diào)用的Runnable接口所以你可以繼承java.lang.Thread 類或者直接調(diào)用Runnable接口來重寫run()方法實現(xiàn)線程。
Java不支持類的多重繼承,但允許你調(diào)用多個接口。所以如果你要繼承其他類,當然是調(diào)用Runnable接口好了。
Semaphore兩個重要的方法就是semaphore.acquire() 請求一個信號量,這時候的信號量個數(shù)-1(一旦沒有可使用的信號量,也即信號量個數(shù)變?yōu)樨摂?shù)時,再次請求的時候就會阻塞,直到其他線程釋放了信號量)semaphore.release()釋放一個信號量,此時信號量個數(shù)+1
public?class?SemaphoreTest?{?? ????private?Semaphore?mSemaphore?=?new?Semaphore(5);?? ????public?void?run(){?? ????????for(int?i=0;?i<?100;?i++){?? ????????????new?Thread(new?Runnable()?{?? ????????????????@Override?? ????????????????public?void?run()?{?? ????????????????????test();?? ????????????????}?? ????????????}).start();?? ????????}?? ????}?? ?? ????private?void?test(){?? ????????try?{?? ????????????mSemaphore.acquire();?? ????????}?catch?(InterruptedException?e)?{?? ????????????e.printStackTrace();?? ????????}?? ????????System.out.println(Thread.currentThread().getName()?+?"?進來了");?? ????????try?{?? ????????????Thread.sleep(1000);?? ????????}?catch?(InterruptedException?e)?{?? ????????????e.printStackTrace();?? ????????}?? ????????System.out.println(Thread.currentThread().getName()?+?"?出去了");?? ????????mSemaphore.release();?? ????}?? }
線程調(diào)度是指系統(tǒng)為線程分配處理器使用權的過程。 主要調(diào)度方式有兩種,分別是協(xié)同式線程調(diào)度和搶占式線程調(diào)度。
協(xié)同式線程調(diào)度:線程的執(zhí)行時間由線程本身控制,當線程把自己的工作執(zhí)行完了之后,主動通知系統(tǒng)切換到另一個線程上。
好處是切換操作對于線程自己是可知的,沒什么線程同步問題。
壞處是線程執(zhí)行時間不可控,可能會一直阻塞然后系統(tǒng)崩潰。
搶占式線程調(diào)度:每個線程由系統(tǒng)分配執(zhí)行時間,不由線程本身決定。線程的執(zhí)行時間是系統(tǒng)可控的,不會有一直阻塞的問題。
Java使用搶占式調(diào)度
搶占式。一個線程用完CPU之后,操作系統(tǒng)會根據(jù)線程優(yōu)先級、線程饑餓情況等數(shù)據(jù)算出一個總的優(yōu)先級并分配下一個時間片給某個線程執(zhí)行。
線程類的構造方法、靜態(tài)塊是被new這個線程類所在的線程所調(diào)用的,而run方法里面的代碼才是被線程自身所調(diào)用的。
Thread?t?=?Thread.currentThread(); String?name?=?t.getName(); System.out.println("name="?+?name);
創(chuàng)建線程要花費昂貴的資源和時間,如果任務來了才創(chuàng)建線程那么響應時間會變長,而且一個進程能創(chuàng)建的線程數(shù)有限。
為了避免這些問題,在程序啟動的時候就創(chuàng)建若干線程來響應處理,它們被稱為線程池,里面的線程叫工作線程。
Executor框架讓你可以創(chuàng)建不同的線程池。比如單線程池,每次處理一個任務;數(shù)目固定的線程池或者是緩存線程池(一個適合很多生存期短的任務的程序的可擴展線程池)。
以下是Java自帶的幾種線程池: 1、newFixedThreadPool 創(chuàng)建一個指定工作線程數(shù)量的線程池。 每當提交一個任務就創(chuàng)建一個工作線程,如果工作線程數(shù)量達到線程池初始的最大數(shù),則將提交的任務存入到池隊列中。
2、newCachedThreadPool 創(chuàng)建一個可緩存的線程池。 這種類型的線程池特點是:
1).工作線程的創(chuàng)建數(shù)量幾乎沒有限制(其實也有限制的,數(shù)目為Interger. MAX_VALUE),這樣可靈活的往線程池中添加線程。
2).如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(默認為1分鐘),則該工作線程將自動終止。終止后,如果你又提交了新的任務,則線程池重新創(chuàng)建一個工作線程。
3、newSingleThreadExecutor創(chuàng)建一個單線程化的Executor,即只創(chuàng)建唯一的工作者線程來執(zhí)行任務,如果這個線程異常結(jié)束,會有另一個取代它,保證順序執(zhí)行(我覺得這點是它的特色)。
單工作線程最大的特點是可保證順序地執(zhí)行各個任務,并且在任意給定的時間不會有多個線程是活動的。
4、newScheduleThreadPool 創(chuàng)建一個定長的線程池,而且支持定時的以及周期性的任務執(zhí)行,類似于Timer。
Executor 和 ExecutorService 這兩個接口主要的區(qū)別是:
ExecutorService 接口繼承了 Executor 接口,是 Executor 的子接口
Executor 和 ExecutorService 第二個區(qū)別是:Executor 接口定義了 execute()方法用來接收一個Runnable接口的對象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的對象。
Executor 和 ExecutorService 接口第三個區(qū)別是 Executor 中的 execute() 方法不返回任何結(jié)果,而 ExecutorService 中的 submit()方法可以通過一個 Future 對象返回運算結(jié)果。
Executor 和 ExecutorService 接口第四個區(qū)別是除了允許客戶端提交一個任務,ExecutorService 還提供用來控制線程池的方法。比如:調(diào)用 shutDown() 方法終止線程池。
Executors 類提供工廠方法用來創(chuàng)建不同類型的線程池。
比如: newSingleThreadExecutor() 創(chuàng)建一個只有一個線程的線程池,newFixedThreadPool(int numOfThreads)來創(chuàng)建固定線程數(shù)的線程池,newCachedThreadPool()可以根據(jù)需要創(chuàng)建新的線程,但如果已有線程是空閑的會重用已有線程。
Java通過Executors提供四種線程池,分別為:
newCachedThreadPool創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
newScheduledThreadPool 創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行。
newSingleThreadExecutor 創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
start()方法被用來啟動新創(chuàng)建的線程,而且start()內(nèi)部調(diào)用了run()方法,這和直接調(diào)用run()方法的效果不一樣。
當你調(diào)用run()方法的時候,只會是在原來的線程中調(diào)用,沒有新的線程啟動,start()方法才會啟動新線程。
兩個方法都可以向線程池提交任務,execute()方法的返回類型是void,它定義在Executor接口中, 而submit()方法可以返回持有計算結(jié)果的Future對象,它定義在ExecutorService接口中,它擴展了Executor接口,其它線程池類像ThreadPoolExecutor和ScheduledThreadPoolExecutor都有這些方法。
notify()方法不能喚醒某個具體的線程,所以只有一個線程在等待的時候它才有用武之地。而notifyAll()喚醒所有線程并允許他們爭奪鎖確保了至少有一個線程能繼續(xù)運行。
當有線程調(diào)用了對象的 notifyAll()方法(喚醒所有 wait 線程)或 notify()方法(只隨機喚醒一個 wait 線程),被喚醒的的線程便會進入該對象的鎖池中,鎖池中的線程會去競爭該對象鎖。也就是說,調(diào)用了notify后只要一個線程會由等待池進入鎖池,而notifyAll會將該對象等待池內(nèi)的所有線程移動到鎖池中,等待鎖競爭
優(yōu)先級高的線程競爭到對象鎖的概率大,假若某線程沒有競爭到該對象鎖,它還會留在鎖池中,唯有線程再次調(diào)用 wait()方法,它才會重新回到等待池中。
一個很明顯的原因是JAVA提供的鎖是對象級的而不是線程級的,每個對象都有鎖,通過線程獲得。
如果線程需要等待某些鎖那么調(diào)用對象中的wait()方法就有意義了。如果wait()方法定義在Thread類中,線程正在等待的是哪個鎖就不明顯了。
簡單的說,由于wait,notify和notifyAll都是鎖級別的操作,所以把他們定義在Object類中因為鎖屬于對象。
主要是因為Java API強制要求這樣做,如果你不這么做,你的代碼會拋出IllegalMonitorStateException異常。還有一個原因是為了避免wait和notify之間產(chǎn)生競態(tài)條件。
最主要的原因是為了防止以下這種情況
//?等待者(Thread1)while?(condition?!=?true)?{?//?step.1 ????lock.wait()?//?step.4}//?喚醒者(Thread2)condition?=?true;?//?step.2lock.notify();?//?step.3
在對之前的代碼去掉 synchronized 塊之后,如果在等待者判斷 condition != true 之后而調(diào)用 wait() 之前,喚醒者**將 condition 修改成了 true 同時調(diào)用了 notify() **的話,那么等待者在調(diào)用了 wait() 之后就沒有機會被喚醒了。
join() 的作用:讓“主線程”等待“子線程”結(jié)束之后才能繼續(xù)運行。
yield方法可以暫停當前正在執(zhí)行的線程對象,讓其它有相同優(yōu)先級的線程執(zhí)行。它是一個靜態(tài)方法而且只保證當前線程放棄CPU占用而不能保證使其它線程一定能占用CPU,執(zhí)行yield()的線程有可能在進入到暫停狀態(tài)后馬上又被執(zhí)行。
sleep()方法(休眠)是線程類(Thread)的靜態(tài)方法,調(diào)用此方法會讓當前線程暫停執(zhí)行指定的時間,將執(zhí)行機會(CPU)讓給其他線程,但是對象的鎖依然保持,因此休眠時間結(jié)束后會自動恢復。注意這里的恢復并不是恢復到執(zhí)行的狀態(tài),而是恢復到可運行狀態(tài)中等待CPU的寵幸。
Java程序中wait和sleep都會造成某種形式的暫停,它們可以滿足不同的需要。
wait存在于Object類中;sleep存在于Thread類中。
wait會讓出CPU資源以及釋放鎖;sleep只會釋放CPU資源。
wait只能在同步塊中使用;sleep沒這限制。
wait需要notify(或 notifyAll)喚醒,進入等鎖狀態(tài);sleep到指定時間便會自動恢復到運行狀態(tài)。
不能被重寫,線程的很多方法都是由系統(tǒng)調(diào)用的,不能通過子類覆寫去改變他們的行為。
Thread類的sleep()和yield()方法將在當前正在執(zhí)行的線程上運行。
該代碼只有在某個A線程執(zhí)行時會被執(zhí)行,這種情況下通知某個B線程yield是無意義的(因為B線程本來就沒在執(zhí)行)。因此只有當前線程執(zhí)行yield才是有意義的。通過使該方法為static,你將不會浪費時間嘗試yield 其他線程。
只能給自己喂安眠藥,不能給別人喂安眠藥。
阻塞式方法是指程序會一直等待該方法完成期間不做其他事情。
ServerSocket的accept()方法就是一直等待客戶端連接。這里的阻塞是指調(diào)用結(jié)果返回之前,當前線程會被掛起,直到得到結(jié)果之后才會返回。
此外,還有異步和非阻塞式方法在任務完成前就返回。
在Java里面沒有辦法強制啟動一個線程,它是被線程調(diào)度器控制著
簡單的說,如果異常沒有被捕獲該線程將會停止執(zhí)行。
Thread.UncaughtExceptionHandler是用于處理未捕獲異常造成線程突然中斷情況的一個內(nèi)嵌接口。
當一個未捕獲異常將造成線程中斷的時候JVM會使用Thread.getUncaughtExceptionHandler()來查詢線程的UncaughtExceptionHandler并將線程和異常作為參數(shù)傳遞給handler的uncaughtException()方法進行處理。
在Java中有兩種異常。
非運行時異常(Checked Exception):這種異常必須在方法聲明的throws語句指定,或者在方法體內(nèi)捕獲。例如:IOException和ClassNotFoundException。
運行時異常(Unchecked Exception):這種異常不必在方法聲明中指定,也不需要在方法體中捕獲。例如,NumberFormatException。
因為run()方法不支持throws語句,所以當線程對象的run()方法拋出非運行異常時,我們必須捕獲并且處理它們。當運行時異常從run()方法中拋出時,默認行為是在控制臺輸出堆棧記錄并且退出程序。
好在,java提供給我們一種在線程對象里捕獲和處理運行時異常的一種機制。實現(xiàn)用來處理運行時異常的類,這個類實現(xiàn)UncaughtExceptionHandler接口并且實現(xiàn)這個接口的uncaughtException()方法。示例:
package?concurrency; import?java.lang.Thread.UncaughtExceptionHandler; public?class?Main2?{ ????public?static?void?main(String[]?args)?{ ????????Task?task?=?new?Task(); ????????Thread?thread?=?new?Thread(task); ????????thread.setUncaughtExceptionHandler(new?ExceptionHandler()); ????????thread.start(); ????} } class?Task?implements?Runnable{ ????@Override ????public?void?run()?{ ????????int?numero?=?Integer.parseInt("TTT"); ????} } class?ExceptionHandler?implements?UncaughtExceptionHandler{ ????@Override ????public?void?uncaughtException(Thread?t,?Throwable?e)?{ ????????System.out.printf("An?exception?has?been?captured\n"); ????????System.out.printf("Thread:??%s\n",?t.getId()); ????????System.out.printf("Exception:??%s:??%s\n",?e.getClass().getName(),e.getMessage()); ????????System.out.printf("Stack?Trace:??\n"); ????????e.printStackTrace(System.out); ????????System.out.printf("Thread?status:??%s\n",t.getState()); ????} }
當一個線程拋出了異常并且沒有被捕獲時(這種情況只可能是運行時異常),JVM檢查這個線程是否被預置了未捕獲異常處理器。如果找到,JVM將調(diào)用線程對象的這個方法,并將線程對象和異常作為傳入?yún)?shù)。
Thread類還有另一個方法可以處理未捕獲到的異常,即靜態(tài)方法setDefaultUncaughtExceptionHandler()。這個方法在應用程序中為所有的線程對象創(chuàng)建了一個異常處理器。
當線程拋出一個未捕獲到的異常時,JVM將為異常尋找以下三種可能的處理器。
首先,它查找線程對象的未捕獲異常處理器。
如果找不到,JVM繼續(xù)查找線程對象所在的線程組(ThreadGroup)的未捕獲異常處理器。
如果還是找不到,如同本節(jié)所講的,JVM將繼續(xù)查找默認的未捕獲異常處理器。
如果沒有一個處理器存在,JVM則將堆棧異常記錄打印到控制臺,并退出程序。
處于等待狀態(tài)的線程可能會收到錯誤警報和偽喚醒,如果不在循環(huán)中檢查等待條件,程序就會在沒有滿足結(jié)束條件的情況下退出。
1、一般來說,wait肯定是在某個條件調(diào)用的,不是if就是while 2、放在while里面,是防止出于waiting的對象被別的原因調(diào)用了喚醒方法,但是while里面的條件并沒有滿足(也可能當時滿足了,但是由于別的線程操作后,又不滿足了),就需要再次調(diào)用wait將其掛起。 3、其實還有一點,就是while最好也被同步,這樣不會導致錯失信號。
while(condition){????wait(); }
忙循環(huán)就是程序員用循環(huán)讓一個線程等待,不像傳統(tǒng)方法wait()、 sleep() 或 yield(),它們都放棄了CPU控制,而忙循環(huán)不會放棄CPU,它就是在運行一個空循環(huán)。
這么做的目的是為了保留CPU緩存,在多核系統(tǒng)中,一個等待線程醒來的時候可能會在另一個內(nèi)核運行,這樣會重建緩存。為了避免重建緩存和減少等待重建的時間就可以使用它了。
沒有獲得鎖的線程一直循環(huán)在那里看是否該鎖的保持者已經(jīng)釋放了鎖,這就是自旋鎖。
互斥鎖:從等待到解鎖過程,線程會從sleep狀態(tài)變?yōu)閞unning狀態(tài),過程中有線程上下文的切換,搶占CPU等開銷。
自旋鎖不會引起調(diào)用者休眠,如果自旋鎖已經(jīng)被別的線程保持,調(diào)用者就一直循環(huán)在那里看是否該自旋鎖的保持者釋放了鎖。由于自旋鎖不會引起調(diào)用者休眠,所以自旋鎖的效率遠高于互斥鎖。
雖然自旋鎖效率比互斥鎖高,但它會存在下面兩個問題: 1、自旋鎖一直占用CPU,在未獲得鎖的情況下,一直運行,如果不能在很短的時間內(nèi)獲得鎖,會導致CPU效率降低。 2、試圖遞歸地獲得自旋鎖會引起死鎖。遞歸程序決不能在持有自旋鎖時調(diào)用它自己,也決不能在遞歸調(diào)用時試圖獲得相同的自旋鎖。
由此可見,我們要慎重的使用自旋鎖,自旋鎖適合于鎖使用者保持鎖時間比較短并且鎖競爭不激烈的情況。正是由于自旋鎖使用者一般保持鎖時間非常短,因此選擇自旋而不是睡眠是非常必要的,自旋鎖的效率遠高于互斥鎖。
同一個Runnable,使用全局變量。
第一種:將共享數(shù)據(jù)封裝到一個對象中,把這個共享數(shù)據(jù)所在的對象傳遞給不同的Runnable
第二種:將這些Runnable對象作為某一個類的內(nèi)部類,共享的數(shù)據(jù)作為外部類的成員變量,對共享數(shù)據(jù)的操作分配給外部類的方法來完成,以此實現(xiàn)對操作共享數(shù)據(jù)的互斥和通信,作為內(nèi)部類的Runnable來操作外部類的方法,實現(xiàn)對數(shù)據(jù)的操作
class?ShareData?{ ?private?int?x?=?0; ?public?synchronized?void?addx(){ ???x++; ???System.out.println("x++?:?"+x); ?} ?public?synchronized?void?subx(){ ???x--; ???System.out.println("x--?:?"+x); ?} } public?class?ThreadsVisitData?{ ? ?public?static?ShareData?share?=?new?ShareData(); ? ?public?static?void?main(String[]?args)?{ ??//final?ShareData?share?=?new?ShareData(); ??new?Thread(new?Runnable()?{ ????public?void?run()?{ ????????for(int?i?=?0;i<100;i++){ ????????????share.addx(); ????????} ????} ??}).start(); ??new?Thread(new?Runnable()?{ ????public?void?run()?{ ????????for(int?i?=?0;i<100;i++){ ????????????share.subx(); ????????} ????} ???}).start();? ?} }
Runnable和Callable都是接口, 不同之處: 1.Callable可以返回一個類型V,而Runnable不可以 2.Callable能夠拋出checked exception,而Runnable不可以。 3.Runnable是自從java1.1就有了,而Callable是1.5之后才加上去的 4.Callable和Runnable都可以應用于executors。而Thread類只支持Runnable.
import?java.util.concurrent.Callable;?? import?java.util.concurrent.ExecutionException;?? import?java.util.concurrent.ExecutorService;?? import?java.util.concurrent.Executors;?? import?java.util.concurrent.Future;?? ?? public?class?ThreadTestB?{?? ????public?static?void?main(String[]?args)?{?? ????????ExecutorService?e=Executors.newFixedThreadPool(10);?? ????????Future?f1=e.submit(new?MyCallableA());?? ????????Future?f2=e.submit(new?MyCallableA());?? ????????Future?f3=e.submit(new?MyCallableA());???????? ????????System.out.println("--Future.get()....");?? ????????try?{?? ????????????System.out.println(f1.get());?? ????????????System.out.println(f2.get());?? ????????????System.out.println(f3.get());???????????? ????????}?catch?(InterruptedException?e1)?{?? ????????????e1.printStackTrace();?? ????????}?catch?(ExecutionException?e1)?{?? ????????????e1.printStackTrace();?? ????????}?? ????????e.shutdown();?? ????}?? }?? ?? class?MyCallableA?implements?Callable<String>{?? ????public?String?call()?throws?Exception?{?? ????????System.out.println("開始執(zhí)行Callable");?? ????????String[]?ss={"zhangsan","lisi"};?? ????????long[]?num=new?long[2];?? ????????for(int?i=0;i<1000000;i++){?? ????????????num[(int)(Math.random()*2)]++;?? ????????}?? ?????????? ????????if(num[0]>num[1]){?? ????????????return?ss[0];?? ????????}else?if(num[0]<num[1]){?? ????????????throw?new?Exception("棄權!");?? ????????}else{?? ????????????return?ss[1];?? ????????}?? ????}? }
CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線程之間的等待,只不過它們側(cè)重點不同:
CountDownLatch一般用于某個線程A等待若干個其他線程執(zhí)行完任務之后,它才執(zhí)行;
CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行;
另外,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。
CountDownLatch的用法:
public?class?Test?{ ?????public?static?void?main(String[]?args)?{??? ?????????final?CountDownLatch?latch?=?new?CountDownLatch(2); ?????????? ?????????new?Thread(){ ?????????????public?void?run()?{ ?????????????????try?{ ?????????????????????System.out.println("子線程"+Thread.currentThread().getName()+"正在執(zhí)行"); ????????????????????Thread.sleep(3000); ????????????????????System.out.println("子線程"+Thread.currentThread().getName()+"執(zhí)行完畢"); ????????????????????latch.countDown(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ?????????????}; ?????????}.start(); ?????????? ?????????new?Thread(){ ?????????????public?void?run()?{ ?????????????????try?{ ?????????????????????System.out.println("子線程"+Thread.currentThread().getName()+"正在執(zhí)行"); ?????????????????????Thread.sleep(3000); ?????????????????????System.out.println("子線程"+Thread.currentThread().getName()+"執(zhí)行完畢"); ?????????????????????latch.countDown(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ?????????????}; ?????????}.start(); ?????????? ?????????try?{ ?????????????System.out.println("等待2個子線程執(zhí)行完畢..."); ????????????latch.await(); ????????????System.out.println("2個子線程已經(jīng)執(zhí)行完畢"); ????????????System.out.println("繼續(xù)執(zhí)行主線程"); ????????}?catch?(InterruptedException?e)?{ ????????????e.printStackTrace(); ????????} ?????} }
CyclicBarrier用法:
public?class?Test?{ ????public?static?void?main(String[]?args)?{ ????????int?N?=?4; ????????CyclicBarrier?barrier??=?new?CyclicBarrier(N,new?Runnable()?{ ????????????@Override ????????????public?void?run()?{ ????????????????System.out.println("當前線程"+Thread.currentThread().getName());??? ????????????} ????????}); ????????? ????????for(int?i=0;i<N;i++) ????????????new?Writer(barrier).start(); ????} ????static?class?Writer?extends?Thread{ ????????private?CyclicBarrier?cyclicBarrier; ????????public?Writer(CyclicBarrier?cyclicBarrier)?{ ????????????this.cyclicBarrier?=?cyclicBarrier; ????????} ? ????????@Override ????????public?void?run()?{ ????????????System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數(shù)據(jù)..."); ????????????try?{ ????????????????Thread.sleep(5000);??????//以睡眠來模擬寫入數(shù)據(jù)操作 ????????????????System.out.println("線程"+Thread.currentThread().getName()+"寫入數(shù)據(jù)完畢,等待其他線程寫入完畢"); ????????????????cyclicBarrier.await(); ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????}catch(BrokenBarrierException?e){ ????????????????e.printStackTrace(); ????????????} ????????????System.out.println("所有線程寫入完畢,繼續(xù)處理其他任務..."); ????????} ????} }
interrupt方法用于中斷線程。調(diào)用該方法的線程的狀態(tài)為將被置為”中斷”狀態(tài)。
注意:線程中斷僅僅是置線程的中斷狀態(tài)位,不會停止線程。需要用戶自己去監(jiān)視線程的狀態(tài)為并做處理。支持線程中斷的方法(也就是線程中斷后會拋出interruptedException的方法)就是在監(jiān)視線程的中斷狀態(tài),一旦線程的中斷狀態(tài)被置為“中斷狀態(tài)”,就會拋出中斷異常。
isInterrupted 只是簡單的查詢中斷狀態(tài),不會對狀態(tài)進行修改。
ConcurrentHashMap的結(jié)構是比較復雜的,都深究去本質(zhì),其實也就是數(shù)組和鏈表而已。我們由淺入深慢慢的分析其結(jié)構。
先簡單分析一下,ConcurrentHashMap 的成員變量中,包含了一個 Segment 的數(shù)組(final Segment<K,V>[] segments;),而 Segment 是 ConcurrentHashMap 的內(nèi)部類,然后在 Segment 這個類中,包含了一個 HashEntry 的數(shù)組(transient volatile HashEntry<K,V>[] table;)。而 HashEntry 也是ConcurrentHashMap 的內(nèi)部類。HashEntry 中,包含了 key 和 value 以及 next 指針(類似于 HashMap 中 Entry),所以 HashEntry 可以構成一個鏈表。
所以通俗的講,ConcurrentHashMap 數(shù)據(jù)結(jié)構為一個 Segment 數(shù)組,Segment 的數(shù)據(jù)結(jié)構為 HashEntry 的數(shù)組,而 HashEntry 存的是我們的鍵值對,可以構成鏈表。
首先,我們看一下 HashEntry 類。
HashEntry 用來封裝散列映射表中的鍵值對。在 HashEntry 類中,key,hash 和 next 域都被聲明為 final 型,value 域被聲明為 volatile 型。其類的定義為:
static?final?class?HashEntry<K,V>?{ ????????final?int?hash; ????????final?K?key; ????????volatile?V?value; ????????volatile?HashEntry<K,V>?next; ????????HashEntry(int?hash,?K?key,?V?value,?HashEntry<K,V>?next)?{ ????????????this.hash?=?hash; ????????????this.key?=?key; ????????????this.value?=?value; ????????????this.next?=?next; ????????} ????????... ????????... }
HashEntry 的學習可以類比著 HashMap 中的 Entry。我們的存儲鍵值對的過程中,散列的時候如果發(fā)生“碰撞”,將采用“分離鏈表法”來處理碰撞:把碰撞的 HashEntry 對象鏈接成一個鏈表。
如下圖,我們在一個空桶中插入 A、B、C 兩個 HashEntry 對象后的結(jié)構圖(其實應該為鍵值對,在這進行了簡化以方便更容易理解):
Segment 的類定義為static final class Segment<K,V> extends ReentrantLock implements Serializable。其繼承于 ReentrantLock 類,從而使得 Segment 對象可以充當鎖的角色。Segment 中包含HashEntry 的數(shù)組,其可以守護其包含的若干個桶(HashEntry的數(shù)組)。Segment 在某些意義上有點類似于 HashMap了,都是包含了一個數(shù)組,而數(shù)組中的元素可以是一個鏈表。
table:table 是由 HashEntry 對象組成的數(shù)組如果散列時發(fā)生碰撞,碰撞的 HashEntry 對象就以鏈表的形式鏈接成一個鏈表table數(shù)組的數(shù)組成員代表散列映射表的一個桶每個 table 守護整個 ConcurrentHashMap 包含桶總數(shù)的一部分如果并發(fā)級別為 16,table 則守護 ConcurrentHashMap 包含的桶總數(shù)的 1/16。
count 變量是計算器,表示每個 Segment 對象管理的 table 數(shù)組(若干個 HashEntry 的鏈表)包含的HashEntry 對象的個數(shù)。之所以在每個Segment對象中包含一個 count 計數(shù)器,而不在 ConcurrentHashMap 中使用全局的計數(shù)器,是為了避免出現(xiàn)“熱點域”而影響并發(fā)性。
/** ?*?Segments?are?specialized?versions?of?hash?tables.??This ?*?subclasses?from?ReentrantLock?opportunistically,?just?to ?*?simplify?some?locking?and?avoid?separate?construction. ?*/ static?final?class?Segment<K,V>?extends?ReentrantLock?implements?Serializable?{ ????/** ?????*?The?per-segment?table.?Elements?are?accessed?via ?????*?entryAt/setEntryAt?providing?volatile?semantics. ?????*/ ????transient?volatile?HashEntry<K,V>[]?table; ????/** ?????*?The?number?of?elements.?Accessed?only?either?within?locks ?????*?or?among?other?volatile?reads?that?maintain?visibility. ?????*/ ????transient?int?count; ????transient?int?modCount; ????/** ?????*?裝載因子 ?????*/ ????final?float?loadFactor; }
我們通過下圖來展示一下插入 ABC 三個節(jié)點后,Segment 的示意圖:
其實從我個人角度來說,Segment結(jié)構是與HashMap很像的。
ConcurrentHashMap 的結(jié)構中包含的 Segment 的數(shù)組,在默認的并發(fā)級別會創(chuàng)建包含 16 個 Segment 對象的數(shù)組。通過我們上面的知識,我們知道每個 Segment 又包含若干個散列表的桶,每個桶是由 HashEntry 鏈接起來的一個鏈表。如果 key 能夠均勻散列,每個 Segment 大約守護整個散列表桶總數(shù)的 1/16。
下面我們還有通過一個圖來演示一下 ConcurrentHashMap 的結(jié)構:
在 ConcurrentHashMap 中,當執(zhí)行 put 方法的時候,會需要加鎖來完成。我們通過代碼來解釋一下具體過程: 當我們 new 一個 ConcurrentHashMap 對象,并且執(zhí)行put操作的時候,首先會執(zhí)行 ConcurrentHashMap 類中的 put 方法,該方法源碼為:
/** ?*?Maps?the?specified?key?to?the?specified?value?in?this?table. ?*?Neither?the?key?nor?the?value?can?be?null. ?* ?*?<p>?The?value?can?be?retrieved?by?calling?the?<tt>get</tt>?method ?*?with?a?key?that?is?equal?to?the?original?key. ?* ?*?@param?key?key?with?which?the?specified?value?is?to?be?associated ?*?@param?value?value?to?be?associated?with?the?specified?key ?*?@return?the?previous?value?associated?with?<tt>key</tt>,?or ?*?????????<tt>null</tt>?if?there?was?no?mapping?for?<tt>key</tt> ?*?@throws?NullPointerException?if?the?specified?key?or?value?is?null ?*/ @SuppressWarnings("unchecked") public?V?put(K?key,?V?value)?{ ????Segment<K,V>?s; ????if?(value?==?null) ????????throw?new?NullPointerException(); ????int?hash?=?hash(key); ????int?j?=?(hash?>>>?segmentShift)?&?segmentMask; ????if?((s?=?(Segment<K,V>)UNSAFE.getObject??????????//?nonvolatile;?recheck ?????????(segments,?(j?<<?SSHIFT)?+?SBASE))?==?null)?//??in?ensureSegment ????????s?=?ensureSegment(j); ????return?s.put(key,?hash,?value,?false); }
我們通過注釋可以了解到,ConcurrentHashMap 不允許空值。該方法首先有一個 Segment 的引用 s,然后會通過 hash() 方法對 key 進行計算,得到哈希值;繼而通過調(diào)用 Segment 的 put(K key, int hash, V value, boolean onlyIfAbsent)方法進行存儲操作。該方法源碼為:
final?V?put(K?key,?int?hash,?V?value,?boolean?onlyIfAbsent)?{ ????//加鎖,這里是鎖定的Segment而不是整個ConcurrentHashMap ????HashEntry<K,V>?node?=?tryLock()???null?:scanAndLockForPut(key,?hash,?value); ????V?oldValue; ????try?{ ????????HashEntry<K,V>[]?tab?=?table; ????????//得到hash對應的table中的索引index ????????int?index?=?(tab.length?-?1)?&?hash; ????????//找到hash對應的是具體的哪個桶,也就是哪個HashEntry鏈表 ????????HashEntry<K,V>?first?=?entryAt(tab,?index); ????????for?(HashEntry<K,V>?e?=?first;;)?{ ????????????if?(e?!=?null)?{ ????????????????K?k; ????????????????if?((k?=?e.key)?==?key?|| ????????????????????(e.hash?==?hash?&&?key.equals(k)))?{ ????????????????????oldValue?=?e.value; ????????????????????if?(!onlyIfAbsent)?{ ????????????????????????e.value?=?value; ????????????????????????++modCount; ????????????????????} ????????????????????break; ????????????????} ????????????????e?=?e.next; ????????????} ????????????else?{ ????????????????if?(node?!=?null) ????????????????????node.setNext(first); ????????????????else ????????????????????node?=?new?HashEntry<K,V>(hash,?key,?value,?first); ????????????????int?c?=?count?+?1; ????????????????if?(c?>?threshold?&&?tab.length?<?MAXIMUM_CAPACITY) ????????????????????rehash(node); ????????????????else ????????????????????setEntryAt(tab,?index,?node); ????????????????++modCount; ????????????????count?=?c; ????????????????oldValue?=?null; ????????????????break; ????????????} ????????} ????}?finally?{ ????????//解鎖 ????????unlock(); ????} ????return?oldValue; }
關于該方法的某些關鍵步驟,在源碼上加上了注釋。
需要注意的是:加鎖操作是針對的 hash 值對應的某個 Segment,而不是整個 ConcurrentHashMap。因為 put 操作只是在這個 Segment 中完成,所以并不需要對整個 ConcurrentHashMap 加鎖。所以,此時,其他的線程也可以對另外的 Segment 進行 put 操作,因為雖然該 Segment 被鎖住了,但其他的 Segment 并沒有加鎖。同時,讀線程并不會因為本線程的加鎖而阻塞。
正是因為其內(nèi)部的結(jié)構以及機制,所以 ConcurrentHashMap 在并發(fā)訪問的性能上要比Hashtable和同步包裝之后的HashMap的性能提高很多。在理想狀態(tài)下,ConcurrentHashMap 可以支持 16 個線程執(zhí)行并發(fā)寫操作(如果并發(fā)級別設置為 16),及任意數(shù)量線程的讀操作。
在實際的應用中,散列表一般的應用場景是:除了少數(shù)插入操作和刪除操作外,絕大多數(shù)都是讀取操作,而且讀操作在大多數(shù)時候都是成功的。正是基于這個前提,ConcurrentHashMap 針對讀操作做了大量的優(yōu)化。通過 HashEntry 對象的不變性和用 volatile 型變量協(xié)調(diào)線程間的內(nèi)存可見性,使得 大多數(shù)時候,讀操作不需要加鎖就可以正確獲得值。這個特性使得 ConcurrentHashMap 的并發(fā)性能在分離鎖的基礎上又有了近一步的提高。
ConcurrentHashMap 是一個并發(fā)散列映射表的實現(xiàn),它允許完全并發(fā)的讀取,并且支持給定數(shù)量的并發(fā)更新。相比于 HashTable 和用同步包裝器包裝的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 擁有更高的并發(fā)性。在 HashTable 和由同步包裝器包裝的 HashMap 中,使用一個全局的鎖來同步不同線程間的并發(fā)訪問。同一時間點,只能有一個線程持有鎖,也就是說在同一時間點,只能有一個線程能訪問容器。這雖然保證多線程間的安全并發(fā)訪問,但同時也導致對容器的訪問變成串行化的了。
ConcurrentHashMap 的高并發(fā)性主要來自于三個方面:
用分離鎖實現(xiàn)多個線程間的更深層次的共享訪問。
用 HashEntery 對象的不變性來降低執(zhí)行讀操作的線程在遍歷鏈表期間對加鎖的需求。
通過對同一個 Volatile 變量的寫 / 讀訪問,協(xié)調(diào)不同線程間讀 / 寫操作的內(nèi)存可見性。
使用分離鎖,減小了請求 同一個鎖的頻率。
通過 HashEntery 對象的不變性及對同一個 Volatile 變量的讀 / 寫來協(xié)調(diào)內(nèi)存可見性,使得 讀操作大多數(shù)時候不需要加鎖就能成功獲取到需要的值。由于散列映射表在實際應用中大多數(shù)操作都是成功的 讀操作,所以 2 和 3 既可以減少請求同一個鎖的頻率,也可以有效減少持有鎖的時間。通過減小請求同一個鎖的頻率和盡量減少持有鎖的時間 ,使得 ConcurrentHashMap 的并發(fā)性相對于 HashTable 和用同步包裝器包裝的 HashMap有了質(zhì)的提高。
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。
1)add(E e): 添加元素,如果BlockingQueue可以容納,則返回true,否則報異常
2)offer(E e): 添加元素,如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(E e): 添加元素,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù).
4)poll(long timeout, TimeUnit timeUnit): 取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等timeout參數(shù)規(guī)定的時間,取不到時返回null
5)take(): 取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到Blocking有新的對象被加入為止
1)ArrayBlockingQueue: 有界的先入先出順序隊列,構造方法確定隊列的大小.
2)LinkedBlockingQueue: ×××的先入先出順序隊列,構造方法提供兩種,一種初始化隊列大小,隊列即有界;第二種默認構造方法,隊列×××(有界即Integer.MAX_VALUE)
4)SynchronousQueue: 特殊的BlockingQueue,沒有空間的隊列,即必須有取的方法阻塞在這里的時候才能放入元素。
3)PriorityBlockingQueue: 支持優(yōu)先級的阻塞隊列 ,存入對象必須實現(xiàn)Comparator接口 (需要注意的是 隊列不是在加入元素的時候進行排序,而是取出的時候,根據(jù)Comparator來決定優(yōu)先級最高的)。
BlockingQueue 實現(xiàn)主要用于生產(chǎn)者-使用者隊列,BlockingQueue 實現(xiàn)是線程安全的。所有排隊方法都可以使用內(nèi)部鎖或其他形式的并發(fā)控制來自動達到它們的目的
這是一個生產(chǎn)者-使用者場景的一個用例。注意,BlockingQueue 可以安全地與多個生產(chǎn)者和多個使用者一起使用 此用例來自jdk文檔
//這是一個生產(chǎn)者類 class?Producer?implements?Runnable?{ ???private?final?BlockingQueue?queue; ???Producer(BlockingQueue?q)?{? ???????queue?=?q;? ???} ???public?void?run()?{ ?????try?{ ???????while(true)?{? ???????????queue.put(produce());? ???????} ?????}?catch?(InterruptedException?ex)?{? ?????????...?handle?... ?????????} ???} ???Object?produce()?{? ???????...? ???} ?} ?//這是一個消費者類 ?class?Consumer?implements?Runnable?{ ???private?final?BlockingQueue?queue; ???Consumer(BlockingQueue?q)?{?queue?=?q;?} ???public?void?run()?{ ?????try?{ ???????while(true)?{? ???????????consume(queue.take());? ???????} ?????}?catch?(InterruptedException?ex)?{? ?????????...?handle?... ?????} ???} ???void?consume(Object?x)?{? ???????...? ???} ?} ?//這是實現(xiàn)類 ?class?Setup?{ ???void?main()?{ ?????//實例一個非阻塞隊列 ?????BlockingQueue?q?=?new?SomeQueueImplementation(); ?????//將隊列傳入兩個消費者和一個生產(chǎn)者中 ?????Producer?p?=?new?Producer(q); ?????Consumer?c1?=?new?Consumer(q); ?????Consumer?c2?=?new?Consumer(q); ?????new?Thread(p).start(); ?????new?Thread(c1).start(); ?????new?Thread(c2).start(); ???} ?}
合理利用線程池能夠帶來三個好處。第一:降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。第二:提高響應速度。當任務到達時,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行。第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。但是要做到合理的利用線程池,必須對其原理了如指掌。
我們可以通過ThreadPoolExecutor來創(chuàng)建一個線程池。
new??ThreadPoolExecutor(corePoolSize,?maximumPoolSize,?keepAliveTime,?milliseconds,runnableTaskQueue,?handler);
創(chuàng)建一個線程池需要輸入幾個參數(shù):
corePoolSize(線程池的基本大?。寒斕峤灰粋€任務到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務,即使其他空閑的基本線程能夠執(zhí)行新任務也會創(chuàng)建線程,等到需要執(zhí)行的任務數(shù)大于線程池基本大小時就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads方法,線程池會提前創(chuàng)建并啟動所有基本線程。
runnableTaskQueue(任務隊列):用于保存等待執(zhí)行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。
ArrayBlockingQueue:是一個基于數(shù)組結(jié)構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
LinkedBlockingQueue:一個基于鏈表結(jié)構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。
PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊列。
maximumPoolSize(線程池最大大?。壕€程池允許創(chuàng)建的最大線程數(shù)。如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務。值得注意的是如果使用了×××的任務隊列這個參數(shù)就沒什么效果。
ThreadFactory:用于設置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設置更有意義的名字。
RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略。
AbortPolicy:直接拋出異常。
CallerRunsPolicy:只用調(diào)用者所在線程來運行任務。
DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執(zhí)行當前任務。
DiscardPolicy:不處理,丟棄掉。 當然也可以根據(jù)應用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。
keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務很多,并且每個任務執(zhí)行的時間比較短,可以調(diào)大這個時間,提高線程的利用率。
TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務是否被線程池執(zhí)行成功。通過以下代碼可知execute方法輸入的任務是一個Runnable類的實例。
threadsPool.execute(new?Runnable()?{ ????@Override ????public?void?run()?{ ????????//?TODO?Auto-generated?method?stub ????} });
我們也可以使用submit 方法來提交任務,它會返回一個future,那么我們可以通過這個future來判斷任務是否執(zhí)行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間后立即返回,這時有可能任務沒有執(zhí)行完。
Future<Object>?future?=?executor.submit(harReturnValuetask);try?{?????Object?s?=?future.get(); }?catch?(InterruptedException?e)?{????//?處理中斷異常}?catch?(ExecutionException?e)?{????//?處理無法執(zhí)行任務異常}?finally?{????//?關閉線程池 ????executor.shutdown(); }
我們可以通過調(diào)用線程池的shutdown或shutdownNow方法來關閉線程池,它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務的線程,并返回等待執(zhí)行任務的列表,而shutdown只是將線程池的狀態(tài)設置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務的線程。
只要調(diào)用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調(diào)用isTerminaed方法會返回true。至于我們應該調(diào)用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調(diào)用shutdown來關閉線程池,如果任務不一定要執(zhí)行完,則可以調(diào)用shutdownNow。
流程分析:線程池的主要工作流程如下圖:
從上圖我們可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:
首先線程池判斷基本線程池是否已滿?沒滿,創(chuàng)建一個工作線程來執(zhí)行任務。滿了,則進入下個流程。
其次線程池判斷工作隊列是否已滿?沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。
最后線程池判斷整個線程池是否已滿?沒滿,則創(chuàng)建一個新的工作線程來執(zhí)行任務,滿了,則交給飽和策略來處理這個任務。
上面的流程分析讓我們很直觀的了解了線程池的工作原理,讓我們再通過源代碼來看看是如何實現(xiàn)的。線程池執(zhí)行任務的方法如下:
public?void?execute(Runnable?command)?{????if?(command?==?null)???????throw?new?NullPointerException();????//如果線程數(shù)小于基本線程數(shù),則創(chuàng)建線程并執(zhí)行當前任務? ????if?(poolSize?>=?corePoolSize?||?!addIfUnderCorePoolSize(command))?{????//如線程數(shù)大于等于基本線程數(shù)或線程創(chuàng)建失敗,則將當前任務放到工作隊列中。 ????????if?(runState?==?RUNNING?&&?workQueue.offer(command))?{????????????if?(runState?!=?RUNNING?||?poolSize?==?0) ??????????????????????ensureQueuedTaskHandled(command); ????????}????//如果線程池不處于運行中或任務無法放入隊列,并且當前線程數(shù)量小于最大允許的線程數(shù)量,則創(chuàng)建一個線程執(zhí)行任務。????????else?if?(!addIfUnderMaximumPoolSize(command))????????//拋出RejectedExecutionException異常 ????????????reject(command);?//?is?shutdown?or?saturated ????} }
工作線程。線程池創(chuàng)建線程時,會將線程封裝成工作線程Worker,Worker在執(zhí)行完任務后,還會無限循環(huán)獲取工作隊列里的任務來執(zhí)行。我們可以從Worker的run方法里看到這點:
public?void?run()?{?????try?{ ???????????Runnable?task?=?firstTask; ???????????firstTask?=?null;????????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{ ????????????????????runTask(task); ????????????????????task?=?null; ????????????} ??????}?finally?{ ?????????????workerDone(this); ??????} }
要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:
任務的性質(zhì):CPU密集型任務,IO密集型任務和混合型任務。
任務的優(yōu)先級:高,中和低。
任務的執(zhí)行時間:長,中和短。
任務的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
任務性質(zhì)不同的任務可以用不同規(guī)模的線程池分開處理。CPU密集型任務配置盡可能小的線程,如配置Ncpu+1個線程的線程池。IO密集型任務則由于線程并不是一直在執(zhí)行任務,則配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務執(zhí)行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數(shù)。
優(yōu)先級不同的任務可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務提交到隊列里,那么優(yōu)先級低的任務可能永遠不能執(zhí)行。
執(zhí)行時間不同的任務可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務,因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時間越長CPU空閑時間就越長,那么線程數(shù)應該設置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增加系統(tǒng)的穩(wěn)定性和預警能力,可以根據(jù)需要設大一點,比如幾千。有一次我們組使用的后臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導致執(zhí)行SQL變得非常緩慢,因為后臺任務線程池里的任務全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。如果當時我們設置成×××隊列,線程池的隊列就會越來越多,有可能會撐滿內(nèi)存,導致整個系統(tǒng)不可用,而不只是后臺任務出現(xiàn)問題。當然我們的系統(tǒng)所有的任務是用的單獨的服務器部署的,而我們使用不同規(guī)模的線程池跑不同類型的任務,但是出現(xiàn)這樣問題時也會影響到其他任務。
通過線程池提供的參數(shù)進行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時候可以使用
taskCount:線程池需要執(zhí)行的任務數(shù)量。
completedTaskCount:線程池在運行過程中已完成的任務數(shù)量。小于或等于taskCount。
largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。
getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不+getActiveCount:獲取活動的線程數(shù)。
通過擴展線程池進行監(jiān)控。通過繼承線程池并重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執(zhí)行前,執(zhí)行后和線程池關閉前干一些事情。如監(jiān)控任務的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等。這幾個方法在線程池里是空方法。如:
protected?void?beforeExecute(Thread?t,?Runnable?r)?{?}
網(wǎng)頁名稱:43道多線程面試題,附帶答案(二)
URL標題:http://m.rwnh.cn/article10/jdihdo.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、小程序開發(fā)、軟件開發(fā)、網(wǎng)站設計、App開發(fā)、手機網(wǎng)站建設
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)