Hello,

I recently ran into a weird issue with a streaming job in Flink 1.16.1. One
of my functions (a KeyedProcessFunction) has been using processing time
timers. I now want to run the same job against a historical data dump, so I
adjusted the logic to use event time timers in that case (I did *not* switch
to BATCH execution mode). Since my data has a timestamp field, I implemented
a custom WatermarkGenerator that emits a watermark with that timestamp on
every onEvent call and does nothing in onPeriodicEmit.
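
For reference, a minimal sketch of that generator. So that the snippet
compiles on its own, WatermarkOutput and WatermarkGenerator are reduced to
local stand-ins here (Flink's real WatermarkOutput takes a Watermark object
rather than a bare long), and the Event type and the watermarksFor helper are
made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuatedWatermarkSketch {

    // Local stand-in for Flink's WatermarkOutput, reduced to a single call.
    // Assumption: the real interface takes a Watermark object, not a long.
    interface WatermarkOutput {
        void emitWatermark(long timestamp);
    }

    // Local stand-in for Flink's WatermarkGenerator<T>, same two callbacks.
    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        void onPeriodicEmit(WatermarkOutput output);
    }

    // Hypothetical record type; only the timestamp field matters here.
    static final class Event {
        final long timestamp;

        Event(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Emits one watermark per event, carrying the event's own timestamp.
    static final class PerEventWatermarkGenerator implements WatermarkGenerator<Event> {
        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            output.emitWatermark(event.timestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Intentionally empty: watermarks are only emitted per event.
        }
    }

    // Hypothetical helper: feeds timestamps through the generator and
    // records every watermark it emits, in order.
    static List<Long> watermarksFor(long... timestamps) {
        List<Long> emitted = new ArrayList<>();
        WatermarkOutput output = ts -> emitted.add(ts);
        PerEventWatermarkGenerator generator = new PerEventWatermarkGenerator();
        for (long ts : timestamps) {
            generator.onEvent(new Event(ts), ts, output);
            generator.onPeriodicEmit(output);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(watermarksFor(1000L, 2000L, 3000L)); // prints [1000, 2000, 3000]
    }
}
```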

My problem is that, sometimes, the very first time my function calls
TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
some false triggers when the first watermark actually arrives.
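
A simplified sketch of how such a false trigger can arise, assuming timers
are registered relative to the current watermark. The timeout constant and
helper names are made up for illustration and are not taken from the job; the
only Flink behaviour relied on is that an event time timer fires once the
watermark reaches its timestamp.

```java
public class MinValueTimerSketch {

    // Hypothetical timeout; the real value is not part of this mail.
    static final long TIMEOUT_MS = 60_000L;

    // Timer timestamp computed relative to the watermark seen by the function.
    static long timerFor(long currentWatermark) {
        return currentWatermark + TIMEOUT_MS;
    }

    // An event time timer fires once the watermark reaches its timestamp.
    static boolean firesOn(long timerTimestamp, long watermark) {
        return timerTimestamp <= watermark;
    }

    public static void main(String[] args) {
        long firstRealWatermark = 1_000L;

        // Registered while the watermark is still Long.MIN_VALUE: the timer
        // lies far in the past and fires on the first real watermark.
        long earlyTimer = timerFor(Long.MIN_VALUE);
        System.out.println(firesOn(earlyTimer, firstRealWatermark)); // prints true

        // Registered after the first watermark has arrived: the timer waits
        // for the full timeout.
        long normalTimer = timerFor(firstRealWatermark);
        System.out.println(firesOn(normalTimer, firstRealWatermark)); // prints false
    }
}
```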

I would have expected that, if WatermarkGenerator.onEvent emits a
watermark, it would be sent before the corresponding event, but maybe this
is not always the case?

In case it's relevant, a brief overview of my job's topology:

Source1 -> Broadcast

Source2 ->
  keyBy ->
  connect(Broadcast) ->
  process ->
  filter ->
  assignTimestampsAndWatermarks -> // newly added for historical data
  keyBy ->
  process // function that uses timers

Regards,
Alexis.
