Hi Experts,
I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I have run into problems making
the watermark available to my operator; I am seeing some strange behavior.
I have a joined retract stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to it. The
timestamp is just a field in the stream. For the watermark generator
part, I have two problems:
1. I can use a time-lag watermark generator and make it work. But with
the BoundedOutOfOrdernessGenerator,
context.timerService().currentWatermark() in processElement() always
stays at its initial value and never updates.
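To make problem 1 concrete, here is a plain-Java sketch (no Flink dependency) of the two policies from the generator code below, evaluated on a single generator instance that receives no events. The class name PolicySketch and the explicit nowMillis parameter, which stands in for System.currentTimeMillis() so the result is repeatable, are hypothetical and only for illustration.

```java
public class PolicySketch {
    // Same lag as maxTimeLag in the generator code.
    static final long MAX_TIME_LAG = 15_000L;

    // Policy 1: driven by the wall clock, so it advances even when no events arrive.
    static long timeLagWatermark(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    // Policy 2: driven only by event timestamps, so it cannot move until onEvent() raises the max.
    static long outOfOrdernessWatermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    public static void main(String[] args) {
        long currentMax = 15_000L; // same initial value as the generator; no events arrive here
        // Simulate three periodic emits, 5 seconds apart (the configured auto-watermark interval).
        for (long now = 1_000_000L; now <= 1_010_000L; now += 5_000L) {
            System.out.println("now=" + now
                    + " policy1=" + timeLagWatermark(now, currentMax)
                    + " policy2=" + outOfOrdernessWatermark(currentMax));
        }
    }
}
```

Under these assumptions policy 1 keeps moving with the clock, while policy 2 stays at 15000 - 15000 = 0 until an event raises currentMaxTimestamp on that particular instance.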
2. I set the auto-watermark interval to 5 seconds for debugging purposes,
and I attach this watermark generator in only one place, with parallelism 1.
However, I get 8 records at a time. The time-lag policy advances all
8 records, while the out-of-orderness policy advances only 1 of them. Could
this mismatch be causing processElement() to capture the wrong (default)
watermark?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
This is my code for the watermark generator:
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

@Slf4j
public class PeriodicTableOutputWatermarkGenerator
        implements WatermarkGenerator<Tuple2<Boolean, Row>> {

    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
            WatermarkOutput output) {
        // eventTimestamp is obtained from the TimestampAssigner, see
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: time-lag strategy; this works and advances the watermark
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag,
                currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on the events seen so far
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, "
                + "periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
This is the log printed by the slf4j logging above. Every interval it gives
me 8 records. Why 8 records? I think it should be 1 in theory, so I am
very confused. Also, policy 1 advances all 8 records, while policy 2
advances only 1 of the 8, and that advance is not reflected in
processElement().
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
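One way to read the log, assuming (not proven by the log itself) that the 8 lines per interval come from 8 parallel instances of the generator, for example because the timestamp/watermark operator runs with the job's default parallelism rather than 1: a downstream operator only advances its watermark to the minimum across its input channels. The plain-Java sketch below recomputes policy 2 from the currentMaxTimestamp values of the 14:28:01 batch and takes that minimum for both policies. DownstreamWatermarkSketch, policy2 and combinedWatermark are hypothetical names for illustration.

```java
import java.util.Arrays;

public class DownstreamWatermarkSketch {
    static final long MAX_TIME_LAG = 15_000L;

    // Policy 2 from the generator: currentMaxTimestamp - maxTimeLag.
    static long policy2(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    // Models a downstream operator: its watermark is the minimum over all input watermarks.
    static long combinedWatermark(long[] perInstanceWatermarks) {
        return Arrays.stream(perInstanceWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // currentMaxTimestamp values from the 14:28:01 batch: one instance saw events, seven did not.
        long[] currentMax = {15000L, 1605047187881L, 15000L, 15000L,
                             15000L, 15000L, 15000L, 15000L};
        // "watermark based on system time" (policy 1) values from the same batch.
        long[] policy1 = {1605047266198L, 1605047266199L, 1605047266199L, 1605047266198L,
                          1605047266198L, 1605047266198L, 1605047266198L, 1605047266198L};

        long[] policy2 = Arrays.stream(currentMax).map(DownstreamWatermarkSketch::policy2).toArray();

        System.out.println("policy 1 combined: " + combinedWatermark(policy1)); // prints 1605047266198
        System.out.println("policy 2 combined: " + combinedWatermark(policy2)); // prints 0
    }
}
```

Under that assumption, the seven instances that never see an event hold the combined policy 2 watermark at 0, while policy 1 advances on every instance because it follows the wall clock.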
Any insights? Thank you very much!
Best,
Fuyao