中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

如何獲取Kafka的消費(fèi)者詳情——從Scala到Java的切換-創(chuàng)新互聯(lián)

前文摘要

在前面的文章《Kafka的Lag計(jì)算誤區(qū)及正確實(shí)現(xiàn)》中介紹了如何計(jì)算消費(fèi)者的消費(fèi)滯后量(Lag),并且講解了如何調(diào)用Kafka的kafka.admin.ConsumerGroupCommand文件中的KafkaConsumerGroupService來發(fā)送OffsetRequest和OffsetFetchRequest兩個(gè)請(qǐng)求,進(jìn)而通過兩個(gè)請(qǐng)求結(jié)果之間的差值來獲得結(jié)果。不過如果你不想修改kafka-core的代碼并重新編譯的話,這種實(shí)現(xiàn)方式無法成功,所以本文的主要目的就是通過調(diào)用更底層的API來實(shí)現(xiàn)不修改kafka-core的代碼來實(shí)現(xiàn)KafkaConsumerGroupService的功能,即通過Java調(diào)用Scala的代碼來實(shí)現(xiàn)獲取Kafka的消費(fèi)者詳情的功能。

成都創(chuàng)新互聯(lián)公司主營洛扎網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,成都APP應(yīng)用開發(fā),洛扎h5小程序開發(fā)搭建,洛扎網(wǎng)站營銷推廣歡迎洛扎等地區(qū)企業(yè)咨詢

目標(biāo)及實(shí)現(xiàn)

實(shí)現(xiàn)如同 bin/kafka-consumer-group.sh –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID的效果:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-ID
topic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID

KafkaConsumerGroupService的核心方法是CollectGroupAssignment,其方法參數(shù)為一個(gè)consumer group的groupId,方法輸出為上面示例中的列表信息。CollectGroupAssignment方法主要有以下幾個(gè)步驟:

  1. 根據(jù)groupId調(diào)用describeConsumerGroup方法(內(nèi)部原理是發(fā)送DescribeGroupsRequest請(qǐng)求)來獲取consumer group的基本信息,參考上面示例中的CONSUMER-ID、HOST、CLIENT-ID以及TopicPartition信息,但是沒有CURRENT-OFFSET、LOG-END-OFFSET、LAG信息。注意這里的LOG-END-OFFSET是消費(fèi)者可見的LEO,不是生產(chǎn)者可見的LEO,也就是通俗意義上的HW。
  2. 根據(jù)groupId調(diào)用listGroupOffsets方法(內(nèi)部原理是發(fā)送OffsetFetchRequest請(qǐng)求)來獲取各個(gè)分區(qū)(Partition)的對(duì)應(yīng)的消費(fèi)位移CURRENT-OFFSET。
  3. 通過調(diào)用KafkaConsumer的endOffsets方法來獲取TopicPartition對(duì)應(yīng)的HW,即示例中的LOG-END-OFFSET。
  4. 計(jì)算Lag并組合成信息列表List<partitionassignmentstate style="margin-block-start: 0px; margin-block-end: 0px; margin: 0px; padding: 0px;">。</partitionassignmentstate>
改造

對(duì)應(yīng)Java版的KafkaConsumerGroupService改造代碼可以參見代碼,目錄結(jié)構(gòu)如下圖所示:
如何獲取 Kafka 的消費(fèi)者詳情 —— 從 Scala 到 Java 的切換

其中model中的ConsumerGroupSummary、ConsumerSummary和PartitionAssignmentState是簡(jiǎn)單的JavaBean, PartitionAssignmentState是用來保存每個(gè)TopicPartition的消費(fèi)者信息的,具體內(nèi)容參考如下。KafkaConsumerGroupCustomService就是本文所要陳述的Java改造辦的KafkaConsumerGroupSerivice,ConsumerGroupUtils用來存放一些公用的代碼。

@Data
@Builder
public class PartitionAssignmentState {
    private String group; // groupId
    private Node coordinator; // consumer coodinator節(jié)點(diǎn)信息
    private String topic;
    private int partition;
    private long offset;
    private long lag;
    private String consumerId;
    private String host;
    private String clientId;
    private long logEndOffset;
}

初始化KafkaConsumerGroupCustomService需要Kafka的服務(wù)端地址,然后初始化AdminClient和KafkaConsumer,AdminClient中包含了眾多管理類方法,主要是通過發(fā)送各種自定義協(xié)議請(qǐng)求來完成,上面步驟中所說的describeConsumerGroup和listGroupOffsets方法也是通過AdminClient來實(shí)現(xiàn)的;KafkaConsumer主要是用來獲取TopicPartition對(duì)應(yīng)的HW(消費(fèi)者可見的LogEndOffsets)的。

KafkaConsumerGroupCustomService中與scala版對(duì)應(yīng)的collectGroupAssignment方法如下(詳細(xì)步驟參考代碼注釋):

