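Hi everyone,
Because of a business requirement, each record in a stream has to be processed in four steps: the second step runs 15 minutes after the first step completes, the third step runs 30 minutes after the second completes, and the fourth step runs 60 minutes after that. I implemented this by registering timers through the TimerService. At runtime, however, I found that for the 15-minute timer of the second step only a very small fraction of the messages ever trigger the onTimer logic when the time is up. Part of the implementation follows: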
DataStream<String> dataStream = env.addSource(…);
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> missPingback = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level1Tag = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level2Tag = …;
final OutputTag<Tuple2<String, SimplifiedPingbackMsg>> level3Tag = …;
SingleOutputStreamOperator<FinalItemFeature> missDs = dataStream
.map(…)
.filter(…)
.assignTimestampsAndWatermarks(new AssignWaterMark())
.keyBy(0)
.timeWindow(Time.seconds(winSize))
.process(new BatchMergeProcessFunction(missPingback));
SingleOutputStreamOperator<FinalItemFeature> level1MissedDs =
missDs
.getSideOutput(missPingback)
.keyBy(0)
// ********** In this step, the 15-minute timer in the process function fires for only a very small share of the records; fewer than 10% of the timers fire. **********
.process(new HbaseTimerProcessFunc(level1Tag, 15));
SingleOutputStreamOperator<FinalItemFeature> level2MissedDs =
level1MissedDs
.keyBy(0)
.process(new HbaseTimerProcessFunc(level2Tag, 30));
SingleOutputStreamOperator<FinalItemFeature> level3MissedDs =
level2MissedDs
.keyBy(0)
.process(new HbaseTimerProcessFunc(level3Tag, 60));
DataStream<FinalItemFeature> l1Ds = level1MissedDs.getSideOutput(level1Tag);
DataStream<FinalItemFeature> l2Ds = level2MissedDs.getSideOutput(level2Tag);
DataStream<FinalItemFeature> l3Ds = level3MissedDs.getSideOutput(level3Tag);
missDs.union(l1Ds).union(l2Ds).union(l3Ds).addSink(…);
———————————
-> Timer registration method in HbaseTimerProcessFunc.class
@Override
public void processElement(SimplifiedPingbackMsg value, Context context,
Collector<SimplifiedPingbackMsg> out) throws Exception {
long ts = System.currentTimeMillis() + 60 * 1000 * timeout;
pbState.put(value.getEventId() + ts, value);
context.timerService().registerEventTimeTimer(ts);
MetricModifyUtils.modifyPerDay(timerTotalCounter, 1);
}
———————————
…
// Watermark assigner; it effectively uses processing time as the event timestamp
class AssignWaterMark implements AssignerWithPeriodicWatermarks<Tuple2<String, SimplifiedPingbackMsg>> {
private long maxOutOfOrderness = 60 * 1000 * 5;
private long currentMaxTimestamp = 0L;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, SimplifiedPingbackMsg>
element, long previousElementTimestamp) {
long timestamp = System.currentTimeMillis();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
}
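To make the timing concrete, here is a minimal plain-Java sketch (no Flink dependency) of how the two snippets above interact. It assumes the usual rule that an event-time timer fires only once the operator's watermark reaches the timer's timestamp. The class and method names (`TimerLagSketch`, `canFire`, `earliestFiringArrival`) are made up for illustration; only the 5-minute and 15-minute constants come from the code above.

```java
class TimerLagSketch {
    // Constants taken from the post: 5-minute out-of-orderness, 15-minute timeout.
    static final long MAX_OUT_OF_ORDERNESS_MS = 5 * 60 * 1000L;
    static final long TIMEOUT_MS = 15 * 60 * 1000L;

    /** Watermark as AssignWaterMark computes it: largest timestamp seen minus the allowed lag. */
    static long watermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    /** Assumption: an event-time timer is eligible to fire once the watermark reaches its timestamp. */
    static boolean canFire(long timerTs, long currentMaxTimestamp) {
        return watermark(currentMaxTimestamp) >= timerTs;
    }

    /** Smallest wall-clock stamp a later element must carry before the timer can fire. */
    static long earliestFiringArrival(long registeredAt) {
        return registeredAt + TIMEOUT_MS + MAX_OUT_OF_ORDERNESS_MS;
    }

    public static void main(String[] args) {
        long registeredAt = 0L;                    // hypothetical wall-clock time of processElement
        long timerTs = registeredAt + TIMEOUT_MS;  // same formula as in processElement above

        System.out.println("fires when max timestamp is +15 min: "
                + canFire(timerTs, registeredAt + TIMEOUT_MS));
        System.out.println("fires when max timestamp is +20 min: "
                + canFire(timerTs, earliestFiringArrival(registeredAt)));
    }
}
```

Under these assumptions, a timer registered at wall-clock time t can fire only after some later element has been stamped at t + 20 minutes or later. If no further element is stamped upstream, the watermark stops advancing and the timer stays pending.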
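So why does this implementation behave this way, and how should I go about troubleshooting why the vast majority of timers never fire? I checked the web UI, and watermark generation looks normal there.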
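One thing that may be worth checking while troubleshooting, offered as a sketch rather than a diagnosis: after a keyBy, each downstream subtask uses the minimum of the watermarks from all of its input channels as its own event-time clock, so a single lagging channel holds back every timer on that subtask. The plain-Java model below illustrates this. `InputWatermarkSketch` and its methods are hypothetical names, and the sample watermark values are invented.

```java
import java.util.Arrays;

class InputWatermarkSketch {
    /** Event-time clock of an operator: the minimum watermark across its input channels. */
    static long operatorWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** A timer can fire only when the operator's (minimum) watermark has reached it. */
    static boolean timerFires(long timerTs, long[] channelWatermarks) {
        return operatorWatermark(channelWatermarks) >= timerTs;
    }

    public static void main(String[] args) {
        long min = 60 * 1000L;
        long timerTs = 15 * min; // hypothetical timer timestamp

        // Both upstream subtasks have advanced past the timer.
        long[] allAdvancing = {20 * min, 21 * min};
        // One upstream subtask has seen no elements: with AssignWaterMark above,
        // currentMaxTimestamp stays 0, so its watermark is 0 - 5 minutes.
        long[] oneLagging = {20 * min, -5 * min};

        System.out.println("all channels advancing: " + timerFires(timerTs, allAdvancing));
        System.out.println("one channel lagging:    " + timerFires(timerTs, oneLagging));
    }
}
```

If this applies, comparing `context.timerService().currentWatermark()` with the registered timestamp inside `processElement` for each subtask should show whether the watermark seen by the timer operator is really advancing past the timers.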