Hi Fuyao, I think you need to implement your own WatermarkStrategy class and register it with window.assignTimestampsAndWatermarks(new YourEventWatermarkStrategy()).

If you're using Kafka consumers, make sure you use FlinkKafkaConsumer's assignTimestampsAndWatermarks instead, so that watermarks are generated per Kafka partition.

Example code for a booking event that carries its own internal timestamp would be:
import org.apache.flink.api.common.eventtime.*;

public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

    @Override
    public WatermarkGenerator<Booking> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Booking>() {
            private final long OUT_OF_ORDERNESS_MILLIS = 30;
            // Start just above Long.MIN_VALUE so the subtraction below cannot underflow.
            private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

            @Override
            public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
                // Track the highest timestamp seen so far and emit a watermark on every event.
                currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
                output.emitWatermark(new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Do nothing, since a watermark is already emitted on every event.
            }
        };
    }

    @Override
    public TimestampAssigner<Booking> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        // Use the timestamp field carried inside the Booking event.
        return (booking, recordTimestamp) -> booking.getTimestamp();
    }
}
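To see what the generator above actually emits, here is a plain-Java sketch (no Flink dependency) that reproduces only its watermark arithmetic. The class BoundedOutOfOrdernessSketch and its methods are hypothetical stand-ins for the anonymous WatermarkGenerator, and the input timestamps are made up for illustration. It shows that a late event never moves the watermark backwards, and that the Long.MIN_VALUE-based initial value does not underflow.

```java
// Hypothetical plain-Java model of the BookingWatermarkStrategy generator's arithmetic.
public class BoundedOutOfOrdernessSketch {
    static final long OUT_OF_ORDERNESS_MILLIS = 30;

    // Same initial value as the Flink generator above.
    private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

    // Watermark the generator would report right now.
    long currentWatermark() {
        return currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1;
    }

    // Mirrors onEvent(): remember the largest timestamp seen and return the
    // watermark that would be emitted immediately after this event.
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch();
        System.out.println("before any event -> watermark " + gen.currentWatermark());
        long[] timestamps = {1000, 1050, 1020, 1100}; // 1020 arrives out of order
        for (long ts : timestamps) {
            System.out.println("event " + ts + " -> watermark " + gen.onEvent(ts));
        }
    }
}
```

Running main prints Long.MIN_VALUE first, then watermarks 969, 1019, 1019 and 1069: the out-of-order event 1020 leaves the watermark at 1019.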
On Wed, Nov 11, 2020 at 12:28 AM <fuyao...@oracle.com> wrote:
Hi Experts,
I am trying to implement a KeyedProcessFunction with an onTimer() callback. I need to use event time, and I have problems making the watermark available to my operator; I am seeing some strange behavior.
I have a joined retract stream without watermark or timestamp information, and I need to assign timestamps and watermarks to it. The timestamp is just a field in the stream. For the watermark generator part:
Problem:
1. I can use a time-lag watermark generator and make it work. But with the BoundedOutOfOrdernessGenerator, context.timerService().currentWatermark() in processElement() always sticks to the initial value and never updates.
2. I set the autoWatermark interval to 5 seconds for debugging purposes, and I only attach this watermark generator in one place, with parallelism 1. However, I am getting 8 records at a time. The time-lag policy advances all 8 records, while the out-of-orderness policy only advances 1 record. Maybe this mismatch is causing processElement() to capture the wrong default watermark?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator
This is my code for the watermark generator:

@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;

    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
        // the eventTimestamp is obtained through the TimestampAssigner, see
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: time-lag strategy; this works and advances the timestamp
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on events
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
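To make the difference between the two policies concrete, here is a plain-Java sketch (no Flink dependency) of the arithmetic in onPeriodicEmit(). The class WatermarkPolicySketch and its methods are hypothetical; the wall-clock time is passed in explicitly instead of calling System.currentTimeMillis(), and the value used in main is an assumption chosen so that now - 15000 matches the first log line. The point it illustrates: policy 2 only moves when this particular generator instance sees an event, so an instance that never receives data keeps reporting 15000 - 15000 = 0, while policy 1 follows the wall clock regardless.

```java
// Hypothetical plain-Java model of the two policies in onPeriodicEmit().
public class WatermarkPolicySketch {
    static final long MAX_TIME_LAG = 15000;
    // currentMaxTimestamp before the generator has seen any event.
    static final long INITIAL_MAX_TIMESTAMP = 15000;

    // Policy 1: time-lag strategy, driven by the wall clock.
    static long policy1(long now, long currentMaxTimestamp) {
        return Math.max(now - MAX_TIME_LAG, currentMaxTimestamp);
    }

    // Policy 2: driven only by event timestamps seen by this instance.
    static long policy2(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    public static void main(String[] args) {
        long now = 1605047281198L;       // assumed wall-clock time (first log line + 15000)
        long activeMax = 1605047187881L; // currentMaxTimestamp of the instance that saw data
        System.out.println("idle instance,   policy 1: " + policy1(now, INITIAL_MAX_TIMESTAMP));
        System.out.println("idle instance,   policy 2: " + policy2(INITIAL_MAX_TIMESTAMP));
        System.out.println("active instance, policy 2: " + policy2(activeMax));
    }
}
```

The three printed values are 1605047266198, 0 and 1605047172881, which are the same numbers that appear in the log below.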
This is the log printed by the slf4j logger above. Every time, it gives me 8 records. Why 8 records? I think it should be 1 in theory, so I am very confused. Also, policy 1 advances all 8 records, while policy 2 advances only 1 of the 8, and that is not reflected in processElement().
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
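One plausible reading of this log, offered as an assumption rather than a diagnosis: the 8 lines per 5-second interval, all with the same log timestamp, look like 8 parallel instances of the generator (for example, if the timestamp/watermark operator runs at the environment's default parallelism rather than 1), of which only one receives data. Flink sets a downstream operator's current watermark to the minimum of the watermarks received on its input channels, so the instances stuck at 0 under policy 2 would hold processElement()'s watermark back. The plain-Java sketch below models only that minimum step; the class and helper names are hypothetical, it ignores Flink's handling of idle channels, and the per-instance values are copied from the first batch of log lines.

```java
import java.util.Arrays;

// Hypothetical plain-Java model of how a downstream operator combines watermarks
// from several upstream channels: its event-time clock is the minimum over all inputs.
public class CombinedWatermarkSketch {

    // Effective watermark = minimum of the latest watermark seen on each channel.
    static long combinedWatermark(long[] perChannelWatermarks) {
        return Arrays.stream(perChannelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Policy 2: only one of the 8 instances has seen data; the rest report 0.
        long[] policy2 = {0, 1605047172881L, 0, 0, 0, 0, 0, 0};
        // Policy 1: every instance follows the wall clock, with or without data.
        long[] policy1 = {1605047266198L, 1605047266199L, 1605047266199L, 1605047266198L,
                          1605047266198L, 1605047266198L, 1605047266198L, 1605047266198L};
        System.out.println("policy 2 combined: " + combinedWatermark(policy2));
        System.out.println("policy 1 combined: " + combinedWatermark(policy1));
    }
}
```

Under this model the combined watermark is 0 for policy 2 but 1605047266198 for policy 1, which would match the observation that only policy 1 appears to advance downstream.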
Any insights? Thank you very much!
Best,
Fuyao