public List<PartitionAssignmentState> collectGroupAssignment(
        AdminClient adminClient, KafkaConsumer<String, String> consumer,
        String group) {
    //1. 獲取consumer group的基本信息,包括CONSUMER-ID、HOST、
    // CLIENT-ID以及TopicPartition信息
    AdminClient.ConsumerGroupSummary consumerGroupSummary
            = adminClient.describeConsumerGroup(group, 0);
    List<TopicPartition> assignedTopicPartitions = new ArrayList<>();
    List<PartitionAssignmentState> rowsWithConsumer = new ArrayList<>();
    scala.collection.immutable.List<AdminClient.ConsumerSummary> consumers
            = consumerGroupSummary.consumers().get();
    if (consumers != null) {
        //2. 獲取各個(gè)分區(qū)(Partition)的對(duì)應(yīng)的消費(fèi)位移CURRENT-OFFSET
        scala.collection.immutable.Map<TopicPartition, Object> offsets
                = adminClient.listGroupOffsets(group);
        if (offsets.nonEmpty()) {
            String state = consumerGroupSummary.state();
            // 3. 還有一個(gè)狀態(tài)是Dead表示"group"對(duì)應(yīng)的consumer group不存在
            if (state.equals("Stable") || state.equals("Empty")
                    || state.equals("PreparingRebalance")
                    || state.equals("AwaitingSync")) {
                List<ConsumerSummary> consumerList = changeToJavaList(consumers);
                // 4. 獲取當(dāng)前有消費(fèi)者的消費(fèi)信息,即包含CONSUMER-ID、HOST、CLIENT-ID
                rowsWithConsumer = getRowsWithConsumer(consumerGroupSummary, offsets,
                        consumer, consumerList, assignedTopicPartitions, group);
            }
        }
        //5. 獲取當(dāng)前沒有消費(fèi)者的消費(fèi)信息
        List<PartitionAssignmentState> rowsWithoutConsumer =
                getRowsWithoutConsumer(consumerGroupSummary,
                offsets, consumer, assignedTopicPartitions, group);
        //6. 合并結(jié)果
        rowsWithConsumer.addAll(rowsWithoutConsumer);
    }
    return rowsWithConsumer;
}

KafkaConsumerGroupCustomService類中包含有g(shù)etRowsWithConsumer()、getRowsWithoutConsumer()、changeToJavaList等私有方法也都是在Scala語言與Java語言之間進(jìn)行切換,這樣可以不需要修改kafka-core的原生代碼而通過外部的封裝調(diào)用既可以實(shí)現(xiàn)獲取Kafka消費(fèi)者詳情的功能。光看代碼比較抽象,建議對(duì)此感興趣的同學(xué)可以親自對(duì)比一下kafka-core包中kafka.admin.ConsumerGroupCommand的KafkaConsumerGroupSerivice與筆者自定義的KafkaConsumerGroupCustomService的實(shí)現(xiàn)來了解下Scala語言到Java語言的轉(zhuǎn)換。

如果需要打印詳情可以調(diào)用KafkaConsumerGroupCustomService同目錄的ConsumerGroupUtils類中的printPasList(List list)方法。注意要運(yùn)行這些代碼需要JDK8的環(huán)境,筆者為了讓代碼顯得“騷氣”一點(diǎn)就用來一點(diǎn)Java8的語法,如果需要Java7的代碼實(shí)現(xiàn)可以關(guān)注私聊。

或許有些同學(xué)對(duì)于Scala和Java交叉的代碼并不感冒,想要尋求一種存Java式的實(shí)現(xiàn)方式,那么在這里怎么實(shí)現(xiàn)呢?答案是通過KafkaAdminClient,它是AdminClient的Java版實(shí)現(xiàn),從Kafka0.11.0.0版本開始引入的,不過KafkaAdminClient本身并沒有提供describeConsumerGroup、listGroupOffsets之類的方法給我們直接使用,擴(kuò)展一下也很方便,由于篇幅限制,這部分的內(nèi)容將在下一篇文章中進(jìn)行介紹,如果想要先一睹為快,可以參考下代碼實(shí)現(xiàn),詳細(xì)的邏輯解析敬請(qǐng)期待….


本文的重點(diǎn)是你有沒有收獲與成長(zhǎng),其余的都不重要,希望讀者們能謹(jǐn)記這一點(diǎn)。同時(shí)我經(jīng)過多年的收藏目前也算收集到了一套完整的學(xué)習(xí)資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、Jvm性能調(diào)優(yōu)、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個(gè)知識(shí)點(diǎn)高級(jí)進(jìn)階干貨,希望對(duì)想成為架構(gòu)師的朋友有一定的參考和幫助

需要更詳細(xì)思維導(dǎo)圖和以下資料的可以加一下技術(shù)交流分享群:“708 701 457”免費(fèi)獲取

如何獲取 Kafka 的消費(fèi)者詳情 —— 從 Scala 到 Java 的切換
如何獲取 Kafka 的消費(fèi)者詳情 —— 從 Scala 到 Java 的切換
如何獲取 Kafka 的消費(fèi)者詳情 —— 從 Scala 到 Java 的切換
如何獲取 Kafka 的消費(fèi)者詳情 —— 從 Scala 到 Java 的切換

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

網(wǎng)站題目:如何獲取Kafka的消費(fèi)者詳情——從Scala到Java的切換-創(chuàng)新互聯(lián)
網(wǎng)站地址:http://m.rwnh.cn/article32/copipc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開發(fā)、搜索引擎優(yōu)化、響應(yīng)式網(wǎng)站、Google、電子商務(wù)網(wǎng)站策劃

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)
平顺县| 德阳市| 启东市| 敦化市| 浦县| 锡林郭勒盟| 中卫市| 阳高县| 衡南县| 凤山市| 新蔡县| 鄢陵县| 东乡族自治县| 陈巴尔虎旗| 会泽县| 凤阳县| 个旧市| 明星| 探索| 三亚市| 额尔古纳市| 二手房| 东丽区| 商城县| 探索| 九江市| 太湖县| 临潭县| 三门峡市| 通化市| 陆河县| 莱州市| 开阳县| 潞城市| 丹凤县| 潼关县| 盐源县| 东台市| 上栗县| 泽普县| 宣化县|