Hi Ahmad,

From your description, I'd look in a different direction: could it be that
your Sink/Async I/O is not processing data (fast enough)?
Since you have a bounded watermark strategy, you'd need to see 10s of event
time pass before the first watermark is emitted.
To test that, can you please simply remove the AsyncIO + Sink from your job
and check the print statements?
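For reference, a bounded-out-of-orderness watermark lags the largest event timestamp seen so far by the configured bound, which is why nothing useful is emitted until enough data has flowed through. Here is a minimal sketch of that arithmetic in plain Scala; the class and method names are illustrative only, not Flink's actual API, and there is no Flink dependency:

```scala
// Simplified model of a bounded-out-of-orderness watermark generator.
// The watermark trails the maximum event timestamp observed by the
// configured delay; names here are hypothetical, not Flink classes.
class BoundedOutOfOrdernessModel(delayMillis: Long) {
  // Start so that the first watermark is Long.MinValue ("no watermark yet").
  private var maxTimestamp: Long = Long.MinValue + delayMillis + 1

  // Track the highest event timestamp seen so far.
  def onEvent(eventTimestampMillis: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestampMillis)

  // Watermark = max timestamp seen, minus the allowed out-of-orderness.
  def currentWatermark: Long = maxTimestamp - delayMillis - 1
}
```

With a 10s bound, an event at t=1000 still yields a negative watermark (-9001); only once an event with t >= 10001 arrives does the watermark reach 0, so a stalled sink that backpressures the source also stalls watermark progress.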

On Tue, Oct 12, 2021 at 3:23 AM Ahmad Alkilani <amk...@gmail.com> wrote:

> Flink 1.11
> I have a simple Flink application that reads from Kafka, uses event
> timestamps, assigns timestamps and watermarks, and then keys by a field and
> uses a KeyedProcessFunction.
>
> The keyed process function outputs events from within the `processElement`
> method using `out.collect`. No timers are used to collect or output any
> elements (or do anything for that matter).
>
> I also have a simple print statement that shows event time and waterMark
> within the process function.
>
> if (waterMark <= 0)
>   println(
>     s"""
>        |eventTimestamp:        $eventTimestamp
>        |waterMark:             $waterMark
>        |""".stripMargin)
>
>
> If the process function simply does nothing with the incoming records,
> i.e., does not emit any records/data as a result of an input element, then
> you'll see the watermark start at -MaxLong and then progress just fine
> as expected. If I add `out.collect(....)` then the watermark stops
> progressing and the above print statement prints for every record.
>
> The environment has
> `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.
>
> The source starts out something like this:
>
> someKafkaSource.flatMap(_.someTransformationToEventType.filter(_.something != 0))
>   .assignTimestampsAndWatermarks(WatermarkStrategy
>     .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
>     .withIdleness(Duration.ofSeconds(30))
>     .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
>       override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
>         if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis else recordTimestamp
>       }
>     }))
>
> The sink is a custom Rich Sink implementation:
>  resultStream.addSink(new CustomSink())
>
> I recall seeing a thread somewhere indicating this could be a Flink bug
> but I can't seem to track it down again.
> Happy to provide more information. For what it's worth, the
> KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
> Evictor but has since been replaced in an attempt to solve the watermark
> issue with no success so far.
>
> Do I have to use assignTimestampsAndWatermarks again after the
> KeyedProcessFunction?
>
> Full job flow for completeness:
>
> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
> assignTimestampsAndWaterMarks -> map Function -> Key By -> Keyed Process
> Function -> Async IO -> Custom Sink
>
> Much obliged.
>
