Hi, I'll chip in with an approach I'm trying at the moment that seems to work, and I say "seems" because I'm only running it on a personal project.

Personally, I don't have anything against end-of-message markers per partition. Padarn, you seem to not prefer this option because it overloads the meaning of the output payload, but my approach works equally well when the watermarks/end-of-message markers are produced on a side output. The main problem with both approaches is knowing when the window has finished across all partitions without having to wait for the start of the next window.

I've taken the approach of sending all output messages of the window to (1) the sink, but also (2) a single-task operator. The single-task operator registers an event-time timer for the end of the window. You can be confident the task's timer triggers only once, at the right time, because all the post-window watermarks flow through the same task. At that point I make the task send an end-of-message marker to every partition. I don't need to send a count because Kafka messages are ordered within a partition. And if you prefer not to overload the semantics of your original Kafka topic, you can post the marker to a separate location of your choice.

While this does mean the end-of-window marker only gets sent once the window has finished across all substreams (as opposed to per substream), it also means you don't need to wait for the next window to start, and the watermark gap between substreams should never grow that much anyway. This approach should be particularly useful when the number of partitions or the keying mechanism differs between the input and output topics.
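
Very roughly, the single-task operator boils down to something like the sketch below. This is a simplified illustration rather than real job code: WindowResult, EndOfWindowMarker, the partition count and the marker sink are all placeholder names, and the window results are assumed to carry their window-end timestamp.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder for the real window output type; assumed to carry its window end.
class WindowResult {
    long windowEnd;
    // ... payload fields
}

// Placeholder marker record: which window ended, for which output partition.
class EndOfWindowMarker {
    final long windowEnd;
    final int partition;
    EndOfWindowMarker(long windowEnd, int partition) {
        this.windowEnd = windowEnd;
        this.partition = partition;
    }
}

// Runs with an effective parallelism of one (constant key), registers a timer at
// each window end, and emits one marker per output partition when it fires.
public class EndOfWindowMarkerFunction
        extends KeyedProcessFunction<Integer, WindowResult, EndOfWindowMarker> {

    private final int numOutputPartitions;

    public EndOfWindowMarkerFunction(int numOutputPartitions) {
        this.numOutputPartitions = numOutputPartitions;
    }

    @Override
    public void processElement(WindowResult value, Context ctx, Collector<EndOfWindowMarker> out) {
        // Timers are deduplicated per key and timestamp, so registering the same
        // window end once per record of that window is harmless.
        ctx.timerService().registerEventTimeTimer(value.windowEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<EndOfWindowMarker> out) {
        // This task's watermark has passed the window end, i.e. every parallel
        // window task has finished emitting results for this window.
        for (int partition = 0; partition < numOutputPartitions; partition++) {
            out.collect(new EndOfWindowMarker(timestamp, partition));
        }
    }
}

// Wiring, alongside the normal sink:
//   results.addSink(kafkaSink);                                      // window output as before
//   results.keyBy(r -> 0)                                            // constant key -> one subtask
//          .process(new EndOfWindowMarkerFunction(numPartitions))
//          .addSink(markerSink);                                     // marker topic or other location

The keyBy on a constant key is what funnels every window result, and therefore the combined watermark, through one subtask, so the timer fires exactly once per window end.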
Hopefully that doesn't sound like a terrible idea.

eduardo

On Wed, 28 Aug 2019, 02:54 Padarn Wilson, <pad...@gmail.com> wrote:

> Hi again Fabian,
>
> Thanks for pointing this out to me. In my case there is no need for keyed writing - but I do wonder whether having each Kafka sink task write to only a single partition would significantly affect performance.
>
> Actually, now that I think about it, the approach of just waiting for the first records of the next window is also subject to the problem you mention above: a producer lagging behind the rest could end up with a partition containing elements out of 'window order'.
>
> I was also thinking this problem is very similar to that of checkpoint barriers. I intend to dig into the details of the exactly-once Kafka sink for some inspiration.
>
> Padarn
>
> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Padarn,
>>
>> Yes, this is quite tricky. The "problem" with watermarks is that you need to consider how you write to Kafka.
>>
>> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is written by multiple producers), you need to broadcast the watermarks to each partition, i.e., each partition would receive watermarks from each parallel sink task. So in order to reason about the current watermark of a partition, you need to observe them and take the minimum WM across all current sink task WMs.
>>
>> Things become much easier if each partition is only written by a single task, but this also means that data is not key-partitioned in Kafka. In that case, the sink task only needs to write a WM message to each of its assigned partitions.
>>
>> Hope this helps,
>> Fabian
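
To make the first (keyed) case concrete, the consumer of a partition would need bookkeeping roughly like the sketch below. It assumes the sink tasks periodically write (sinkTaskId, watermark) messages into every partition they produce to; nothing like this exists out of the box, it only illustrates the minimum-across-tasks rule Fabian describes.

import java.util.HashMap;
import java.util.Map;

// Consumer-side view of one Kafka partition: every parallel sink task writes its
// own watermark messages into the partition, and the partition's effective
// watermark is the minimum over the latest watermark seen from each task.
class PartitionWatermarkTracker {

    private final Map<Integer, Long> latestWatermarkPerSinkTask = new HashMap<>();
    private final int expectedSinkTasks;

    PartitionWatermarkTracker(int expectedSinkTasks) {
        this.expectedSinkTasks = expectedSinkTasks;
    }

    /** Call for every watermark message read from this partition. */
    void onWatermarkMessage(int sinkTaskId, long watermark) {
        latestWatermarkPerSinkTask.merge(sinkTaskId, watermark, Math::max);
    }

    /** The partition's data is complete up to this timestamp. */
    long currentWatermark() {
        if (latestWatermarkPerSinkTask.size() < expectedSinkTasks) {
            return Long.MIN_VALUE; // not every sink task has reported yet
        }
        long min = Long.MAX_VALUE;
        for (long wm : latestWatermarkPerSinkTask.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }
}

In the second case (one writer per partition), the tracker degenerates to remembering a single value per partition, which is why that setup is so much simpler.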
>> Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson <pad...@gmail.com>:
>>
>>> Hi Fabian, thanks for your input.
>>>
>>> Exactly. Actually, my first instinct was to see if it was possible to publish the watermarks somehow - my initial idea was to insert regular watermark messages into each partition of the stream, but exposing this seemed quite troublesome.
>>>
>>> > In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result to a side output when the watermark passes. All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>
>>> I understand the idea here (and exactly-once semantics are probably fine for my use case), but counting events seems a bit fragile. I'm not totally confident the consumer can guarantee it won't read duplicates (it's a Golang Kafka library that seems to have some quirks).
>>>
>>> I think ideally each partition of the Kafka topic would have some regular information about watermarks. Perhaps the Kafka producer can be modified to support this.
>>>
>>> Padarn
>>>
>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Padarn,
>>>>
>>>> What you describe is essentially publishing Flink's watermarks to an outside system.
>>>>
>>>> Flink processes time windows by waiting for a watermark that is past the window end time. When it receives such a WM, it processes and emits all ended windows and forwards the watermark. So when a sink receives a WM for, say, 12:45:15, you know that all window results up until 12:45:00 have been emitted. Hence, the watermark tells you about the completeness of the data.
>>>>
>>>> However, using this information is not so easy, mostly because of the failure semantics. Things become much easier if you produce to Kafka with exactly-once semantics.
>>>>
>>>> In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result to a side output when the watermark passes. All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>>
>>>> For at-least-once output, it's much harder because you'll have duplicates in the output after a job recovers.
>>>>
>>>> Best,
>>>> Fabian
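
For reference, one possible shape for the counting function Fabian describes, written against the same placeholder WindowResult type as the sketch further up. This variant keys the stream by the window-end timestamp (a shuffle rather than chaining), so each window's total count is emitted from a single subtask.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Forwards window results unchanged and, once the watermark passes a window's
// end, emits a (windowEnd, count) record on a side output.
public class WindowCountFunction
        extends KeyedProcessFunction<Long, WindowResult, WindowResult> {

    public static final OutputTag<Tuple2<Long, Long>> COUNTS =
            new OutputTag<Tuple2<Long, Long>>("window-counts") {};

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(WindowResult value, Context ctx, Collector<WindowResult> out) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1L);
        // The stream is keyed by window end, so this timer fires exactly once per window.
        ctx.timerService().registerEventTimeTimer(value.windowEnd);
        out.collect(value); // forward the record towards the sink unchanged
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<WindowResult> out) throws Exception {
        // timestamp == the window end this key stands for.
        ctx.output(COUNTS, Tuple2.of(timestamp, count.value()));
        count.clear();
    }
}

// Wiring:
//   SingleOutputStreamOperator<WindowResult> counted =
//       results.keyBy(r -> r.windowEnd).process(new WindowCountFunction());
//   counted.addSink(kafkaSink);
//   counted.getSideOutput(WindowCountFunction.COUNTS).addSink(countSink).setParallelism(1);

A consumer can then compare the number of records it has read for a window end against the (windowEnd, count) record to decide when it has the whole window, with the caveat about duplicates under at-least-once delivery that Fabian mentions.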
>>>> Am Mo., 12. Aug. 2019 um 14:33 Uhr schrieb Padarn Wilson <pad...@gmail.com>:
>>>>
>>>>> Hello Users,
>>>>>
>>>>> I have a question that is perhaps not best solved within Flink: it has to do with notifying a downstream application that a Flink window has completed.
>>>>>
>>>>> The (simplified) scenario is this:
>>>>> - We have a Flink job that consumes from Kafka, does some preprocessing, and then has a sliding window of 10 minutes with a slide of 1 minute.
>>>>> - The number of keys in each slide is not fixed.
>>>>> - The output of the window is written to Kafka, which is read by a downstream application.
>>>>>
>>>>> What I want to achieve is that the downstream application can somehow know when it has read all of the data for a single window, without waiting for the next window to arrive.
>>>>>
>>>>> Some options I've considered:
>>>>> - Producing a second window over the window results that counts the output size, which can then be used by the downstream application to see when it has received the same number: this seems fragile, as it relies on there being no loss or duplication of data. It's also an extra window and Kafka stream, which is a tad messy.
>>>>> - Somehow adding an 'end of window' element to each partition of the Kafka topic, which can be read by the consumer: this seems a bit messy because it mixes different types of events into the same Kafka stream, and there is no really simple way to do it in Flink.
>>>>> - Packaging the whole window output into a single message and making that the unit of transaction: this is possible, but the message would then be quite large (at least tens of MB), as the volume of this stream is quite large.
>>>>> - Assuming that if the consumer has no elements to read, or if the next window has started to be read, then it has read the whole window: this seems reasonable, and if it weren't for the fact that my consumer on the application side is a bit inflexible right now, this is probably the solution I would use.
>>>>>
>>>>> Any further/better ideas?
>>>>>
>>>>> Thanks,
>>>>> Padarn
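
For what it's worth, the consumer-side bookkeeping behind the last option (treating a window as read once the next one shows up on every partition) is small. A sketch, assuming the consumer knows the partition count and can extract a window-end timestamp from every record it reads:

import java.util.HashMap;
import java.util.Map;

// Heuristic window-completion check for the consumer: window end W is treated
// as fully read once every partition has delivered a record from a later window.
class WindowCompletionEstimator {

    private final Map<Integer, Long> latestWindowEndPerPartition = new HashMap<>();
    private final int partitionCount;

    WindowCompletionEstimator(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    /** Call for every record read, with the window end it belongs to. */
    void onRecord(int partition, long windowEnd) {
        latestWindowEndPerPartition.merge(partition, windowEnd, Math::max);
    }

    /** True once every partition has moved past the given window end. */
    boolean isComplete(long windowEnd) {
        if (latestWindowEndPerPartition.size() < partitionCount) {
            return false;
        }
        for (long latest : latestWindowEndPerPartition.values()) {
            if (latest <= windowEnd) {
                return false;
            }
        }
        return true;
    }
}

Note that this inherits the problem Padarn raises in his later mail: a producer lagging behind the rest can put records out of 'window order' into a partition, which would make this check fire too early.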