基于Flink 1.11.0是怎樣實現Flink的Watermark機制,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

在使用eventTime的時候如何處理亂序數據?我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由于網絡延遲等原因,導致亂序的產生,特別是使用kafka的話,多個分區的數據無法保證有序。所以在進行window計算的時候,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了。這個特別的機制,就是watermark。Watermark是用于處理亂序事件的,用于衡量Event Time進展的機制。watermark可以翻譯為水位線。
Watermark的核心本質可以理解成一個延遲觸發機制。
在 Flink 的窗口處理過程中,如果確定全部數據到達,就可以對 Window 的所有數據做 窗口計算操作(如匯總、分組等),如果數據沒有全部到達,則繼續等待該窗口中的數據全 部到達才開始處理。這種情況下就需要用到水位線(WaterMarks)機制,它能夠衡量數據處 理進度(表達數據到達的完整性),保證事件數據(全部)到達 Flink 系統,或者在亂序及 延遲到達時,也能夠像預期一樣計算出正確并且連續的結果。當任何 Event 進入到 Flink 系統時,會根據當前大事件時間產生 Watermarks 時間戳。
那么 Flink 是怎么計算 Watermak 的值呢?
Watermark =進入Flink 的大的事件時間(mxtEventTime)-指定的延遲時間(t)
那么有 Watermark 的 Window 是怎么觸發窗口函數的呢?
如果有窗口的停止時間等于或者小于 maxEventTime - t(當時的warkmark),那么這個窗口被觸發執行。
其核心處理流程如下圖所示。

如果數據元素的事件時間是有序的,Watermark 時間戳會隨著數據元素的事件時間按順 序生成,此時水位線的變化和事件時間保持一直(因為既然是有序的時間,就不需要設置延遲了,那么t就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想狀態下的水位 線。當 Watermark 時間大于 Windows 結束時間就會觸發對 Windows 的數據計算,以此類推, 下一個 Window 也是一樣。這種情況其實是亂序數據的一種特殊情況。
現實情況下數據元素往往并不是按照其產生順序接入到 Flink 系統中進行處理,而頻繁 出現亂序或遲到的情況,這種情況就需要使用 Watermarks 來應對。比如下圖,設置延遲時間t為2。
在多并行度的情況下,Watermark 會有一個對齊機制,這個對齊機制會取所有 Channel 中最小的 Watermark。
sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

注意:不管是數據是否有序,都可以使用上面的代碼。有序的數據只是無序數據的一種特殊情況。
測試數據:基站的手機通話數據,如下:

需求:按基站,每5秒統計通話時間最長的記錄。
StationLog用于封裝基站數據
package watermark;
//station1,18688822219,18684812319,10,1595158485855
public class StationLog {
private String stationID; //基站ID
private String from; //呼叫放
private String to; //被叫方
private long duration; //通話的持續時間
private long callTime; //通話的呼叫時間
public StationLog(String stationID, String from,
String to, long duration,
long callTime) {
this.stationID = stationID;
this.from = from;
this.to = to;
this.duration = duration;
this.callTime = callTime;
}
public String getStationID() {
return stationID;
}
public void setStationID(String stationID) {
this.stationID = stationID;
}
public long getCallTime() {
return callTime;
}
public void setCallTime(long callTime) {
this.callTime = callTime;
}
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getTo() {
return to;
}
public void setTo(String to) {
this.to = to;
}
public long getDuration() {
return duration;
}
public void setDuration(long duration) {
this.duration = duration;
}
}代碼實現:WaterMarkDemo用于完成計算(注意:為了方便咱們測試設置任務的并行度為1)
package watermark;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
//每隔五秒,將過去是10秒內,通話時間最長的通話日志輸出。
public class WaterMarkDemo {
public static void main(String[] args) throws Exception {
//得到Flink流式處理的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
//設置周期性的產生水位線的時間間隔。當數據流很大的時候,如果每個事件都產生水位線,會影響性能。
env.getConfig().setAutoWatermarkInterval(100);//默認100毫秒
//得到輸入流
DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);
stream.flatMap(new FlatMapFunction<String, StationLog>() {
public void flatMap(String data, Collector<StationLog> output) throws Exception {
String[] words = data.split(",");
// 基站ID from to 通話時長 callTime
output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4])));
}
}).filter(new FilterFunction<StationLog>() {
@Override
public boolean filter(StationLog value) throws Exception {
return value.getDuration() > 0?true:false;
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
@Override
public long extractTimestamp(StationLog element, long recordTimestamp) {
return element.getCallTime(); //指定EventTime對應的字段
}
})
).keyBy(new KeySelector<StationLog, String>(){
@Override
public String getKey(StationLog value) throws Exception {
return value.getStationID(); //按照基站分組
}}
).timeWindow(Time.seconds(5)) //設置時間窗口
.reduce(new MyReduceFunction(),new MyProcessWindows()).print();
env.execute();
}
}
//用于如何處理窗口中的數據,即:找到窗口內通話時間最長的記錄。
class MyReduceFunction implements ReduceFunction<StationLog> {
@Override
public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
// 找到通話時間最長的通話記錄
return value1.getDuration() >= value2.getDuration() ? value1 : value2;
}
}
//窗口處理完成后,輸出的結果是什么
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
@Override
public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context,
Iterable<StationLog> elements, Collector<String> out) throws Exception {
StationLog maxLog = elements.iterator().next();
StringBuffer sb = new StringBuffer();
sb.append("窗口范圍是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");;
sb.append("基站ID:").append(maxLog.getStationID()).append("\t")
.append("呼叫時間:").append(maxLog.getCallTime()).append("\t")
.append("主叫號碼:").append(maxLog.getFrom()).append("\t")
.append("被叫號碼:") .append(maxLog.getTo()).append("\t")
.append("通話時長:").append(maxLog.getDuration()).append("\n");
out.collect(sb.toString());
}
}看完上述內容,你們掌握基于Flink 1.11.0是怎樣實現Flink的Watermark機制的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注創新互聯-成都網站建設公司行業資訊頻道,感謝各位的閱讀!
本文標題:基于Flink1.11.0是怎樣實現Flink的Watermark機制-創新互聯
瀏覽地址:http://www.yijiale78.com/article16/phhgg.html
成都網站建設公司_創新互聯,為您提供電子商務、營銷型網站建設、網站營銷、做網站、App開發、網站設計
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