Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer() callback. I need to use event time, and I am having trouble making the watermark available to my operator. I am also seeing some strange behavior.

I have a joined retract stream without watermark or timestamp information, and I need to assign timestamps and watermarks to it. The timestamp is just a field in the stream. The problems below concern the watermark generator.

Problem:

1. I can use the time-lag watermark generator and make it work. But with the BoundedOutOfOrdernessGenerator, context.timerService().currentWatermark() in processElement() always stays at its initial value and never updates.

2. I set the auto-watermark interval to 5 seconds for debugging, and I attach this watermark generator in only one place, with parallelism 1. However, I am getting 8 log records at a time. The time-lag policy advances all 8 records; the out-of-orderness policy advances only 1 record. Maybe this mismatch is causing processElement() to see the wrong default watermark?

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

This is my code for watermark generator:

@Slf4j
public class PeriodicTableOutputWatermarkGenerator implements WatermarkGenerator<Tuple2<Boolean, Row>> {
    private final long maxTimeLag = 15000;
    private transient long currentMaxTimestamp = 15000;
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp, WatermarkOutput output) {
        // the eventTimestamp is obtained from the TimestampAssigner, see
        // https://github.com/apache/flink/blob/f24cb3f3b7e773706188ae92998b3e1ffbf1829e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        log.info("eventTimestamp in onEvent method: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Policy 1: timelag strategy, can work and advance the timestamp
        long watermarkEpochTime = Math.max(System.currentTimeMillis() - maxTimeLag, currentMaxTimestamp);
        output.emitWatermark(new Watermark(watermarkEpochTime));

        // Policy 2: periodic emit based on event
        long periodicEmitWatermarkTime = currentMaxTimestamp - maxTimeLag;
        // output.emitWatermark(new Watermark(periodicEmitWatermarkTime));

        log.info("Emit Watermark: watermark based on system time: {}, periodicEmitWatermarkTime: {}, currentMaxTimestamp: {}",
                watermarkEpochTime, periodicEmitWatermarkTime, currentMaxTimestamp);
    }
}
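
To make the two policies easier to compare, here is a minimal plain-Java sketch of the arithmetic in onPeriodicEmit(), with no Flink dependency. The class WatermarkPolicies and its method names are hypothetical and exist only for illustration. The input values are taken from the log in this post, and the wall-clock value is inferred from the logged time-lag watermark plus the 15000 ms lag.

```java
// Hypothetical helper, not part of Flink: it only restates the two formulas
// used in onPeriodicEmit() of the generator above.
final class WatermarkPolicies {
    static final long MAX_TIME_LAG = 15_000L;

    // Policy 1: wall clock minus the lag, but never below the largest event timestamp seen.
    static long timeLag(long nowMillis, long currentMaxTimestamp) {
        return Math.max(nowMillis - MAX_TIME_LAG, currentMaxTimestamp);
    }

    // Policy 2: largest event timestamp seen so far minus the lag.
    static long eventBased(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_TIME_LAG;
    }

    public static void main(String[] args) {
        long now = 1605047281199L;     // inferred: logged watermark 1605047266199 + 15000
        long noEventsYet = 15_000L;    // initial currentMaxTimestamp, no event seen
        long sawEvent = 1605047187881L; // currentMaxTimestamp from the one log line that differs

        System.out.println(timeLag(now, noEventsYet)); // 1605047266199
        System.out.println(timeLag(now, sawEvent));    // 1605047266199
        System.out.println(eventBased(noEventsYet));   // 0
        System.out.println(eventBased(sawEvent));      // 1605047172881
    }
}
```

Under policy 2, a generator that has never seen an event keeps producing 0, which matches the seven log lines where periodicEmitWatermarkTime is 0.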


This is the log printed by the slf4j calls above. Every interval it gives me 8 records. Why 8? In theory I think it should be 1, so I am very confused. Also, policy 1 advances all 8 records, while policy 2 advances only 1 of the 8, and that is not reflected in processElement().

14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266199, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047266198, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit Watermark: watermark based on system time: 1605047271200, periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
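
One possible reading of these logs, offered only as a hypothesis: the 8 lines per interval may come from 8 parallel instances of the generator, of which only one receives events. Flink documents that an operator with several inputs advances its event-time clock to the minimum of the watermarks it has received on them. The plain-Java sketch below applies that rule to the values of the first batch above. The class CombinedWatermark is hypothetical, and whether the job really runs 8 instances is an assumption that the logs alone cannot confirm.

```java
import java.util.Arrays;

// Hypothetical illustration, not Flink code: combines per-instance watermarks
// the way a downstream operator with several inputs is documented to do (minimum).
final class CombinedWatermark {
    static long combine(long[] perInstanceWatermarks) {
        return Arrays.stream(perInstanceWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Values copied from the 14:28:01,199 batch of the log.
        long[] policy1 = {
            1605047266198L, 1605047266199L, 1605047266199L, 1605047266198L,
            1605047266198L, 1605047266198L, 1605047266198L, 1605047266198L
        };
        long[] policy2 = {0L, 1605047172881L, 0L, 0L, 0L, 0L, 0L, 0L};

        System.out.println(combine(policy1)); // 1605047266198
        System.out.println(combine(policy2)); // 0
    }
}
```

If this assumption holds, the combined watermark under policy 2 would stay at 0 for as long as seven of the instances see no events, which would be consistent with currentWatermark() never advancing in processElement().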


Any insights? Thank you very much!


Best,

Fuyao
