Flink 1.11
I have a simple Flink application that reads from Kafka, uses event
time, assigns timestamps and watermarks, then keys by a field and
uses a KeyedProcessFunction.
The keyed process function outputs events from within the `processElement`
method using `out.collect`. No timers are used to collect or output any
elements (or to do anything else, for that matter).
I also have a simple print statement inside the process function that
shows the event timestamp and the current watermark:
    if (waterMark <= 0)
      println(
        s"""
           |eventTimestamp: $eventTimestamp
           |waterMark: $waterMark
           |""".stripMargin)
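For context, here is a minimal sketch of how such a check might sit inside `processElement`. It assumes the two values come from `ctx.timestamp()` and `ctx.timerService().currentWatermark()`; the `Event` fields, the `String` key type, and the names `WatermarkDebug` and `DebugProcessFunction` are hypothetical and not taken from the real job.

```scala
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type; the real job's type is not shown in this post.
case class Event(key: String, eventTimeUTCMillis: Long)

object WatermarkDebug {
  // Returns the debug text only while the watermark has not advanced past 0.
  def message(eventTimestamp: java.lang.Long, waterMark: Long): Option[String] =
    if (waterMark <= 0)
      Some(
        s"""
           |eventTimestamp: $eventTimestamp
           |waterMark: $waterMark
           |""".stripMargin)
    else None
}

// Hypothetical name; forwards every element and logs the watermark it sees.
class DebugProcessFunction extends KeyedProcessFunction[String, Event, Event] {
  override def processElement(
      value: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    // Timestamp attached to this record (null if none was assigned).
    val eventTimestamp = ctx.timestamp()
    // Watermark as currently seen by this operator instance.
    val waterMark = ctx.timerService().currentWatermark()
    WatermarkDebug.message(eventTimestamp, waterMark).foreach(println)
    out.collect(value)
  }
}
```

Keeping the formatting in a small pure helper means the condition can be checked without starting a job.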
If the process function does nothing with the incoming records, i.e., it
does not emit anything in response to an input element, then the watermark
starts at -Max Long and progresses just fine, as expected. If I add
`out.collect(....)`, the watermark stops progressing and the print
statement above fires for every record.
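To make the expected progression concrete, the following is a plain-Scala model (no Flink dependency) of how a bounded-out-of-orderness generator is understood to derive its watermark: the highest timestamp seen so far, minus the allowed lateness, minus one millisecond. The class and object names and the sample timestamps are made up for illustration; this is a sketch of the arithmetic only, not Flink's implementation.

```scala
// Plain-Scala model of a bounded-out-of-orderness watermark (illustrative only).
final class BoundedOutOfOrdernessModel(outOfOrdernessMillis: Long) {
  // Chosen so that the watermark is Long.MinValue before any event arrives.
  private var maxTimestamp: Long = Long.MinValue + outOfOrdernessMillis + 1

  // Track the highest event timestamp seen so far.
  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  // Watermark trails the highest timestamp by the bound, minus 1 ms.
  def currentWatermark: Long = maxTimestamp - outOfOrdernessMillis - 1
}

object WatermarkModelDemo {
  def main(args: Array[String]): Unit = {
    val model = new BoundedOutOfOrdernessModel(10000L) // 10 s bound, as in the job
    println(s"initial waterMark: ${model.currentWatermark}")
    // Hypothetical timestamps; the third one arrives out of order.
    Seq(1600000000000L, 1600000005000L, 1600000002000L).foreach { ts =>
      model.onEvent(ts)
      println(s"eventTimestamp: $ts, waterMark: ${model.currentWatermark}")
    }
  }
}
```

In a real job the watermark is emitted periodically rather than per record, so the first few records typically still see the initial value downstream. A watermark that never leaves that value is the symptom described here.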
The environment has
`setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.
The source starts out something like this:
    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

    someKafkaSource
      .flatMap(_.someTransformationToEventType.filter(_.something != 0))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
          .withIdleness(Duration.ofSeconds(30))
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
            // Prefer the event's own time; fall back to the record's existing timestamp.
            override def extractTimestamp(element: Event, recordTimestamp: Long): Long =
              if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis
              else recordTimestamp
          }))
The sink is a custom Rich Sink implementation:
    resultStream.addSink(new CustomSink())
I recall seeing a thread somewhere indicating this could be a Flink bug but
I can't seem to track it down again.
Happy to provide more information. For what it's worth, the
KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
Evictor but has since been replaced in an attempt to solve the watermark
issue with no success so far.
Do I have to call `assignTimestampsAndWatermarks` again after the
KeyedProcessFunction?
Full job flow for completeness:
Kafka -> Flink Kafka source -> flatMap (map & filter) ->
assignTimestampsAndWatermarks -> map function -> keyBy ->
KeyedProcessFunction -> Async I/O -> custom sink
Much obliged.