Hi Ahmad,

The ProcessFunction simply forwards the watermark [1], so I don't have any explanation for why it would stop advancing as soon as you emit data. My assumption was that emitting in the process function causes backpressure and thus halts the advancement of the watermark upstream.
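To make the forwarding point concrete, here is a toy model in plain Scala. It is not Flink code: `Element`, `Record`, `Watermark` and `runOperator` are hypothetical names. It assumes, as in Flink's ProcessOperator, that watermarks bypass the user function and are forwarded unchanged, while records emitted from `processElement` inherit the timestamp of the input record that produced them. Under that assumption, what the function emits has no influence on the watermark.

```scala
// Toy model (hypothetical names, not Flink classes) of a one-input process operator.
sealed trait Element extends Product with Serializable
final case class Record(value: String, timestamp: Long) extends Element
final case class Watermark(time: Long) extends Element

// Records go through the user function; watermarks bypass it entirely.
def runOperator(input: Seq[Element], userFn: String => Seq[String]): Seq[Element] =
  input.flatMap {
    // Every output inherits the timestamp of the input record that produced it.
    case Record(value, ts) => userFn(value).map(out => Record(out, ts))
    // The user function never sees watermarks; they are forwarded unchanged.
    case w: Watermark => Seq(w)
  }

val input = Seq(Record("a", 1000L), Watermark(500L), Record("b", 2000L), Watermark(1500L))

// A no-op function and one that emits two records per input forward the same watermarks.
val noOp = runOperator(input, _ => Seq.empty)
val echo = runOperator(input, v => Seq(v, v.toUpperCase))

println(noOp) // List(Watermark(500), Watermark(1500))
```

Records emitted from a processing-time timer in a KeyedProcessFunction are a different case: to my understanding Flink emits those without a timestamp, which this model does not cover. Either way, in this model the operator's watermark comes only from its input, not from what the user function emits.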
A usual suspect when watermarks don't look right is a custom watermark assigner that isn't working as expected. But you mentioned that with a no-op, the process function does show the watermark, and that leaves me completely puzzled. I would strip your example down even more to find the culprit.

Another common issue: if you have empty partitions in Kafka, you won't see advancing watermarks in the second process function after the keyBy. Since an operator's watermark is the minimum of the watermarks of all its input subtasks, it will stay at MIN_VALUE in all operators after the keyBy if even one subtask sees no watermark advancement. See [2] for a solution.

[1] https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java#L72-L72
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

On Tue, Oct 12, 2021 at 10:22 PM Ahmad Alkilani <amk...@gmail.com> wrote:

> Thanks Arvid.
>
> Getting the easy stuff out of the way, I certainly wait for longer than
> 10s (typically observe what happens over a few minutes) so the bounded
> watermark issue isn't in play here.
>
> The Async IO as it stands today has timeouts so it doesn't run
> indefinitely. With that said, I replaced the Async IO with a simple process
> function and print statements in the body of the process function. The
> process function simply emits what it received. I also removed the custom
> sink (that has an external dependency) and replaced it with a simple lambda
> that occasionally prints just to show progress.
>
> So now:
> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
> assignTimestampsAndWatermarks -> map Function -> *process function (print
> watermarks)* -> Key By -> Keyed Process Function -> *process function
> (print watermarks)* -> Simple Sink
>
> I am seeing similar problems.
> The watermarks don't seem to advance until a
> checkpoint is triggered. I haven't been able to measure if they indeed
> advance *regularly* post checkpoint, but the watermark at the very least
> is no longer Long.MinValue. Also, the checkpoint could simply be a
> red herring: I also see some offset commit messages to Kafka around the
> same time that I notice watermarks advancing. This is well into a job
> execution (5-10 minutes).
>
> So now that the Async I/O is out of the way, any idea what's going on here?
>
> Related question: how does Flink know that time has "progressed" after a
> process function? How is the notion of time carried over, given the
> developer has full control over what the process function does? Meaning,
> the process function could choose to set a "processing time" trigger
> for every 5th message it receives and then, in the processing time trigger,
> just output a completely unrelated piece of data. How does the notion of
> event time then continue to exist, i.e., how does it know what the event
> time is then?
>
> Thank you
>
> On Mon, Oct 11, 2021 at 11:13 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Ahmad,
>>
>> From your description, I'd look in a different direction: Could it be
>> that your Sink/Async IO is not processing data (fast enough)?
>> Since you have a bounded watermark strategy, you'd need to see 10s of
>> data being processed before the first watermark is emitted.
>> To test that, can you please simply remove AsyncIO+Sink from your job and
>> check for print statements?
>>
>> On Tue, Oct 12, 2021 at 3:23 AM Ahmad Alkilani <amk...@gmail.com> wrote:
>>
>>> Flink 1.11
>>>
>>> I have a simple Flink application that reads from Kafka, uses event
>>> timestamps, assigns timestamps and watermarks, and then keys by a field
>>> and uses a KeyedProcessFunction.
>>>
>>> The keyed process function outputs events from within the `processElement`
>>> method using `out.collect`.
>>> No timers are used to collect or output any
>>> elements (or do anything for that matter).
>>>
>>> I also have a simple print statement that shows event time and watermark
>>> within the process function:
>>>
>>> if (waterMark <= 0)
>>>   println(
>>>     s"""
>>>        |eventTimestamp: $eventTimestamp
>>>        |waterMark: $waterMark
>>>        |""".stripMargin)
>>>
>>> If the process function simply does nothing with the incoming records,
>>> i.e., does not emit any records/data as a result of an input element, then
>>> you'll see the watermark start at Long.MinValue and then progress just fine
>>> as expected. If I add `out.collect(....)`, then the watermark stops
>>> progressing and the above print statement prints for every record.
>>>
>>> The environment has
>>> `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.
>>>
>>> The source starts out something like this:
>>>
>>> someKafkaSource.flatMap(_.someTransofmrationToEventType.filter(_.something != 0))
>>>   .assignTimestampsAndWatermarks(WatermarkStrategy
>>>     .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
>>>     .withIdleness(Duration.ofSeconds(30))
>>>     .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
>>>       override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
>>>         // Prefer the event's own timestamp; fall back to the Kafka record timestamp.
>>>         if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis else recordTimestamp
>>>       }
>>>     }))
>>>
>>> The sink is a custom Rich Sink implementation:
>>> resultStream.addSink(new CustomSink())
>>>
>>> I recall seeing a thread somewhere indicating this could be a Flink bug,
>>> but I can't seem to track it down again.
>>> Happy to provide more information. For what it's worth, the
>>> KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
>>> Evictor but has since been replaced in an attempt to solve the watermark
>>> issue, with no success so far.
>>>
>>> Do I have to use assignTimestampsAndWatermarks again after the
>>> KeyedProcessFunction?
>>>
>>> Full job flow for completeness:
>>>
>>> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
>>> assignTimestampsAndWatermarks -> map Function -> Key By -> Keyed Process
>>> Function -> Async IO -> Custom Sink
>>>
>>> Much obliged.
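The minimum-over-inputs rule for operators after the keyBy can be sketched as a toy model too. This is not Flink's implementation: `Channel` and `combinedWatermark` are hypothetical names, and each channel stands for one upstream subtask feeding an operator after the keyBy. Idle channels are assumed to be excluded from the minimum, which is roughly what marking a source idle (e.g. via `withIdleness`) is meant to achieve; if every input is idle, the model returns `None` (no new watermark).

```scala
// Toy model (not Flink's implementation) of how an operator with several
// upstream subtasks derives its own event-time clock.
final case class Channel(watermark: Long, idle: Boolean)

// Minimum watermark over all non-idle input channels.
// Returns None when every channel is idle, i.e. no new watermark is emitted.
def combinedWatermark(channels: Seq[Channel]): Option[Long] = {
  val active = channels.filterNot(_.idle)
  if (active.isEmpty) None else Some(active.map(_.watermark).min)
}

// Three subtasks make progress; one (e.g. reading an empty partition) never does.
val stuck = Seq(
  Channel(12000L, idle = false),
  Channel(15000L, idle = false),
  Channel(9000L, idle = false),
  Channel(Long.MinValue, idle = false)
)
println(combinedWatermark(stuck)) // Some(-9223372036854775808)

// Once the silent subtask is marked idle, it no longer holds the others back.
val withIdle = stuck.updated(3, Channel(Long.MinValue, idle = true))
println(combinedWatermark(withIdle)) // Some(9000)
```

A single silent input is enough to pin the combined value at Long.MinValue, which matches the symptom of every record printing in the process function after the keyBy.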
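The bounded-out-of-orderness strategy used in the job can also be modeled in a few lines. This is a plain-Scala re-implementation for illustration only, assuming the rule used by Flink's built-in bounded-out-of-orderness generator (watermark = highest timestamp seen minus the bound minus 1 ms); the class name `BoundedOutOfOrderness` is hypothetical. It shows that the watermark starts at Long.MinValue and, once events arrive, trails the highest timestamp by the bound.

```scala
import java.time.Duration

// Toy model of a bounded-out-of-orderness watermark generator.
// Rule: watermark = highest timestamp seen so far - bound - 1 ms.
final class BoundedOutOfOrderness(maxOutOfOrderness: Duration) {
  private val boundMillis: Long = maxOutOfOrderness.toMillis
  // Chosen so that, before any event arrives, the watermark is exactly Long.MinValue.
  private var maxTimestamp: Long = Long.MinValue + boundMillis + 1

  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  // In Flink this value would be emitted periodically (auto-watermark interval);
  // here it is simply read on demand.
  def currentWatermark: Long = maxTimestamp - boundMillis - 1
}

val gen = new BoundedOutOfOrderness(Duration.ofSeconds(10))
val before = gen.currentWatermark
Seq(20000L, 25000L, 22000L).foreach(gen.onEvent)
val after = gen.currentWatermark
println(s"before=$before after=$after") // before=-9223372036854775808 after=14999
```

The generator only tracks the largest timestamp it has seen, so an out-of-order event such as 22000 above does not move the watermark backwards.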