Flink 1.11

I have a simple Flink application that reads from Kafka, uses event timestamps, assigns timestamps and watermarks, then keys by a field and uses a `KeyedProcessFunction`.
The keyed process function outputs events from within the `processElement` method using `out.collect`. No timers are used to collect or output any elements (or to do anything else, for that matter). I also have a simple print statement that shows the event time and the watermark within the process function:

    if (waterMark <= 0)
      println(
        s"""
           |eventTimestamp: $eventTimestamp
           |waterMark: $waterMark
           |""".stripMargin)

If the process function does nothing with the incoming records, i.e., does not emit any records as a result of an input element, then the watermark starts at -Max Long and progresses just fine, as expected. If I add `out.collect(....)`, the watermark stops progressing and the print statement above fires for every record.

The environment has `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.

The source starts out something like this:

    someKafkaSource.flatMap(_.someTransformationToEventType.filter(_.something != 0))
      .assignTimestampsAndWatermarks(WatermarkStrategy
        .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
        .withIdleness(Duration.ofSeconds(30))
        .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
            if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis else recordTimestamp
          }
        }))

The sink is a custom Rich Sink implementation:

    resultStream.addSink(new CustomSink())

I recall seeing a thread somewhere indicating this could be a Flink bug, but I can't seem to track it down again. Happy to provide more information.

For what it's worth, the `KeyedProcessFunction` used to be a `GlobalWindow` with a custom `Trigger` and `Evictor`. I replaced it in an attempt to solve the watermark issue, with no success so far.

Do I have to call `assignTimestampsAndWatermarks` again after the `KeyedProcessFunction`?
Full job flow for completeness:

    Kafka -> Flink Kafka source -> flatMap (map & filter) -> assignTimestampsAndWatermarks -> map function -> keyBy -> KeyedProcessFunction -> Async I/O -> custom sink

Much obliged.
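For reference, here is a minimal, Flink-free sketch of the arithmetic I would expect behind a bounded-out-of-orderness watermark with a 10 second bound, which is why a value at the very bottom of the `Long` range shows up before any watermark has advanced. `BoundedWatermarkSketch` and its methods are hypothetical names used only for illustration; they are not Flink classes, and the real generator emits watermarks periodically rather than on demand.

```scala
// Illustrative only: mimics the arithmetic of a bounded-out-of-orderness
// watermark generator. BoundedWatermarkSketch is a hypothetical name,
// not part of the Flink API.
final class BoundedWatermarkSketch(outOfOrdernessMillis: Long) {
  // Start low enough that the first computed watermark is Long.MinValue
  // without overflowing.
  private var maxTimestamp: Long = Long.MinValue + outOfOrdernessMillis + 1

  // Called once per record with its event timestamp.
  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  // The watermark trails the highest timestamp seen so far by the bound plus 1 ms.
  def currentWatermark: Long = maxTimestamp - outOfOrdernessMillis - 1
}

object BoundedWatermarkSketchDemo {
  def main(args: Array[String]): Unit = {
    val wm = new BoundedWatermarkSketch(outOfOrdernessMillis = 10000L)
    println(s"before any event: ${wm.currentWatermark}")

    wm.onEvent(1600000000000L)
    wm.onEvent(1599999995000L) // an older event does not move the watermark back
    println(s"after two events: ${wm.currentWatermark}")
  }
}
```

In the real job, the value seen inside `processElement` additionally depends on when the periodic watermark is emitted upstream and on the minimum watermark across all input channels, so this sketch only covers the per-source arithmetic.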