這篇文章主要介紹了apache spark中怎么實現(xiàn)端對端的 exactly once,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
創(chuàng)新互聯(lián)建站專注于婁煩網(wǎng)站建設服務及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供婁煩營銷型網(wǎng)站建設,婁煩網(wǎng)站制作、婁煩網(wǎng)頁設計、婁煩網(wǎng)站官網(wǎng)定制、小程序定制開發(fā)服務,打造婁煩網(wǎng)絡公司原創(chuàng)品牌,更為您提供婁煩網(wǎng)站排名全網(wǎng)營銷落地服務。
Exactly Once的實現(xiàn)
如果實時作業(yè)要實現(xiàn)端對端的 exactly once 則需要數(shù)據(jù)源、數(shù)據(jù)處理與數(shù)據(jù)存儲的三個階段都保證 exactly once 的語義。目前基 于Kafka Direct API加上Spark RDD 算子精確一次的保證能夠實現(xiàn)端對端的 exactly once 的語義。
在數(shù)據(jù)存儲階段一般實現(xiàn) exactly once 需要保證存儲的過程是冪等操作或事務操作。很多系統(tǒng)本身就支持了冪等操作,比如相同數(shù)據(jù)寫 hdfs 同一個文件,這本身就是冪等操作,保證了多次操作最終獲取的值還是相同;HBase、ElasticSearch 與 redis 等都能夠實現(xiàn)冪等操作。對于關系型數(shù)據(jù)庫的操作一般都是能夠支持事務性操作。
官方在創(chuàng)建 DirectKafkaInputStream 時只需要輸入消費 Kafka 的 From Offset,然后其自行獲取本次消費的 End Offset,也就是當前最新的 Offset。保存的 Offset 是本批次的 End Offset,下次消費從上次的 End Offset 開始消費。
當程序宕機或重啟任務后,這其中存在一些問題。如果在數(shù)據(jù)處理完成前存儲 Offset,則可能存在作業(yè)處理數(shù)據(jù)失敗與作業(yè)宕機等情況,重啟后會無法追溯上次處理的數(shù)據(jù)導致數(shù)據(jù)出現(xiàn)丟失。如果在數(shù)據(jù)處理完成后存儲 Offset,但是存儲 Offset 過程中發(fā)生失敗或作業(yè)宕機等情況,則在重啟后會重復消費上次已經(jīng)消費過的數(shù)據(jù)。
而且此時又無法保證重啟后消費的數(shù)據(jù)與宕機前的數(shù)據(jù)量相同數(shù)據(jù)相當,這又會引入另外一個問題,如果是基于聚合統(tǒng)計指標作更新操作,這會帶來無法判斷上次數(shù)據(jù)是否已經(jīng)更新成功。
所以在 muise spark core 中我們加入了自己的實現(xiàn)用以保證 Exactly once 的語義。具體的實現(xiàn)是我們對 Spark 源碼進行了改造,保證在創(chuàng)建 DirectKafkaInputStream 可以同時輸入 From Offset 與 End Offset,并且我們在存儲 Kafka Offset 的時候保存了每個批次的起始Offset 與結束 Offset,具體格式如下:
如此做的用意在于能夠確保無論是宕機還是人為重啟,重啟后的第一個批次與重啟前的最后一個批次數(shù)據(jù)一模一樣。這樣的設計使得后面用戶在后面對于第一個批次的數(shù)據(jù)處理非常靈活可變,如果用戶直接忽略第一個批次的數(shù)據(jù),那此時保證的是 at most once 的語義,因為我們無法獲知重啟前的最后一個批次數(shù)據(jù)操作是否有成功完成。
如果用戶依照原有邏輯處理第一個批次的數(shù)據(jù),不對其做去重操作,那此時保證的是 at least once 的語義,最終結果中可能存在重復數(shù)據(jù);最后如果用戶想要實現(xiàn) exactly once,muise spark core 提供了根據(jù)topic、partition 與 offset 生成 UID 的功能。
只要確保兩個批次消費的 Offset 相同,則最終生成的 UID 也相同,用戶可以根據(jù)此 UID 作為判斷上個批次數(shù)據(jù)是否有存儲成功的依據(jù)。下面簡單的給出了重啟后第一個批次操作的行為。
002
Metrics系統(tǒng)
Musie spark core 基于 Spark 本身的 metrics 系統(tǒng)進行了改造,添加了許多定制的 metrics,并且向用戶暴露了 metrics 注冊接口,用戶可以非常方便的注冊自己的 metrics 并在程序中更新 metrics 的數(shù)值。最后所有的 metrics 會根據(jù)作業(yè)設定的批次間隔寫入 Graphite,基于公司定制的預警系統(tǒng)進行報警,前端可以通過 Grafana 展現(xiàn)各項 metrics 指標。
Muise spark core本身定制的metrics包含以下三種:
Fail 批次時間內 spark task 失敗次數(shù)超過4次便報警,用于監(jiān)控程序的運行狀態(tài)。
Ack 批次時間內 spark streaming 處理的數(shù)據(jù)量小0便報警,用于監(jiān)控程序是否在正常消費數(shù)據(jù)。
Lag 批次時間內數(shù)據(jù)消費延遲大于設定值便報警。
其中由于我們大部分作業(yè)開啟了 Back Pressure 功能,這就導致在Spark UI 中看到每個批次數(shù)據(jù)都能在正常時間內消費完成,然而可能此時 kafka 中已經(jīng)積壓了大量數(shù)據(jù),故每個批次我們都會計算當前消費時間與數(shù)據(jù)本身時間的一個平均差值,如果這個差值大于批次時間,說明本身數(shù)據(jù)消費就已經(jīng)存在了延遲。
下圖展現(xiàn)了預警系統(tǒng)中,基于用戶自定義注冊的Metrics以及系統(tǒng)定制的Metrics進行預警。
003
容錯
其實在上面 Exactly Once 一章中已經(jīng)詳細的描述了 muise spark core如何在程序宕機后能夠保證數(shù)據(jù)正確的處理。但是為了能夠讓 Spark Sreaming 能夠長時間穩(wěn)定的運行在Yarn集群上,還需要添加許多配置,感興趣的朋友可以查看:Long running Spark Streaming Jobs on Yarn Cluster。
除了上述容錯保證之外,Muise Portal(后面會講)也提供了對 Spark Streaming 作業(yè)定時檢測的功能。目前每過5分鐘對當前所有數(shù)據(jù)庫中狀態(tài)標記為 Running 的 Spark Streaming 作業(yè)進行狀態(tài)檢測,通過Yarn提供的REST APIs可以根據(jù)每個作業(yè)的Application Id查詢作業(yè)在Yarn上的狀態(tài),如果狀態(tài)處于非運行狀態(tài),則會嘗試重啟作業(yè)。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“apache spark中怎么實現(xiàn)端對端的 exactly once”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關知識等著你來學習!
分享名稱:apachespark中怎么實現(xiàn)端對端的exactlyonce
分享地址:http://m.rwnh.cn/article24/jcjdce.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供ChatGPT、定制網(wǎng)站、做網(wǎng)站、建站公司、網(wǎng)站設計公司、
聲明:本網(wǎng)站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)