Hi Padarn,

Regarding your throughput concerns: A sink task may write to multiple partitions, but each partition may only be written by a single task.
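For illustration, a minimal sketch of a custom FlinkKafkaPartitioner with exactly this property (the class name, the modulo assignment, and the round-robin choice are my own illustrative assumptions, not something from the thread): each sink subtask "owns" the partitions whose index is congruent to its subtask index, so a subtask may write to several partitions while no partition ever has more than one writer, as long as the sink parallelism is not larger than the partition count.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Illustrative partitioner: subtask i owns all partitions p with p % parallelism == i.
public class SubtaskOwnedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int subtaskIndex;
    private int parallelism;
    private long counter;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.subtaskIndex = parallelInstanceId;
        this.parallelism = parallelInstances;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        int owned = 0;
        for (int p : partitions) {
            if (p % parallelism == subtaskIndex) {
                owned++;
            }
        }
        if (owned == 0) {
            // More subtasks than partitions: the single-writer property cannot hold,
            // fall back to a fixed mapping.
            return partitions[subtaskIndex % partitions.length];
        }
        // Round-robin over the partitions owned by this subtask.
        int pick = (int) (counter++ % owned);
        for (int p : partitions) {
            if (p % parallelism == subtaskIndex && pick-- == 0) {
                return p;
            }
        }
        return partitions[subtaskIndex % partitions.length]; // not reached
    }
}

Such a partitioner could then be handed to the FlinkKafkaProducer constructor variant that accepts a custom FlinkKafkaPartitioner.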
@Eduardo: Thanks for sharing your approach!

Not sure if I understood it correctly, but I think that the approach does not guarantee that all results of a window are emitted before the end-of-window marker is written. Since the sink operator and the single-task operator are separate operators, the output records might get stuck (or be buffered) in one of the sink tasks, and the single task would still emit an end-of-window marker record because it doesn't know about the sink task.

Best, Fabian

On Thu, Aug 29, 2019 at 6:42 PM Eduardo Winpenny Tejedor <eduardo.winpe...@gmail.com> wrote:

> Hi,
>
> I'll chip in with an approach I'm trying at the moment that seems to work, and I say "seems" because I'm only running this on a personal project.
>
> Personally, I don't have anything against end-of-message markers per partition; Padarn, you seem not to prefer this option as it overloads the meaning of the output payload. My approach is equally valid when producing watermarks/end-of-message markers on a side output, though.
>
> The main problem of both approaches is knowing when the window has finished across all partitions without having to wait for the start of the next window.
>
> I've taken the approach of sending all output messages of the window to (1) the sink but also (2) a single-task operator. The single-task operator registers an event-time timer for the end of the window. You have the confidence of the task's timer triggering only once, at the right time, because all the post-window watermarks go through to the same task. At that point I make the task send an end-of-message marker to every partition. I don't need to send the count because Kafka messages are ordered. And if you prefer not to overload the semantics of your original Kafka topic, you can post the message to a separate location of your choice.
>
> While this does mean that the end-of-window marker only gets sent once the window has finished across all substreams (as opposed to per substream), it also means you don't need to wait for the next window to start, and the watermark gap between substreams should never grow that much anyway.
>
> This approach should be particularly useful when the number of partitions or the keying mechanism is different between the input and output topics.
>
> Hopefully that doesn't sound like a terrible idea.
>
> eduardo
>
> On Wed, 28 Aug 2019, 02:54 Padarn Wilson <pad...@gmail.com> wrote:
>
>> Hi again Fabian,
>>
>> Thanks for pointing this out to me. In my case there is no need for keyed writing, but I do wonder if having each Kafka sink task write only to a single partition would significantly affect performance.
>>
>> Actually, now that I think about it, the approach of just waiting for the first records of the next window is also subject to the problem you mention above: a producer lagging behind the rest could end up with a partition containing elements out of 'window order'.
>>
>> I was also thinking this problem is very similar to that of checkpoint barriers. I intended to dig into the details of the exactly-once Kafka sink for some inspiration.
>>
>> Padarn
>>
>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Padarn,
>>>
>>> Yes, this is quite tricky.
>>> The "problem" with watermarks is that you need to consider how you write to Kafka.
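For concreteness, a minimal sketch of the single-task marker operator Eduardo describes above (all type and variable names here, such as WindowResult, EndOfWindowMarker, and windowResults, are illustrative assumptions, not from the thread). Keying by a constant routes every window result, and therefore the combined watermark, through one parallel task; an event-time timer at the window end then fires exactly once per window:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative types: a window result that carries the end timestamp of its window,
// and the marker record that signals "window complete".
class WindowResult { public long windowEnd; public String key; public long value; }
class EndOfWindowMarker { public long windowEnd; public EndOfWindowMarker(long e) { windowEnd = e; } }

// 'windowResults' is assumed to be the DataStream<WindowResult> produced by the window operator.
DataStream<EndOfWindowMarker> markers = windowResults
    .keyBy(r -> 0)  // constant key: a single parallel task sees all results and all watermarks
    .process(new KeyedProcessFunction<Integer, WindowResult, EndOfWindowMarker>() {

        @Override
        public void processElement(WindowResult r, Context ctx, Collector<EndOfWindowMarker> out) throws Exception {
            // One timer per distinct window end; re-registering the same timestamp for the same key is a no-op.
            ctx.timerService().registerEventTimeTimer(r.windowEnd);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<EndOfWindowMarker> out) throws Exception {
            // The watermark has passed this window end on every substream: emit one marker.
            out.collect(new EndOfWindowMarker(timestamp));
        }
    });

The resulting marker stream still has to be fanned out to every partition of the output topic (for example by flat-mapping one marker record per partition, or by a sink that writes the marker to each partition), and, as Fabian points out above, this does not by itself guarantee that the sink has already flushed all window results before the marker is written.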
>>> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is written by multiple producers), you need to broadcast the watermarks to each partition, i.e., each partition would receive watermarks from each parallel sink task. So in order to reason about the current watermark of a partition, you need to observe them and take the minimum WM across all current sink task WMs.
>>> Things become much easier if each partition is only written by a single task, but this also means that data is not key-partitioned in Kafka.
>>> In that case, the sink task only needs to write a WM message to each of its assigned partitions.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> On Sat, Aug 17, 2019 at 5:48 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hi Fabian, thanks for your input.
>>>>
>>>> Exactly. Actually, my first instinct was to see if it was possible to publish the watermarks somehow - my initial idea was to insert regular watermark messages into each partition of the stream, but exposing this seemed quite troublesome.
>>>>
>>>>> In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result to a side output when the watermark passes. All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>>
>>>> I understand the idea here (and exactly-once semantics are probably fine for my use case), but counting events seems a bit fragile. I'm not totally confident the consumer can guarantee it won't read duplicates (it's a Golang Kafka library that seems to have some quirks).
>>>>
>>>> I think ideally each partition of the Kafka topic would have some regular information about watermarks. Perhaps the Kafka producer can be modified to support this.
>>>>
>>>> Padarn
>>>>
>>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> What you describe is essentially publishing Flink's watermarks to an outside system.
>>>>> Flink processes time windows by waiting for a watermark that's past the window end time. When it receives such a WM, it processes and emits all ended windows and forwards the watermark.
>>>>> When a sink receives a WM for, say, 12:45:15, you know that all window results up to 12:45:00 have been emitted.
>>>>> Hence, the watermark tells you about the completeness of data.
>>>>>
>>>>> However, using this information is not so easy, mostly because of the failure semantics.
>>>>> Things become much easier if you produce to Kafka with exactly-once semantics.
>>>>>
>>>>> In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result to a side output when the watermark passes.
>>>>> All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>>>
>>>>> For at-least-once output, it's much harder because you'll have duplicates in the output after a job recovers.
>>>>>
>>>>> Best, Fabian
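A rough sketch of the counting approach Fabian describes above, with one caveat: event-time timers need a keyed context, so instead of chaining the function to the sink, this sketch keys the window results by their window-end timestamp (at the cost of an extra shuffle), forwards them unchanged towards the sink, and emits (window end, count) pairs to a side output once the watermark has passed. All names are illustrative, and WindowResult is the assumed result type from the earlier sketch (it carries a windowEnd field).

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Side output carrying (window end, count) pairs once the watermark has passed the window end.
final OutputTag<Tuple2<Long, Long>> PROGRESS = new OutputTag<Tuple2<Long, Long>>("window-progress") {};

SingleOutputStreamOperator<WindowResult> forwarded = windowResults
    .keyBy(r -> r.windowEnd)  // timers need a keyed context: one key per window end
    .process(new KeyedProcessFunction<Long, WindowResult, WindowResult>() {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(WindowResult r, Context ctx, Collector<WindowResult> out) throws Exception {
            Long c = count.value();
            count.update(c == null ? 1L : c + 1);
            // Fires once the watermark passes the end of this result's window.
            ctx.timerService().registerEventTimeTimer(r.windowEnd);
            out.collect(r);  // forward the result unchanged towards the Kafka sink
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<WindowResult> out) throws Exception {
            // All results for this window end have been forwarded: publish the count.
            ctx.output(PROGRESS, Tuple2.of(timestamp, count.value()));
            count.clear();
        }
    });

DataStream<Tuple2<Long, Long>> progress = forwarded.getSideOutput(PROGRESS);
// 'forwarded' goes to the regular result sink; 'progress' can be written to a separate
// progress topic or exposed via queryable state, as suggested above.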
>>>>> On Mon, Aug 12, 2019 at 2:33 PM Padarn Wilson <pad...@gmail.com> wrote:
>>>>>
>>>>>> Hello Users,
>>>>>>
>>>>>> I have a question that is perhaps not best solved within Flink: it has to do with notifying a downstream application that a Flink window has completed.
>>>>>>
>>>>>> The (simplified) scenario is this:
>>>>>> - We have a Flink job that consumes from Kafka, does some preprocessing, and then has a sliding window of 10 minutes with a slide of 1 minute.
>>>>>> - The number of keys in each slide is not fixed.
>>>>>> - The output of the window is then written to Kafka, which is read by a downstream application.
>>>>>>
>>>>>> What I want to achieve is that the downstream application can somehow know when it has read all of the data for a single window, without waiting for the next window to arrive.
>>>>>>
>>>>>> Some options I've considered:
>>>>>> - Producing a second window over the window results that counts the output size, which can then be used by the downstream application to see when it has received the same number: This seems fragile, as it relies on there being no loss or duplication of data. It's also an extra window and Kafka stream, which is a tad messy.
>>>>>> - Somehow adding an 'end of window' element to each partition of the Kafka topic, which can be read by the consumer: This seems a bit messy because it mixes different types of events into the same Kafka stream, and there is no really simple way to do this in Flink.
>>>>>> - Packaging the whole window output into a single message and making this the unit of transaction: This is possible, but the message would then be quite large (at least tens of MB), as the volume of this stream is quite large.
>>>>>> - Assuming that if the consumer has no elements to read, or if the next window has started to be read, then it has read the whole window: This seems reasonable, and if it weren't for the fact that my consumer on the application end is a bit inflexible right now, it is probably the solution I would use.
>>>>>>
>>>>>> Any further/better ideas?
>>>>>>
>>>>>> Thanks,
>>>>>> Padarn
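If one of the per-partition end-of-window marker approaches discussed earlier in this thread is adopted, the consumer-side bookkeeping for "know when a window has been fully read" can stay small. Below is a sketch in Java (the actual consumer is a Go application, and the class and method names here are illustrative); it assumes the producer writes exactly one marker per partition per window:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Consumer-side bookkeeping: a window is fully read once an end-of-window marker
// for its end timestamp has been observed on every partition of the output topic.
public class WindowCompletenessTracker {

    private final int numPartitions;
    private final Map<Long, Set<Integer>> markerPartitions = new HashMap<>();

    public WindowCompletenessTracker(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Call whenever the consumer decodes a marker record for window end 'windowEnd'
    // on partition 'partition'. Returns true when every partition has delivered its
    // marker, i.e. all of the window's results have been read.
    public boolean onMarker(long windowEnd, int partition) {
        Set<Integer> seen = markerPartitions.computeIfAbsent(windowEnd, k -> new HashSet<>());
        seen.add(partition);
        if (seen.size() == numPartitions) {
            markerPartitions.remove(windowEnd);
            return true;
        }
        return false;
    }
}

Because Kafka preserves order within a partition, a marker on a partition implies that all of that partition's results for the window precede it, which is what makes this per-partition check sufficient.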