Hi Ahmad,

From your description, I'd look in a different direction: could it be that your Sink/Async I/O is not processing data (fast enough)? Since you have a bounded out-of-orderness watermark strategy, you'd need to see 10 seconds' worth of event time being processed before the first watermark is emitted. To test that, can you please simply remove the AsyncIO + Sink from your job and check the print statements?
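For reference, here is a rough, self-contained sketch of that kind of check (untested; the object name, sample events, and timestamps are made up, and I've dropped withIdleness for brevity, Flink 1.11 Scala API). It keeps your bounded watermark strategy but uses print() as the only sink, so it shows the job shape rather than exact output:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

object WatermarkDebugJob {
  case class Event(key: String, eventTimeUTCMillis: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Made-up events spanning more than 10s of event time, so the
    // bounded strategy has enough data to emit a first watermark.
    env.fromElements(Event("a", 1000L), Event("a", 11000L), Event("a", 22000L))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
            override def extractTimestamp(e: Event, recordTs: Long): Long =
              e.eventTimeUTCMillis
          }))
      .keyBy(_.key)
      .process(new KeyedProcessFunction[String, Event, Event] {
        override def processElement(
            value: Event,
            ctx: KeyedProcessFunction[String, Event, Event]#Context,
            out: Collector[Event]): Unit = {
          // Same kind of diagnostic print you already have.
          println(s"eventTimestamp: ${ctx.timestamp()} waterMark: ${ctx.timerService().currentWatermark()}")
          out.collect(value)
        }
      })
      .print() // plain print sink instead of AsyncIO + CustomSink

    env.execute("watermark-debug")
  }
}

If the watermark advances in a stripped-down job like this but not in the full job, that points at backpressure from the AsyncIO/Sink stages holding back the sources, and with them the watermarks.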
On Tue, Oct 12, 2021 at 3:23 AM Ahmad Alkilani <amk...@gmail.com> wrote:

> Flink 1.11
> I have a simple Flink application that reads from Kafka, uses event
> timestamps, assigns timestamps and watermarks, and then keys by a field
> and uses a KeyedProcessFunction.
>
> The keyed process function outputs events from within the
> `processElement` method using `out.collect`. No timers are used to
> collect or output any elements (or do anything for that matter).
>
> I also have a simple print statement that shows event time and waterMark
> within the process function.
>
> if (waterMark <= 0)
>   println(
>     s"""
>        |eventTimestamp: $eventTimestamp
>        |waterMark: $waterMark
>        |""".stripMargin)
>
> If the process function simply does nothing with the incoming records,
> i.e., does not emit any records/data as a result of an input element,
> then you'll see the watermark start out at -Max Long and then progress
> just fine as expected. If I add `out.collect(....)` then the watermark
> stops progressing and the above print statement prints for every record.
>
> The environment has
> `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.
>
> The source starts out something like this:
>
> someKafkaSource.flatMap(_.someTransformationToEventType.filter(_.something != 0))
>   .assignTimestampsAndWatermarks(WatermarkStrategy
>     .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
>     .withIdleness(Duration.ofSeconds(30))
>     .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
>       override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
>         if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis else recordTimestamp
>       }
>     }))
>
> The sink is a custom Rich Sink implementation:
> resultStream.addSink(new CustomSink())
>
> I recall seeing a thread somewhere indicating this could be a Flink bug
> but I can't seem to track it down again.
> Happy to provide more information. For what it's worth, the
> KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
> Evictor, but it has since been replaced in an attempt to solve the
> watermark issue, with no success so far.
>
> Do I have to use assignTimestampsAndWatermarks again after the
> KeyedProcessFunction?
>
> Full job flow for completeness:
>
> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
> assignTimestampsAndWatermarks -> map function -> keyBy ->
> KeyedProcessFunction -> Async IO -> Custom Sink
>
> Much obliged.