中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

zookeeper(7)源碼分析-集群Leader選舉FastLeaderElection

一、Election接口

選舉的父接口為Election,其定義了lookForLeader和shutdown兩個方法,lookForLeader表示尋找Leader,shutdown則表示關閉,如關閉服務端之間的連接。

創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供同仁網(wǎng)站建設、同仁做網(wǎng)站、同仁網(wǎng)站設計、同仁網(wǎng)站制作等企業(yè)網(wǎng)站建設、網(wǎng)頁設計與制作、同仁企業(yè)網(wǎng)站模板建站服務,十余年同仁做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡服務。

zookeeper(7)源碼分析-集群Leader選舉FastLeaderElection

Election接口的實現(xiàn)類

zookeeper(7)源碼分析-集群Leader選舉FastLeaderElection

1、AuthFastLeaderElection,同F(xiàn)astLeaderElection算法基本一致,只是在消息中加入了認證信息,在3.4.0版本后已經(jīng)被棄用。
2、FastLeaderElection,是標準的fast paxos算法的實現(xiàn),基于TCP協(xié)議進行選舉。
3、LeaderElection,在3.4.0版本后已經(jīng)被棄用。

FastLeaderElection分析

1、重要內(nèi)部類

1、Notification類

Notification表示收到的選舉投票信息(其他服務器發(fā)來的選舉投票信息),其包含了投票中被選舉者的服務器sid、zxid、選舉周期epoch,選舉者的服務器sid,狀態(tài),選周期epoch

static public class Notification {
        /*
         * Format version, introduced in 3.4.6
         */

        public final static int CURRENTVERSION = 0x2;
        int version;

        /*
         * Proposed leader 被選舉者的服務器id
         */
        long leader;

        /*
         * zxid of the proposed leader 被選舉者的事務zxid
         */
        long zxid;

        /*
         * Epoch 選舉者的選舉周期
         */
        long electionEpoch;

        /*
         * current state of sender 選舉者的節(jié)點狀態(tài)
         * 總共有4中
         * LOOKING 尋找leader狀態(tài)
         * FOLLOWING 跟隨者
         * LEADING leader狀態(tài)
                 *OBSERVING 不參與操作和選舉
         */
        QuorumPeer.ServerState state;

        /*
         * Address of sender 選舉者的服務器id
         */
        long sid;

        QuorumVerifier qv;
        /*
         * epoch of the proposed leader 被選舉者的選舉周期
         */
        long peerEpoch;
    }

2、ToSend類

ToSend表示發(fā)送給其他服務器的選舉投票信息,也包含了被選舉者的sid、zxid、選舉周期等信息。

static public class ToSend {
        static enum mType {crequest, challenge, notification, ack}

        ToSend(mType type,
                long leader,
                long zxid,
                long electionEpoch,
                ServerState state,
                long sid,
                long peerEpoch,
                byte[] configData) {

            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.state = state;
            this.sid = sid;
            this.peerEpoch = peerEpoch;
            this.configData = configData;
        }

        /*
         * Proposed leader in the case of notification 被推舉的leader的sid
         */
        long leader;

        /*
         * id contains the tag for acks, and zxid for notifications 
         * 被推舉的leader的最大事務id
         */
        long zxid;

        /*
         * Epoch 選舉者的選舉周期
         */
        long electionEpoch;

        /*
         * Current state; 選舉者的節(jié)點狀態(tài)
         */
        QuorumPeer.ServerState state;

        /*
         * Address of recipient選舉者的服務器sid
         */
        long sid;

        /*
         * Used to send a QuorumVerifier (configuration info)
         */
        byte[] configData = dummyData;

        /*
         * Leader epoch 被選舉者的選舉周期
         */
        long peerEpoch;
    }

3、Messenger類

3.1、內(nèi)部類

Messenger包含了WorkerReceiver和WorkerSender兩個內(nèi)部類
1、WorkerReceiver繼承了ZooKeeperThread,是選票接收器。
2、其會不斷地從QuorumCnxManager中的recvQueue獲取其他服務器發(fā)來的選舉消息,類型是Message

WorkerReceiver(QuorumCnxManager manager) {
                super("WorkerReceiver");
                this.stop = false;
                this.manager = manager;
            }

//從QuorumCnxManager中的recvQueue中獲取投票消息
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if(response == null) continue;                      

并將其轉(zhuǎn)換成一個選票消息Notification,然后保存到recvqueue中,在選票接收過程中,如果發(fā)現(xiàn)該外部選票的選舉輪次小于當前服務器的,那么忽略該外部投票,同時立即發(fā)送自己的內(nèi)部投票,把投票信息組裝成ToSend加入到sendqueue隊列。

ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock.get(),
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);

3、WorkerSender也繼承了ZooKeeperThread,為選票發(fā)送器,其會不斷地從sendqueue中獲取待發(fā)送的選票

ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);

并將其傳遞到底層QuorumCnxManager中,其過程是將FastLeaderElection的ToSend轉(zhuǎn)化為QuorumCnxManager的Message。

