中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

學到羊之Kafka-創(chuàng)新互聯(lián)

改則網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)公司,改則網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為改則上千提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請找那個售后服務(wù)好的改則做網(wǎng)站的公司定做!1 kafka 是啥

Kafka 是一款開源的消息引擎系統(tǒng),用來實現(xiàn)解耦的異步式數(shù)據(jù)傳遞。即系統(tǒng) A 發(fā)消息給到 消息引擎系統(tǒng),系統(tǒng) B 通過消息引擎系統(tǒng)讀取 A 發(fā)送的消息,在大數(shù)據(jù)場景下,能達到削峰填谷的效果。

2 Kafka 術(shù)語

Kafka 中的分區(qū)機制指的是將每個主題(Topic)劃分成多個分區(qū)(Partition),每個分區(qū)是一組有序的消息日志。生產(chǎn)者生產(chǎn)的每條消息只會被發(fā)送到一個分區(qū)中,也就是說如果向一個雙分區(qū)的主題發(fā)送一條消息,這條消息要么在分區(qū) 0 中,要么在分區(qū) 1 中。Kafka 的分區(qū)編號是從 0 開始的,如果 Topic 有 100 個分區(qū),那么它們的分區(qū)號就是從 0 到 99。每個分區(qū)下可以配置若干個副本,其中只能有 1 個領(lǐng)導者副本和 N-1 個追隨者副本。

Kafka 的三層消息架構(gòu):

1)主題層,每個主題可以配置 M 個分區(qū),而每個分區(qū)又可以配置 N 個副本。

2)分區(qū)層,每個分區(qū)的 N 個副本中只能有一個充當領(lǐng)導者角色,對外提供服務(wù);其他 N-1 個副本是追隨者副本,只是提供數(shù)據(jù)冗余之用。

3)消息層,分區(qū)中包含若干條消息,每條消息的位移從 0 開始,依次遞增。最后,客戶端程序只能與分區(qū)的領(lǐng)導者副本進行交互。

Broker 如何持久化數(shù)據(jù)?

Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個日志就是磁盤上一個只能追加寫(Append-only)消息的物理文件。因為只能追加寫入,故避免了緩慢的隨機 I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實現(xiàn) Kafka 高吞吐量特性的一個重要手段。如果不停地向一個日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?簡單來說就是通過日志段(Log Segment)機制。在 Kafka 底層,一個日志又進一步細分成多個日志段,消息被追加寫到當前最新的日志段中,當寫滿了一個日志段后,Kafka 會自動切分出一個新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時任務(wù)會定期地檢查老的日志段是否能夠被刪除,從而實現(xiàn)回收磁盤空間的目的。

3 生產(chǎn)者 3.1 消息發(fā)送

  1. Producer創(chuàng)建時,會創(chuàng)建一個Sender線程并設(shè)置為守護線程;

  2. 生產(chǎn)消息時,內(nèi)部是異步流程。生產(chǎn)的消息先經(jīng)過攔截器->序列化器->分區(qū)器,然后將消息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時創(chuàng)建);

  3. 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達到 batch.size 或者 linger.ms 達到上限,哪個先達到就算哪個;

  4. 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了 retrires 參數(shù)大于 0 并且失敗原因允許重試,那么客戶端內(nèi)部會對該消息進行重試;

  5. 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者;

  6. 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回,另一種是通過回調(diào)返回。

3.2 原理剖析

3.3 分區(qū)機制

主題是承載真實數(shù)據(jù)的邏輯容器,而在主題之下還分為若干個分區(qū),主題下的每條消息只會保存在某一個分區(qū)中,而不會在多個分區(qū)中被保存多份。

為什么使用分區(qū)的概念而不是直接使用多個主題呢?

對數(shù)據(jù)進行分區(qū)的主要原因是為了實現(xiàn)系統(tǒng)的高伸縮性(Scalability)。不同的分區(qū)能夠被放置到不同節(jié)點的機器上,而數(shù)據(jù)的讀寫操作也都是針對分區(qū)這個粒度而進行的,這樣每個節(jié)點的機器都能獨立地執(zhí)行各自分區(qū)的讀寫請求處理。并且,還可以通過添加新的節(jié)點機器來增加整體系統(tǒng)的吞吐量。

3.3.1 分區(qū)策略

所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個分區(qū)的算法。

輪詢策略

順序分配。比如一個主題下有 3 個分區(qū),那么第一條消息被發(fā)送到分區(qū) 0,第二條被發(fā)送到分區(qū) 1,第三條被發(fā)送到分區(qū) 2,以此類推。當生產(chǎn)第 4 條消息時又會重新開始,即將其分配到分區(qū) 0。

輪詢策略有非常優(yōu)秀的負載均衡表現(xiàn),它總是能保證消息大限度地被平均分配到所有分區(qū)上,故默認情況下它是最合理的分區(qū)策略,也是最常用的分區(qū)策略之一。?

隨機策略

隨意地將消息放置到任意一個分區(qū)上

Key-ordering 策略

Kafka 允許為每條消息定義消息鍵,簡稱為 Key。這個 Key 可以是一個有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門編號或是業(yè)務(wù) ID 等。一旦消息被定義了 Key,就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的。

