Flink 1.11
I have a simple Flink application that reads from Kafka, assigns timestamps
and watermarks based on event time, then keys by a field and uses a
KeyedProcessFunction.

The keyed process function emits events from within the `processElement`
method using `out.collect`. No timers are used to collect or output any
elements (or to do anything else, for that matter).

I also have a simple print statement inside the process function that shows
the event timestamp and the current watermark:

if (waterMark <= 0)
  println(
    s"""
       |eventTimestamp:        $eventTimestamp
       |waterMark:             $waterMark
       |""".stripMargin)


If the process function simply does nothing with the incoming records, i.e.,
does not emit any records as a result of an input element, the watermark
starts at `Long.MinValue` and then progresses just fine, as expected. If I
add `out.collect(....)`, the watermark stops progressing and the above print
statement fires for every record.

The environment has
`setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.

The source starts out something like this:

someKafkaSource
  .flatMap(_.someTransformationToEventType.filter(_.something != 0))
  .assignTimestampsAndWatermarks(WatermarkStrategy
    .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
    .withIdleness(Duration.ofSeconds(30))
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(element: Event, recordTimestamp: Long): Long =
        if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis
        else recordTimestamp
    }))
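For reference, my understanding is that `forBoundedOutOfOrderness` tracks the
maximum timestamp seen and periodically emits `max - delay - 1` ms as the
watermark, which is why it starts at `Long.MinValue` before any events
arrive. A simplified, Flink-free model of that logic (the class name is mine,
not Flink's):

```scala
// Simplified model of a bounded-out-of-orderness watermark generator
// (assumption: mirrors Flink's BoundedOutOfOrdernessWatermarks behavior).
class BoundedOutOfOrderness(outOfOrdernessMillis: Long) {
  // Initialized so the first watermark is exactly Long.MinValue.
  private var maxTimestamp: Long = Long.MinValue + outOfOrdernessMillis + 1

  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  // In Flink this is emitted periodically (every 200 ms by default).
  def currentWatermark: Long = maxTimestamp - outOfOrdernessMillis - 1
}

val gen = new BoundedOutOfOrderness(10000L) // 10 s, as in the job above
println(gen.currentWatermark)               // Long.MinValue before any event
gen.onEvent(1000000L)
println(gen.currentWatermark)               // 1000000 - 10000 - 1 = 989999
```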

The sink is a custom rich sink implementation:

resultStream.addSink(new CustomSink())

I recall seeing a thread somewhere suggesting this could be a Flink bug, but
I can't seem to track it down again.
Happy to provide more information. For what it's worth, the
KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
Evictor, but it has since been replaced in an attempt to solve the watermark
issue, with no success so far.

Do I have to use `assignTimestampsAndWatermarks` again after the
KeyedProcessFunction?

Full job flow for completeness:

Kafka -> Flink Kafka source -> flatMap (map & filter) ->
assignTimestampsAndWatermarks -> map function -> keyBy ->
KeyedProcessFunction -> Async I/O -> custom sink

Much obliged.
