這篇文章主要講解了“flink中窗口的作用是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“flink中窗口的作用是什么”吧!
創(chuàng)新互聯(lián)建站致力于網(wǎng)站設(shè)計(jì)制作、網(wǎng)站設(shè)計(jì),成都網(wǎng)站設(shè)計(jì),集團(tuán)網(wǎng)站建設(shè)等服務(wù)標(biāo)準(zhǔn)化,推過標(biāo)準(zhǔn)化降低中小企業(yè)的建站的成本,并持續(xù)提升建站的定制化服務(wù)水平進(jìn)行質(zhì)量交付,讓企業(yè)網(wǎng)站從市場競爭中脫穎而出。 選擇創(chuàng)新互聯(lián)建站,就選擇了安全、穩(wěn)定、美觀的網(wǎng)站建設(shè)服務(wù)!
窗口計(jì)算是流式計(jì)算中常用的數(shù)據(jù)計(jì)算方式之一,通過按照固定時間或長度將數(shù)據(jù)流切分成不同的窗口,再對數(shù)據(jù)進(jìn)行相應(yīng)的聚合操作,得到一定時間范圍內(nèi)的統(tǒng)計(jì)結(jié)果,例如統(tǒng)計(jì)最近5分鐘內(nèi)某網(wǎng)站的點(diǎn)擊數(shù),此時,點(diǎn)擊數(shù)據(jù)在不斷產(chǎn)生,通過5分鐘窗口將數(shù)據(jù)限定在固定時間范圍內(nèi),就可以對該范圍內(nèi)的有界數(shù)據(jù)執(zhí)行聚合,得到最近5分鐘的網(wǎng)站點(diǎn)擊數(shù)。
代碼接口規(guī)則
stream.keyBy(...) //keyed類型數(shù)據(jù)集 .window(...) //指定窗口分配器類型 [.trigger(...)] //指定觸發(fā)器類型(可選) [.evictor(...)] //指定evictor(可選) [.allowedLateness(...)] //指定是否延遲處理數(shù)據(jù)(可選) [.sideOutputLateData(...)] //指定Output Lag(可選) .reduce/aggregate/fold/apply() //指定窗口計(jì)算函數(shù) [.getSideOutput(...)] //根據(jù)Tag輸出數(shù)據(jù)(可選)
算子
Windows Assigner:指定窗口類型,定義如何將數(shù)據(jù)流分配到一個或多個窗口
Windows Trigger:指定窗口觸發(fā)的時機(jī),定義窗口滿足什么樣的條件觸發(fā)計(jì)算;
Evictor:用于數(shù)據(jù)剔除
Lateness:標(biāo)記是否處理遲到數(shù)據(jù),當(dāng)遲到數(shù)據(jù)到達(dá)窗口中是否觸發(fā)計(jì)算。
Output Tag:標(biāo)記輸出標(biāo)簽,然后通過getSideOutput將窗口中的數(shù)據(jù)根據(jù)標(biāo)簽輸出。
Windows Function:定義窗口上數(shù)據(jù)處理的邏輯,例如對數(shù)據(jù)進(jìn)行sum操作。
在運(yùn)用窗口計(jì)算時,F(xiàn)link根據(jù)上游數(shù)據(jù)集是否為KeyedStream類型(將數(shù)據(jù)集按Key分區(qū)),對應(yīng)的Window Assigner會不同,
上游數(shù)據(jù)集為KeyedStream類型,則調(diào)用DataStream API的Windwo()方法指定Windows Assigner,數(shù)據(jù)將根據(jù)Key在不同的Task實(shí)例中并行分別計(jì)算,最后得出針對每個Key統(tǒng)計(jì)的結(jié)果。
如果是Non-Keyed類型,則調(diào)用WindowsAll()方法來指定Windows Assigner,所有數(shù)據(jù)都被窗口算子路由到一個Task中計(jì)算,并得到結(jié)果。
建議數(shù)據(jù)進(jìn)行KeyedStream處理,這樣啟動并行計(jì)算,加速效率。
flink支持兩種類型的窗口,一種基于時間,窗口大小由開始和結(jié)束時間戳約束,一種基于數(shù)量,根據(jù)固定數(shù)量定義窗口大小。
根據(jù)Windows Assigner數(shù)據(jù)分配方式的不同將Windows分為4大類:滾動窗口(Tumbling Windows)、滑動窗口(Sliding Windows)、會話窗口(Session Windows)和全局窗口(Global Windows)
滾動窗口根據(jù)固定時間或大小切分,且窗口與窗口間元素互不重疊,適合于固定時間大小和周期統(tǒng)計(jì)某一指標(biāo)的窗口計(jì)算。
DataStream API提供了基于Event Time和Process Time兩種時間類型的Tumbling窗口,對應(yīng)的Assigner分別為TumblingEventTimeWindows和TumblingProcessTimeWindows,窗口大小童工of()指定,時間單位分別為Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x),也可以是不同時間單位的組合。
如下實(shí)例,窗口時間按10S進(jìn)行切分,窗口的時間是[1:00:00.000-1:00:09.999] 到[1:00:10.000-1:00:19.999]的等固定時間范圍。
val inputStream:DataStream[T]= ... //定義Event Time Tumbling Windows val tumblingEventTimeWindows=inputStream.keyBy(_.id) //通過使用TumblingEventTimeWindows定義Event Time滾動窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(...) //定義窗口函數(shù) //定義Process Time Tumbling Windows val tumblingProcessingTimeWindows = inputStream.keyBy(_.id) //通過TumblingProcessTimeWindows定義Evnet Time滾動窗口 .window(TumblingProcessTimeWindows.of(Times.seconds(10))) .process(...) //定義窗口函數(shù)
滑動窗口是一種常見的窗口類型,特點(diǎn)是在滾動窗口基礎(chǔ)上增加了窗口滑動時間(Slide Time),且允許窗口數(shù)據(jù)發(fā)生重疊。這種窗口不像滾動窗口按照Windows Size向前移動,而是根據(jù)設(shè)定的Slide Time向前滑動。窗口之間的數(shù)據(jù)重疊大小根據(jù)Windows Size和Slide time決定,當(dāng)Slide Time小于Windows Size便會發(fā)生窗口重疊,Slide Size大于WindowsSize會出現(xiàn)窗口不連續(xù),數(shù)據(jù)可能不會再任何一個窗口內(nèi)計(jì)算。
DataStream API針對Sliding Windows根據(jù)不同時間類型Assigner,包括基于Event Time的SlidingEventTimeWindows和基于Process Time的SlidingProcessingTimeWindows。
實(shí)例如下,指定Windows Size為1h,Slide Time為10m。
val inputStream:DataStream[T]= ... //定義Event Time Sliding Windows val slidingEventTimeWindows=inputStream.keyBy(_.id) //通過使用SlidingEventTimeWindows定義Event Time滾動窗口 .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10))) .process(...) //定義窗口函數(shù) //定義Process Time Sliding Windows val slidingProcessTimeWindows = inputStream.keyBy(_.id) //通過SlidingProcessTimeWindows定義Evnet Time滾動窗口 .window(SlidingProcessTimeWindows.of(Time.hours(1),Time.minutes(10))) .process(...) //定義窗口函數(shù)
將某個時間段內(nèi)活躍較高的數(shù)據(jù)聚合為一個窗口進(jìn)行計(jì)算,窗口的觸發(fā)條件為Session Gap,指規(guī)定時間內(nèi)沒有數(shù)據(jù)活躍接入,則任務(wù)窗口結(jié)束,觸發(fā)窗口計(jì)算。
注意:如果數(shù)據(jù)一直不間斷,會導(dǎo)致窗口始終不觸發(fā)。
與滑動、滾動窗口不同,Session Windows不需要定義Windows Size和Slide Time,只需要定義session gap,規(guī)定不活躍數(shù)據(jù)的時間上線即可。
Session Windows比較適合非連續(xù)型數(shù)據(jù)處理或周期性產(chǎn)生數(shù)據(jù)的場景。DataStream API中可以創(chuàng)建基于Event Time和Process Time的Session Windows,對應(yīng)的有Assigner分別為EventTimeSessionWindow和ProcessTimerSessionWindows。
實(shí)例代碼如下:
val inputStream:DataStream[T]= ... //定義Event Time Session Windows val eventTimeSessionWindows=inputStream.keyBy(_.id) //通過使用EventTimeSessionWindows定義Event Time滾動窗口 .window(EventTimeSessionWindows.withGap(Time.milliseconds(10))) .process(...) //定義窗口函數(shù) //定義Process Time Session Windows val processTimeSessionWindows = inputStream.keyBy(_.id) //通過ProcessTimeSessionWindows定義Evnet Time滾動窗口 .window(ProcessTimeSessionWindows.withGap(Time.milliseconds(10))) .process(...) //定義窗口函數(shù)
flink支持動態(tài)調(diào)整的Session Gap,需要實(shí)現(xiàn)SessionWindowTimeGapExtractor接口,并復(fù)寫extract方法,完成Session Gap的抽取,然后將創(chuàng)建好的Session Gap抽取器傳入ProcessiongTimeSessionWindows.withDynamicGap()方法即可。
val inputStream:DataStream[T]= ... //定義Event Time Session Windows val eventTimeSessionWindows=inputStream.keyBy(_.id) //通過使用EventTimeSessionWindows定義Event Time滾動窗口 .window(EventTimeSessionWindows.withDynamicGap( //實(shí)例化SessionWindowTimeGapExtractor接口 new SessionWindowTimeGapExtractor[String]{ override def extract(element:String):Long={ //動態(tài)指定并返回Session Gap } } )) .process(...) //定義窗口函數(shù) //定義Process Time Session Windows val processTimeSessionWindows = inputStream.keyBy(_.id) //通過ProcessTimeSessionWindows定義Evnet Time滾動窗口 .window(ProcessTimeSessionWindows.withDynamicGap( //實(shí)例化SessionWindowTimeGapExtractor接口 new SessionWindowTimeGapExtractor[String]{ override def extract(element:String):Long={ //動態(tài)指定并返回Session Gap } } )) .process(...) //定義窗口函數(shù)
全局會話窗口將所有相同的key數(shù)據(jù)分配到單個窗口中計(jì)算,窗口沒有起始和結(jié)束時間,窗口需要借助Triger觸發(fā)計(jì)算,如果不指定,則不會觸發(fā)計(jì)算。
使用全局窗口要非常謹(jǐn)慎,必須明確自己在整個窗口中統(tǒng)計(jì)出的結(jié)果是什么,并指定對應(yīng)的觸發(fā)器,同時指定相應(yīng)的數(shù)據(jù)清理機(jī)制,否則數(shù)據(jù)將一直留在內(nèi)存中。
val inputStream:DataStream[T]= ... val globalWindows = inputStream.keyBy(_.id) .window(GlobalWindows.create()) //通過GlobalWindows定義Global Windows .process()
flink定義的四種窗口,容易和時間窗口和事件窗口混淆,他們是不同維度的的窗口定義,需要特別注意下。
越長大越孤單,珍惜好身邊人。
感謝各位的閱讀,以上就是“flink中窗口的作用是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對flink中窗口的作用是什么這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!
分享名稱:flink中窗口的作用是什么
地址分享:http://m.rwnh.cn/article44/phoshe.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、關(guān)鍵詞優(yōu)化、網(wǎng)站維護(hù)、企業(yè)網(wǎng)站制作、網(wǎng)站導(dǎo)航、定制開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)