Hi again Fabian,

Thanks for pointing this out to me. In my case there is no need for keyed writing, but I do wonder whether having each Kafka task write only to a single partition would significantly affect performance.

Actually, now that I think about it, the approach of just waiting for the first records of the next window is also subject to the problem you mention above: a producer lagging behind the rest could end up with a partition containing elements out of ‘window order’.

I was also thinking this problem is very similar to that of checkpoint barriers. I intended to dig into the details of the exactly-once Kafka sink for some inspiration.

Padarn

On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Padarn,
>
> Yes, this is quite tricky.
> The "problem" with watermarks is that you need to consider how you write
> to Kafka.
> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is
> written by multiple producers), you need to broadcast the watermarks to
> each partition, i.e., each partition would receive watermarks from each
> parallel sink task. So in order to reason about the current watermark of a
> partition, you need to observe them and take the minimum WM across all
> current sink task WMs.
> Things become much easier if each partition is only written by a single
> task, but this also means that data is not key-partitioned in Kafka.
> In that case, the sink task only needs to write a WM message to each of
> its assigned partitions.
>
> Hope this helps,
> Fabian
>
>
> On Sat, 17 Aug 2019 at 05:48, Padarn Wilson <pad...@gmail.com> wrote:
>
>> Hi Fabian, thanks for your input
>>
>> Exactly. Actually my first instinct was to see if it was possible to
>> publish the watermarks somehow - my initial idea was to insert regular
>> watermark messages into each partition of the stream, but exposing this
>> seemed quite troublesome.
>>
>> > In that case, you could have a ProcessFunction that is chained before
>> > the sink and which counts the window results per time slice and emits the
>> > result when the watermark passes to a side output.
>> > All side output messages are collected by a single task and can be
>> > published to a Kafka topic or even be made available via Queryable State.
>>
>> I understand the idea here (and exactly-once semantics are probably fine
>> for my use case), but counting events seems a bit fragile. I'm not totally
>> confident the consumer can guarantee it won't read duplicates (it's a Golang
>> Kafka library that seems to have some quirks).
>>
>> I think ideally each partition of the Kafka topic would have some regular
>> information about watermarks. Perhaps the Kafka producer can be modified to
>> support this.
>>
>> Padarn
>>
>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Padarn,
>>>
>>> What you describe is essentially publishing Flink's watermarks to an
>>> outside system.
>>> Flink processes time windows by waiting for a watermark that's past the
>>> window end time. When it receives such a WM, it processes and emits all
>>> ended windows and forwards the watermark.
>>> When a sink receives a WM for, say, 12:45:15, you know that all window
>>> results up until 12:45:00 have been emitted.
>>> Hence, the watermark tells you about the completeness of data.
>>>
>>> However, using this information is not so easy, mostly because of the
>>> failure semantics.
>>> Things become much easier if you produce to Kafka with exactly-once
>>> semantics.
>>>
>>> In that case, you could have a ProcessFunction that is chained before
>>> the sink and which counts the window results per time slice and emits the
>>> result when the watermark passes to a side output.
>>> All side output messages are collected by a single task and can be
>>> published to a Kafka topic or even be made available via Queryable State.
>>>
>>> For at-least-once output, it's much harder because you'll have
>>> duplicates in the output after a job recovered.
>>>
>>> Best, Fabian
>>>
>>> I think you have two options to let the consuming app know about the
>>> progress.
>>> You can either
>>>
>>> The ProcessFunction could count per window end timestamp how many
>>> records passed and forward that information via a side output.
>>> You could then
>>>
>>>
>>> Essentially, you'd like to publish Flink's watermarks to an outside
>>> system (possibly via Kafka).
>>>
>>>
>>> On Mon, 12 Aug 2019 at 14:33, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hello Users,
>>>>
>>>> I have a question that is perhaps not best solved within Flink: It has
>>>> to do with notifying a downstream application that a Flink window has
>>>> completed.
>>>>
>>>> The (simplified) scenario is this:
>>>> - We have a Flink job that consumes from Kafka, does some
>>>> preprocessing, and then has a sliding window of 10 minutes and slide time
>>>> of 1 minute.
>>>> - The number of keys in each slide is not fixed
>>>> - The output of the window is then output to Kafka, which is read by a
>>>> downstream application.
>>>>
>>>> What I want to achieve is that the downstream application can somehow
>>>> know when it has read all of the data for a single window, without waiting
>>>> for the next window to arrive.
>>>>
>>>> Some options I've considered:
>>>> - Producing a second window over the window results that counts the
>>>> output size, which can then be used by the downstream application to see
>>>> when it has received the same number: This seems fragile, as it
>>>> relies on there being no loss or duplication of data. It's also an extra
>>>> window and Kafka stream, which is a tad messy.
>>>> - Somehow adding an 'end of window' element to each partition of the
>>>> Kafka topic which can be read by the consumer: This seems a bit messy
>>>> because it mixes different types of events into the same Kafka stream, and
>>>> there is no really simple way to do this in Flink.
>>>> - Package the whole window output into a single message and make this
>>>> the unit of transaction: This is possible, but the message would be quite
>>>> large then (at least 10s of MB), as the volume of this stream is quite
>>>> large.
>>>> - Assume that if the consumer has no elements to read, or if the next
>>>> window has started to be read, then it has read the whole window: This
>>>> seems reasonable, and if it wasn't for the fact that my consumer on the
>>>> application end was a bit inflexible right now, it is probably the solution
>>>> I would use.
>>>>
>>>> Any further/better ideas?
>>>>
>>>> Thanks
>>>> Padarn
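
For reference, the consumer-side bookkeeping Fabian describes in his 27 Aug reply (each partition receives watermark messages from every parallel sink task, and the partition's watermark is the minimum across them) might look roughly like the sketch below. It is only an illustration under stated assumptions: the thread does not define a watermark message format, so the `(task_id, timestamp)` pair, the `PartitionWatermarkTracker` class, and the known `num_sink_tasks` are all hypothetical, and the sketch is plain Python rather than the Golang consumer mentioned above.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PartitionWatermarkTracker:
    """Tracks the watermark of ONE Kafka partition written by several sink tasks.

    Hypothetical helper: the (task_id, timestamp) watermark message and the
    fixed, known number of sink tasks are assumptions made for illustration.
    """

    num_sink_tasks: int
    latest: Dict[int, int] = field(default_factory=dict)

    def on_watermark(self, task_id: int, timestamp: int) -> Optional[int]:
        # A single task's watermark should never move backwards, so keep the
        # largest value seen per task (this also ignores replayed messages).
        self.latest[task_id] = max(timestamp, self.latest.get(task_id, timestamp))
        return self.current()

    def current(self) -> Optional[int]:
        # Until every sink task has reported, the partition watermark is unknown.
        if len(self.latest) < self.num_sink_tasks:
            return None
        # The partition watermark is the minimum across all sink task watermarks.
        return min(self.latest.values())

    def window_complete(self, window_end: int) -> bool:
        # Conservative check: treat a window as complete on this partition only
        # once the partition watermark has reached the window end time.
        wm = self.current()
        return wm is not None and wm >= window_end
```

With two sink tasks, a watermark of 120 from task 0 alone yields no partition watermark; once task 1 reports 60, the partition watermark is 60, so a window ending at 60 is complete on this partition while one ending at 120 is not. A consumer reading several partitions would need one tracker per partition and would have to wait for all of them. In the simpler single-writer setup Fabian mentions, `num_sink_tasks` is 1 and the minimum is trivial.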