(1)Phaser是什么?
創(chuàng)新互聯(lián)建站長(zhǎng)期為上1000家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為平魯企業(yè)提供專業(yè)的做網(wǎng)站、成都做網(wǎng)站,平魯網(wǎng)站改版等技術(shù)服務(wù)。擁有十載豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。(2)Phaser具有哪些特性?
(3)Phaser相對(duì)于CyclicBarrier和CountDownLatch的優(yōu)勢(shì)?
Phaser,翻譯為階段,它適用于這樣一種場(chǎng)景,一個(gè)大任務(wù)可以分為多個(gè)階段完成,且每個(gè)階段的任務(wù)可以多個(gè)線程并發(fā)執(zhí)行,但是必須上一個(gè)階段的任務(wù)都完成了才可以執(zhí)行下一個(gè)階段的任務(wù)。
這種場(chǎng)景雖然使用CyclicBarrier或者CountryDownLatch也可以實(shí)現(xiàn),但是要復(fù)雜的多。首先,具體需要多少個(gè)階段是可能會(huì)變的,其次,每個(gè)階段的任務(wù)數(shù)也可能會(huì)變的。相比于CyclicBarrier和CountDownLatch,Phaser更加靈活更加方便。
下面我們看一個(gè)最簡(jiǎn)單的使用案例:
public class PhaserTest {
public static final int PARTIES = 3;
public static final int PHASES = 4;
public static void main(String[] args) {
Phaser phaser = new Phaser(PARTIES) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// 本文由公從號(hào)“彤哥讀源碼”原創(chuàng)
System.out.println("=======phase: " + phase + " finished=============");
return super.onAdvance(phase, registeredParties);
}
};
for (int i = 0; i < PARTIES; i++) {
new Thread(()->{
for (int j = 0; j < PHASES; j++) {
System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
phaser.arriveAndAwaitAdvance();
}
}, "Thread " + i).start();
}
}
}
這里我們定義一個(gè)需要4個(gè)階段完成的大任務(wù),每個(gè)階段需要3個(gè)小任務(wù),針對(duì)這些小任務(wù),我們分別起3個(gè)線程來執(zhí)行這些小任務(wù),查看輸出結(jié)果為:
Thread 0: phase: 0
Thread 2: phase: 0
Thread 1: phase: 0
=======phase: 0 finished=============
Thread 2: phase: 1
Thread 0: phase: 1
Thread 1: phase: 1
=======phase: 1 finished=============
Thread 1: phase: 2
Thread 0: phase: 2
Thread 2: phase: 2
=======phase: 2 finished=============
Thread 0: phase: 3
Thread 2: phase: 3
Thread 1: phase: 3
=======phase: 3 finished=============
可以看到,每個(gè)階段都是三個(gè)線程都完成了才進(jìn)入下一個(gè)階段。這是怎么實(shí)現(xiàn)的呢,讓我們一起來學(xué)習(xí)吧。
根據(jù)我們前面學(xué)習(xí)AQS的原理,大概猜測(cè)一下Phaser的實(shí)現(xiàn)原理。
首先,需要存儲(chǔ)當(dāng)前階段phase、當(dāng)前階段的任務(wù)數(shù)(參與者)parties、未完成參與者的數(shù)量,這三個(gè)變量我們可以放在一個(gè)變量state中存儲(chǔ)。
其次,需要一個(gè)隊(duì)列存儲(chǔ)先完成的參與者,當(dāng)最后一個(gè)參與者完成任務(wù)時(shí),需要喚醒隊(duì)列中的參與者。
嗯,差不多就是這樣子。
結(jié)合上面的案例帶入:
初始時(shí)當(dāng)前階段為0,參與者數(shù)為3個(gè),未完成參與者數(shù)為3;
第一個(gè)線程執(zhí)行到phaser.arriveAndAwaitAdvance();
時(shí)進(jìn)入隊(duì)列;
第二個(gè)線程執(zhí)行到phaser.arriveAndAwaitAdvance();
時(shí)進(jìn)入隊(duì)列;
第三個(gè)線程執(zhí)行到phaser.arriveAndAwaitAdvance();
時(shí)先執(zhí)行這個(gè)階段的總結(jié)onAdvance()
,再喚醒前面兩個(gè)線程繼續(xù)執(zhí)行下一個(gè)階段的任務(wù)。
嗯,整體能說得通,至于是不是這樣呢,讓我們一起來看源碼吧。
static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread; // nulled to cancel wait
QNode next;
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}
}
先完成的參與者放入隊(duì)列中的節(jié)點(diǎn),這里我們只需要關(guān)注thread
和next
兩個(gè)屬性即可,很明顯這是一個(gè)單鏈表,存儲(chǔ)著入隊(duì)的線程。
// 狀態(tài)變量,用于存儲(chǔ)當(dāng)前階段phase、參與者數(shù)parties、未完成的參與者數(shù)unarrived_count
private volatile long state;
// 最多可以有多少個(gè)參與者,即每個(gè)階段最多有多少個(gè)任務(wù)
private static final int MAX_PARTIES = 0xffff;
// 最多可以有多少階段
private static final int MAX_PHASE = Integer.MAX_VALUE;
// 參與者數(shù)量的偏移量
private static final int PARTIES_SHIFT = 16;
// 當(dāng)前階段的偏移量
private static final int PHASE_SHIFT = 32;
// 未完成的參與者數(shù)的掩碼,低16位
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
// 參與者數(shù),中間16位
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
// counts的掩碼,counts等于參與者數(shù)和未完成的參與者數(shù)的'|'操作
private static final long COUNTS_MASK = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;
// 一次一個(gè)參與者完成
private static final int ONE_ARRIVAL = 1;
// 增加減少參與者時(shí)使用
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
// 減少參與者時(shí)使用
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
// 沒有參與者時(shí)使用
private static final int EMPTY = 1;
// 用于求未完成參與者數(shù)量
private static int unarrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
// 用于求參與者數(shù)量(中間16位),注意int的位置
private static int partiesOf(long s) {
return (int)s >>> PARTIES_SHIFT;
}
// 用于求階段數(shù)(高32位),注意int的位置
private static int phaseOf(long s) {
return (int)(s >>> PHASE_SHIFT);
}
// 已完成參與者的數(shù)量
private static int arrivedOf(long s) {
int counts = (int)s; // 低32位
return (counts == EMPTY) ? 0 :
(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
// 用于存儲(chǔ)已完成參與者所在的線程,根據(jù)當(dāng)前階段的奇偶性選擇不同的隊(duì)列
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
主要屬性為state
和evenQ
及oddQ
:
(1)state,狀態(tài)變量,高32位存儲(chǔ)當(dāng)前階段phase,中間16位存儲(chǔ)參與者的數(shù)量,低16位存儲(chǔ)未完成參與者的數(shù)量,本文由公從號(hào)“彤哥讀源碼”原創(chuàng);
(2)evenQ和oddQ,已完成的參與者存儲(chǔ)的隊(duì)列,當(dāng)最后一個(gè)參與者完成任務(wù)后喚醒隊(duì)列中的參與者繼續(xù)執(zhí)行下一個(gè)階段的任務(wù),或者結(jié)束任務(wù)。
public Phaser() {
this(null, 0);
}
public Phaser(int parties) {
this(null, parties);
}
public Phaser(Phaser parent) {
this(parent, 0);
}
public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0;
this.parent = parent;
if (parent != null) {
final Phaser root = parent.root;
this.root = root;
this.evenQ = root.evenQ;
this.oddQ = root.oddQ;
if (parties != 0)
phase = parent.doRegister(1);
}
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
// 狀態(tài)變量state的存儲(chǔ)分為三段
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}
構(gòu)造函數(shù)中還有一個(gè)parent和root,這是用來構(gòu)造多層級(jí)階段的,不在本文的討論范圍之內(nèi),忽略之。
重點(diǎn)還是看state的賦值方式,高32位存儲(chǔ)當(dāng)前階段phase,中間16位存儲(chǔ)參與者的數(shù)量,低16位存儲(chǔ)未完成參與者的數(shù)量。
下面我們一起來看看幾個(gè)主要方法的源碼:
注冊(cè)一個(gè)參與者,如果調(diào)用該方法時(shí),onAdvance()方法正在執(zhí)行,則該方法等待其執(zhí)行完畢。
public int register() {
return doRegister(1);
}
private int doRegister(int registrations) {
// state應(yīng)該加的值,注意這里是相當(dāng)于同時(shí)增加parties和unarrived
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
// state的值
long s = (parent == null) ? state : reconcileState();
// state的低32位,也就是parties和unarrived的值
int counts = (int)s;
// parties的值
int parties = counts >>> PARTIES_SHIFT;
// unarrived的值
int unarrived = counts & UNARRIVED_MASK;
// 檢查是否溢出
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
// 當(dāng)前階段phase
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
// 不是第一個(gè)參與者
if (counts != EMPTY) { // not 1st registration
if (parent == null || reconcileState() == s) {
// unarrived等于0說明當(dāng)前階段正在執(zhí)行onAdvance()方法,等待其執(zhí)行完畢
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
// 否則就修改state的值,增加adjust,如果成功就跳出循環(huán)
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
// 是第一個(gè)參與者
else if (parent == null) { // 1st root registration
// 計(jì)算state的值
long next = ((long)phase << PHASE_SHIFT) | adjust;
// 修改state的值,如果成功就跳出循環(huán)
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
// 多層級(jí)階段的處理方式
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
phase = parent.doRegister(1);
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}
// 等待onAdvance()方法執(zhí)行完畢
// 原理是先自旋一定次數(shù),如果進(jìn)入下一個(gè)階段,這個(gè)方法直接就返回了,
// 如果自旋一定次數(shù)后還沒有進(jìn)入下一個(gè)階段,則當(dāng)前線程入隊(duì)列,等待onAdvance()執(zhí)行完畢喚醒
private int internalAwaitAdvance(int phase, QNode node) {
// 保證隊(duì)列為空
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
// 自旋的次數(shù)
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
// 檢查當(dāng)前階段是否變化,如果變化了說明進(jìn)入下一個(gè)階段了,這時(shí)候就沒有必要自旋了
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
// 如果node為空,注冊(cè)的時(shí)候傳入的為空
if (node == null) { // spinning in noninterruptible mode
// 未完成的參與者數(shù)量
int unarrived = (int)s & UNARRIVED_MASK;
// unarrived有變化,增加自旋次數(shù)
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
// 自旋次數(shù)完了,則新建一個(gè)節(jié)點(diǎn)
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
// 節(jié)點(diǎn)入隊(duì)列
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
// 當(dāng)前線程進(jìn)入阻塞狀態(tài),跟調(diào)用LockSupport.park()一樣,等待被喚醒
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
// 到這里說明節(jié)點(diǎn)所在線程已經(jīng)被喚醒了
if (node != null) {
// 置空節(jié)點(diǎn)中的線程
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
// 喚醒當(dāng)前階段阻塞著的線程
releaseWaiters(phase);
return p;
}
增加一個(gè)參與者總體的邏輯為:
(1)增加一個(gè)參與者,需要同時(shí)增加parties和unarrived兩個(gè)數(shù)值,也就是state的中16位和低16位;
(2)如果是第一個(gè)參與者,則嘗試原子更新state的值,如果成功了就退出;
(3)如果不是第一個(gè)參與者,則檢查是不是在執(zhí)行onAdvance(),如果是等待onAdvance()執(zhí)行完成,如果否則嘗試原子更新state的值,直到成功退出;
(4)等待onAdvance()完成是采用先自旋后進(jìn)入隊(duì)列排隊(duì)的方式等待,減少線程上下文切換;
當(dāng)前線程當(dāng)前階段執(zhí)行完畢,等待其它線程完成當(dāng)前階段。
如果當(dāng)前線程是該階段最后一個(gè)到達(dá)的,則當(dāng)前線程會(huì)執(zhí)行onAdvance()方法,并喚醒其它線程進(jìn)入下一個(gè)階段。
public int arriveAndAwaitAdvance() {
// Specialization of doArrive+awaitAdvance eliminating some reads/paths
final Phaser root = this.root;
for (;;) {
// state的值
long s = (root == this) ? state : reconcileState();
// 當(dāng)前階段
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
// parties和unarrived的值
int counts = (int)s;
// unarrived的值(state的低16位)
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
// 修改state的值
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
// 如果不是最后一個(gè)到達(dá)的,則調(diào)用internalAwaitAdvance()方法自旋或進(jìn)入隊(duì)列等待
if (unarrived > 1)
// 這里是直接返回了,internalAwaitAdvance()方法的源碼見register()方法解析
return root.internalAwaitAdvance(phase, null);
// 到這里說明是最后一個(gè)到達(dá)的參與者
if (root != this)
return parent.arriveAndAwaitAdvance();
// n只保留了state中parties的部分,也就是中16位
long n = s & PARTIES_MASK; // base of next state
// parties的值,即下一次需要到達(dá)的參與者數(shù)量
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
// 執(zhí)行onAdvance()方法,返回true表示下一階段參與者數(shù)量為0了,也就是結(jié)束了
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
// n 加上unarrived的值
n |= nextUnarrived;
// 下一個(gè)階段等待當(dāng)前階段加1
int nextPhase = (phase + 1) & MAX_PHASE;
// n 加上下一階段的值
n |= (long)nextPhase << PHASE_SHIFT;
// 修改state的值為n
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
// 喚醒其它參與者并進(jìn)入下一個(gè)階段
releaseWaiters(phase);
// 返回下一階段的值
return nextPhase;
}
}
}
arriveAndAwaitAdvance的大致邏輯為:
(1)修改state中unarrived部分的值減1;
(2)如果不是最后一個(gè)到達(dá)的,則調(diào)用internalAwaitAdvance()方法自旋或排隊(duì)等待;
(3)如果是最后一個(gè)到達(dá)的,則調(diào)用onAdvance()方法,然后修改state的值為下一階段對(duì)應(yīng)的值,并喚醒其它等待的線程;
(4)返回下一階段的值;
(1)Phaser適用于多階段多任務(wù)的場(chǎng)景,每個(gè)階段的任務(wù)都可以控制得很細(xì);
(2)Phaser內(nèi)部使用state變量及隊(duì)列實(shí)現(xiàn)整個(gè)邏輯,本文由公從號(hào)“彤哥讀源碼”原創(chuàng);
(3)state的高32位存儲(chǔ)當(dāng)前階段phase,中16位存儲(chǔ)當(dāng)前階段參與者(任務(wù))的數(shù)量parties,低16位存儲(chǔ)未完成參與者的數(shù)量unarrived;
(4)隊(duì)列會(huì)根據(jù)當(dāng)前階段的奇偶性選擇不同的隊(duì)列;
(5)當(dāng)不是最后一個(gè)參與者到達(dá)時(shí),會(huì)自旋或者進(jìn)入隊(duì)列排隊(duì)來等待所有參與者完成任務(wù);
(6)當(dāng)最后一個(gè)參與者完成任務(wù)時(shí),會(huì)喚醒隊(duì)列中的線程并進(jìn)入下一個(gè)階段;
Phaser相對(duì)于CyclicBarrier和CountDownLatch的優(yōu)勢(shì)?
答:優(yōu)勢(shì)主要有兩點(diǎn):
(1)Phaser可以完成多階段,而一個(gè)CyclicBarrier或者CountDownLatch一般只能控制一到兩個(gè)階段的任務(wù);
(2)Phaser每個(gè)階段的任務(wù)數(shù)量可以控制,而一個(gè)CyclicBarrier或者CountDownLatch任務(wù)數(shù)量一旦確定不可修改。
創(chuàng)新互聯(lián)www.cdcxhl.cn,專業(yè)提供香港、美國(guó)云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開啟,新人活動(dòng)云服務(wù)器買多久送多久。
網(wǎng)頁(yè)題目:死磕java同步系列之Phaser源碼解析-創(chuàng)新互聯(lián)
網(wǎng)頁(yè)地址:http://m.rwnh.cn/article12/cecsgc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機(jī)網(wǎng)站建設(shè)、網(wǎng)站維護(hù)、域名注冊(cè)、網(wǎng)站營(yíng)銷、外貿(mào)建站、網(wǎng)站改版
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容