void process(ToSend m) {
                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                                    m.leader,
                                                    m.zxid,
                                                    m.electionEpoch,
                                                    m.peerEpoch,
                                                    m.configData);

                manager.toSend(m.sid, requestBuffer);

            }
3.2、Messenger構造函數(shù)
 Messenger(QuorumCnxManager manager) {
                        //創(chuàng)建WorkerSender
            this.ws = new WorkerSender(manager);
                        // 新創(chuàng)建線程
            this.wsThread = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
                         // 設置為守護線程
            this.wsThread.setDaemon(true);
                         // 創(chuàng)建WorkerReceiver
            this.wr = new WorkerReceiver(manager);
                        // 新創(chuàng)建線程
            this.wrThread = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
                         // 設置為守護線程             
            this.wrThread.setDaemon(true);
        }

2、FastLeaderElection類屬性

    // 完成Leader選舉之后需要等待時長
    final static int finalizeWait = 200;
    // 兩個連續(xù)通知檢查之間的最大時長
    final static int maxNotificationInterval = 60000;       
        // 管理服務器之間的連接
    QuorumCnxManager manager;
        // 選票發(fā)送隊列,用于保存待發(fā)送的選票
    LinkedBlockingQueue<ToSend> sendqueue;

    // 選票接收隊列,用于保存接收到的外部投票
    LinkedBlockingQueue<Notification> recvqueue;
        //投票者
        QuorumPeer self;
    Messenger messenger;
        //邏輯始終,當前選舉周期
    AtomicLong logicalclock = new AtomicLong(); /* Election instance */
        //被選舉者服務器sid
    long proposedLeader;
        //被選舉者服務器zxid
    long proposedZxid;
        //被選舉者服務器選舉周期
    long proposedEpoch;

FastLeaderElection核心方法

1、發(fā)送選票

其會遍歷所有的參與者投票集合,然后將自己的選票信息發(fā)送至上述所有的投票者集合,其并非同步發(fā)送,而是將ToSend消息放置于sendqueue中,之后由WorkerSender進行發(fā)送

private void sendNotifications() {
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
                         // 構造發(fā)送消息
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock.get(),
                    QuorumPeer.ServerState.LOOKING,
                    sid,
                    proposedEpoch, qv.toString().getBytes());
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
                        // 將發(fā)送消息放置于隊列
            sendqueue.offer(notmsg);
        }
    }

2、totalOrderPredicate函數(shù)

該函數(shù)將接收的投票與自身投票進行PK,查看是否消息中包含的服務器id是否更優(yōu),其按照epoch、zxid、id的優(yōu)先級進行PK。

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        // 1. 判斷消息里的epoch是不是比當前的大,如果大則消息中id對應的服務器就是leader
        // 2. 如果epoch相等則判斷zxid,如果消息里的zxid大,則消息中id對應的服務器就是leader
        // 3. 如果前面兩個都相等那就比較服務器id,如果大,則其就是leader
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

3、termPredicate方法

該函數(shù)用于判斷Leader選舉是否結束,即是否有一半以上的服務器選出了相同的Leader,其過程是將收到的選票與當前選票進行對比,選票相同的放入同一個集合,之后判斷選票相同的集合是否超過了半數(shù)。

protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }

        return voteSet.hasAllQuorums();
    }

4、lookForLeader函數(shù)

1、該函數(shù)用于開始新一輪的Leader選舉,其首先會將邏輯時鐘自增,然后更新本服務器的選票信息(初始化選票),之后將選票信息放入sendqueue等待發(fā)送給其他服務器
zookeeper(7)源碼分析-集群Leader選舉FastLeaderElection

2、每臺服務器會不斷地從recvqueue隊列中獲取外部選票,處理外部選票。

Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

3、判斷選舉輪次,選票PK,更新選票

if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();

4、歸檔選票,統(tǒng)計選票,返回最后的選票

/*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                 //設置leading狀態(tài),否則設置為flowing
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                 //最終選票              
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, logicalclock.get(), 
                                        proposedEpoch);
                                // 清空recvqueue隊列的選票             
                                leaveInstance(endVote);
                                return endVote;
                            }

總結

FastLeaderElection的算法,其是ZooKeeper的核心部分,比較復雜,梳理了一下大概的流程,好多細節(jié)沒有展開。

網(wǎng)頁題目:zookeeper(7)源碼分析-集群Leader選舉FastLeaderElection
地址分享:http://m.rwnh.cn/article16/jcjjdg.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、域名注冊、虛擬主機品牌網(wǎng)站制作、網(wǎng)站設計網(wǎng)站營銷

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作
关岭| 罗江县| 平度市| 英吉沙县| 郧西县| 古丈县| 天镇县| 乐亭县| 当涂县| 松滋市| 当雄县| 高淳县| 孟村| 岳阳县| 中卫市| 贵州省| 孝义市| 兰西县| 夹江县| 高要市| 当雄县| 武川县| 黑山县| 邵武市| 朔州市| 闻喜县| 油尖旺区| 深水埗区| 华池县| 绥德县| 乌兰察布市| 西乌珠穆沁旗| 贵南县| 安阳县| 双柏县| 枣庄市| 广南县| 丹东市| 阿瓦提县| 霍林郭勒市| 沽源县|