假設(shè)有一個服務(wù)需要監(jiān)聽某個公眾號用戶關(guān)注取關(guān)的事件,發(fā)送的消息必須要保證有序性,不然會導致結(jié)果混亂。如果給 Kafka 主題只設(shè)置 1 個分區(qū),這樣所有的消息都只在這一個分區(qū)內(nèi)讀寫,因此保證了全局的順序性。

這樣做雖然實現(xiàn)了因果關(guān)系的順序性,但也喪失了 Kafka 多分區(qū)帶來的高吞吐量和負載均衡的優(yōu)勢。

可以在消息體中封裝了固定的標志位,并對此標志位設(shè)定專門的分區(qū)策略,保證同一標志位的所有消息都發(fā)送到同一分區(qū),這樣既可以保證分區(qū)內(nèi)的消息順序,也可以享受到多分區(qū)帶來的性能紅利。

4 消費者 4.1?消費組(Consumer Group)

消費組是 kafka 提供的可擴展且具有容錯性的消費者機制,是 Kafka 實現(xiàn)單播和廣播兩種消息模型的手段。

多個從同一個主題消費的消費者可以加入到一個消費組中,消費組中的消費者共享 Group Id。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題的所有分區(qū),每個分區(qū)只能由同一個消費者組內(nèi)的一個 Consumer 實例來消費。

4.2?消費消息

Consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù),可以自主控制消費方式,逐條消費或批量消費。

4.2.1 位移提交

Consumer 需要向 Kafka 記錄自己的位移數(shù)據(jù),這個匯報過程稱為 提交位移(Committing Offsets)。這個過程非常靈活,可以提交任何位移值,但也會由此產(chǎn)生系列不好的結(jié)果。假設(shè)?Consumer 消費了 10 條消息,提交的位移值卻是 20,那么位移介于 11~19 之間的消息是有可能丟失的;相反地,如果提交的位移值是 5,那么位移介于 5~9 之間的消息就有可能被重復消費。

自動提交

1)開啟自動提交: enable.auto.commit=true,默認為 true

2)配置自動提交間隔: auto.commit.interval.ms ,默認 5s

自動提交會導致消息被重復消費

  • Consumer 每 5s 提交 offset
  • 假設(shè)提交 offset 后的 3s 發(fā)生了 Rebalance
  • Rebalance 之后的所有 Consumer 從上一次提交的 offset 處繼續(xù)消費
  • 因此 Rebalance 發(fā)生前 3s 的消息會被重復消費

雖然能通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這么做只能縮小重復消費的時間窗口,不可能完全消除它。

手動同步提交

使用 KafkaConsumer#commitSync(),會提交 KafkaConsumer#poll() 返回的最新 offset。

該方法為同步操作,等待直到 offset 被成功提交才返回。

while (true) {
    ConsumerRecordsrecords =
consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息
try {
        // Consumer 程序會處于阻塞狀態(tài),直到遠端的 Broker 返回提交結(jié)果
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // 處理提交失敗異常
        handle(e);  
    }
}

手動異步提交

while (true) {
            ConsumerRecordsrecords = 
  consumer.poll(Duration.ofSeconds(1));
            // 處理消息
            process(records); 
            // 會立即返回結(jié)果,不會阻塞
            consumer.commitAsync((offsets, exception) ->{
  if (exception != null)
            handle(exception);
  });
}

但?commitAsync 不能替代?commitSync,因為出現(xiàn)問題時它不會自動重試。由于是異步操作,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經(jīng)“過期”或不是最新值了。因此,異步提交的重試其實沒有意義,所以 commitAsync 是不會重試的。

手動同步提交與異步提交結(jié)合

try {
            while (true) {
                ConsumerRecordsrecords =
                        consumer.poll(Duration.ofSeconds(1));
                // 處理消息
                process(records);
                // 使用異步提交規(guī)避阻塞
                commitAysnc();
            }
        } catch (Exception e) {
            // 處理異常
            handle(e);
        } finally {
            try {
                // Consumer 要關(guān)閉前使用同步阻塞式提交,以確保 Consumer 關(guān)閉前能夠保存正確的位移數(shù)據(jù)
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
4.2.2 位移管理

Kafka默認定期自動提交位移( enable.auto.commit = true ),也手動提交位移。另外kafka會定期把group消費情況保存起來,做成一個offset map?

位移管理機制將 Consumer 的位移數(shù)據(jù)作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 主題中。

5 異常處理

如何保證消息不被重復消費?

如何保證消息消費的冪等性?

如何防止消息丟失?

如何處理消息積壓?

消費慢了怎么處理?

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

名稱欄目:學到羊之Kafka-創(chuàng)新互聯(lián)
分享URL:http://m.rwnh.cn/article10/dscego.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈品牌網(wǎng)站建設(shè)、小程序開發(fā)網(wǎng)站維護、手機網(wǎng)站建設(shè)面包屑導航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司
抚州市| 昌江| 桑日县| 龙川县| 枣庄市| 邳州市| 高雄市| 双桥区| 杭锦旗| 揭东县| 高陵县| 隆子县| 建平县| 定州市| 德江县| 洪江市| 根河市| 元阳县| 象山县| 沁阳市| 刚察县| 会泽县| 庄浪县| 鲁甸县| 新郑市| 湖南省| 九寨沟县| 乌恰县| 广宗县| 鹤山市| 潞城市| 靖江市| 莆田市| 江川县| 嫩江县| 吉木萨尔县| 资讯| 苗栗市| 前郭尔| 吉安县| 余姚市|