A ProcessFunction will just emit watermarks from upstream as they come. No function/operator in Flink is a black hole w.r.t. watermarks. It's just important to remember that the watermark after a network shuffle is always the min of all inputs (ignoring idle inputs). So if any connected upstream part is not advancing the watermark, the operator does not advance it either, regardless of the other upstream parts.
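To make the min rule concrete, here is a minimal sketch in plain Scala. It does not use Flink's classes; `InputChannel` and `combined` are hypothetical names made up for illustration, and what Flink does when every input is idle is not modeled here.

```scala
// Sketch only: models how an operator with several inputs combines watermarks.
// InputChannel and combined are hypothetical names, not Flink API.
object CombinedWatermark {
  // One upstream input: its last seen watermark and whether it is marked idle.
  final case class InputChannel(watermark: Long, idle: Boolean = false)

  // The operator's watermark is the minimum over all non-idle inputs.
  // Returns None when there is no active input to derive a watermark from.
  def combined(inputs: Seq[InputChannel]): Option[Long] = {
    val active = inputs.filterNot(_.idle).map(_.watermark)
    if (active.isEmpty) None else Some(active.min)
  }
}
```

With inputs at 100 and Long.MinValue the combined watermark stays at Long.MinValue; once the stalled input is marked idle, the result becomes 100.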
Processing-time timers run outside of the watermark system. If you emit a record from such a timer, it is blindly embedded into the stream of data, where the watermark may or may not have advanced past the timestamp of the record. A windowing operator downstream may then discard your recently emitted record if it's before the watermark, depending on your pipeline settings. So your processing-time record may end up being a late record. There is no holding back of watermarks whatsoever.

TL;DR: no watermark should get lost at the abstraction level that you are working on. If you still see lost watermarks, I suggest you look at the parallel upstream instances of the first operator that doesn't see the watermark and check the watermarks of all these inputs.

On Fri, Oct 15, 2021 at 9:06 AM Ahmad Alkilani <amk...@gmail.com> wrote:

> Thanks again Arvid,
> I am getting closer to the culprit as I've found some interesting
> scenarios. Still no exact answer yet. We are indeed also using
> .withIdleness to mitigate slow partitions or partitions with issues.
>
> I did have a few clarifying questions though w.r.t. watermarks, if you
> don't mind.
>
> *Watermark progression:*
> The code you pointed out in [1] seems to indicate that watermarks are a
> logical side effect that travels alongside events in the stream but can
> also progress on its own? This continues to puzzle me. Here's a contrived
> example to clarify: a process function receives data but never emits
> anything (neither in processElement nor based on a timer), i.e., the
> processFunction is just a black hole for event records passing through it.
> Do watermarks still make it through to the next operator? If the answer is
> yes here, then this confirms my now corrected understanding of how this
> works. I always thought, and according to what I see in the literature,
> that watermarks travelled with events in a stream.
>
> *Using process time windows alongside event-time computation:*
> While most of the processing we have uses event time, we do have a
> "processing time" window towards the end that essentially has the
> responsibility of grouping events together in a batch before sending to an
> external system. The receiving end prefers slightly larger batches of data
> vs. one event at a time. This window uses processing time since we really
> only care about how many records we send per second vs. what the semantic
> event-time meaning of that record is. So for example we'd group events in a
> 2 second window, i.e., limiting to 1 request every 2 seconds downstream
> (assuming parallelism of 1). The question here is, does this use of event
> time windowing have any impact upstream, or downstream, on the progression
> of watermarks? By impact downstream, the concern is if the watermark
> somehow gets "lost" due to the change of semantics in processing. FWIW,
> this window precedes the Async I/O function. Both of these have been
> removed to simplify troubleshooting for the original question, but they are
> part of the bigger application once fully assembled.
>
> Thank you!
>
> On Thu, Oct 14, 2021 at 8:40 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Ahmad,
>>
>> The ProcessFunction is simply forwarding the Watermark [1]. So I don't
>> have any explanation as to why it would not advance anymore as soon as you
>> emit data. My assumption was that emitting in the process function causes
>> backpressure and thus halts the advancement of the watermark upstream.
>>
>> A usual suspect when not seeing good watermarks is that the custom
>> watermark assigner is not working as expected. But you mentioned that with
>> a no-op, the process function is actually showing the watermark, and that
>> leaves me completely puzzled.
>>
>> I would dumb down your example even more to find the culprit.
>>
>> Another common issue is that if you have empty partitions in Kafka, you
>> wouldn't see advancing watermarks in the second process function after the
>> keyBy. Since the watermark advances as the min watermark of all input
>> subtasks, it will stay at MIN_VALUE in all operators after the keyBy if
>> only one subtask sees no watermark advancement. See [2] for a solution.
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java#L72-L72
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>>
>> On Tue, Oct 12, 2021 at 10:22 PM Ahmad Alkilani <amk...@gmail.com> wrote:
>>
>>> Thanks Arvid.
>>> Getting the easy stuff out of the way, I certainly wait for longer than
>>> 10s (I typically observe what happens over a few minutes), so the bounded
>>> watermark issue isn't in play here.
>>>
>>> The Async IO as it stands today has timeouts, so it doesn't run
>>> indefinitely. With that said, I replaced the Async IO with a simple
>>> process function and print statements in the body of the process
>>> function. The process function simply emits what it received. I also
>>> removed the custom sink (that has an external dependency) and replaced it
>>> with a simple lambda that occasionally prints just to show progress.
>>>
>>> So now:
>>> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
>>> assignTimestampsAndWaterMarks -> map Function -> *process function
>>> (print watermarks)* -> Key By -> Keyed Process Function -> *process
>>> function (print watermarks)* -> Simple Sink
>>>
>>> I am seeing similar problems. The watermarks don't seem to advance until
>>> a checkpoint is triggered. I haven't been able to measure if they indeed
>>> advance *regularly* post checkpoint, but the watermark at the very least
>>> is no longer Long.MinValue.
>>> Also, the checkpoint could simply be a red herring. I also see some
>>> offset commit messages to Kafka around the same time that I notice
>>> watermarks advancing. This is well into a job execution (5-10 minutes).
>>>
>>> So now that the Async I/O is out of the way, any idea what's going on
>>> here?
>>> Related question: how does Flink know that time has "progressed" after a
>>> process function? How is the notion of time carried over given that the
>>> developer has full control over what the process function does? Meaning,
>>> the process function could choose to register a "processing time" trigger
>>> for every 5th message it received and then in the processing-time trigger
>>> just output a completely unrelated piece of data. How does the notion of
>>> event time then continue to exist, i.e., how does it know what the event
>>> time is then?
>>>
>>> Thank you
>>>
>>> On Mon, Oct 11, 2021 at 11:13 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Ahmad,
>>>>
>>>> From your description, I'd look in a different direction: could it be
>>>> that your Sink/Async IO is not processing data (fast enough)?
>>>> Since you have a bounded watermark strategy, you'd need to see 10s of
>>>> data being processed before the first watermark is emitted.
>>>> To test that, can you please simply remove AsyncIO+Sink from your job
>>>> and check for print statements?
>>>>
>>>> On Tue, Oct 12, 2021 at 3:23 AM Ahmad Alkilani <amk...@gmail.com>
>>>> wrote:
>>>>
>>>>> Flink 1.11
>>>>> I have a simple Flink application that reads from Kafka, uses event
>>>>> timestamps, assigns timestamps and watermarks, and then keys by a field
>>>>> and uses a KeyedProcessFunction.
>>>>>
>>>>> The keyed process function outputs events from within the
>>>>> `processElement` method using `out.collect`. No timers are used to
>>>>> collect or output any elements (or do anything for that matter).
>>>>>
>>>>> I also have a simple print statement that shows event time and
>>>>> waterMark within the process function.
>>>>>
>>>>> if (waterMark <= 0)
>>>>>   println(
>>>>>     s"""
>>>>>        |eventTimestamp: $eventTimestamp
>>>>>        |waterMark: $waterMark
>>>>>        |""".stripMargin)
>>>>>
>>>>> If the process function simply does nothing with the incoming records,
>>>>> i.e., does not emit any records/data as a result of an input element,
>>>>> then you'll see the watermark start at -Max Long and then progress just
>>>>> fine as expected. If I add `out.collect(....)` then the watermark stops
>>>>> progressing and the above print statement prints for every record.
>>>>>
>>>>> The environment has
>>>>> `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.
>>>>>
>>>>> The source starts out something like this:
>>>>>
>>>>> someKafkaSource.flatMap(_.someTransofmrationToEventType.filter(_.something != 0))
>>>>>   .assignTimestampsAndWatermarks(WatermarkStrategy
>>>>>     .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
>>>>>     .withIdleness(Duration.ofSeconds(30))
>>>>>     .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
>>>>>       override def extractTimestamp(element: Event, recordTimestamp: Long): Long = {
>>>>>         if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis
>>>>>         else recordTimestamp
>>>>>       }
>>>>>     }))
>>>>>
>>>>> The sink is a custom Rich Sink implementation:
>>>>> resultStream.addSink(new CustomSink())
>>>>>
>>>>> I recall seeing a thread somewhere indicating this could be a Flink
>>>>> bug, but I can't seem to track it down again.
>>>>> Happy to provide more information. For what it's worth, the
>>>>> KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
>>>>> Evictor but has since been replaced in an attempt to solve the watermark
>>>>> issue, with no success so far.
>>>>>
>>>>> Do I have to use assignTimestampsAndWatermarks again after the
>>>>> KeyedProcessFunction?
>>>>>
>>>>> Full job flow for completeness:
>>>>>
>>>>> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
>>>>> assignTimestampsAndWaterMarks -> map Function -> Key By -> Keyed Process
>>>>> Function -> Async IO -> Custom Sink
>>>>>
>>>>> Much obliged.