Hi Fabian,

> but each partition may only be written by a single task
Sorry, I think I misunderstand something here then: if I have a topic with one partition, but multiple sink tasks (i.e. parallelism > 1), does this mean the data must all be shuffled to the single task writing that partition?

Padarn

On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Padarn,
>
> Regarding your throughput concerns: a sink task may write to multiple partitions, but each partition may only be written by a single task.
>
> @Eduardo: Thanks for sharing your approach! I'm not sure I understood it correctly, but I think the approach does not guarantee that all results of a window are emitted before the end-of-window marker is written. Since the sink operator and the single-task operator are separate operators, the output records might get stuck (or be buffered) in one of the sink tasks while the single task still emits an end-of-window marker record, because it doesn't know about the sink task.
>
> Best,
> Fabian
>
> Am Do., 29. Aug. 2019 um 18:42 Uhr schrieb Eduardo Winpenny Tejedor <eduardo.winpe...@gmail.com>:
>
>> Hi,
>>
>> I'll chip in with an approach I'm trying at the moment that seems to work, and I say seems because I'm only running this on a personal project.
>>
>> Personally, I don't have anything against end-of-message markers per partition; Padarn, you seem to not prefer this option as it overloads the meaning of the output payload. My approach is equally valid when producing watermarks/end-of-message markers on a side output, though.
>>
>> The main problem of both approaches is knowing when the window has finished across all partitions without having to wait for the start of the next window.
>>
>> I've taken the approach of sending all output messages of the window to 1. the sink but also 2. a single-task operator. The single-task operator registers an event-time timer for the end of the window. You can be confident the task's timer triggers only once, at the right time, because all the post-window watermarks go through the same task. At that point I make the task send an end-of-window marker to every partition. I don't need to send the count because Kafka messages are ordered. AND if you prefer not to overload the semantics of your original Kafka topic, you can post the marker to a separate location of your choice.
>>
>> While this does mean that the end-of-window marker only gets sent once the window has finished across all substreams (as opposed to per substream), it also means you don't need to wait for the next window to start, AND the watermark gap between substreams should never grow that much anyway.
>>
>> This approach should be particularly useful when the number of partitions or the keying mechanism is different between the input and output topics.
>>
>> Hopefully that doesn't sound like a terrible idea.
>>
>> eduardo
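For readers following along, here is a minimal sketch (not Eduardo's actual code) of the single-task marker operator he describes, using Flink's Java DataStream API. Window results are routed onto a single dummy key so the function runs as one task and can use event-time timers; when the timer for a window end fires, one marker per output partition is emitted. WindowResult, EndOfWindowMarker, and the partition count are hypothetical stand-ins for the job's real types, the markers still need their own sink, and, as Fabian points out above, this does not guarantee the sink tasks have already written the window's records.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical stand-ins for the job's real window-output and marker types.
class WindowResult {
    long windowEnd;
    WindowResult(long windowEnd) { this.windowEnd = windowEnd; }
}

class EndOfWindowMarker {
    int partition;
    long windowEnd;
    EndOfWindowMarker(int partition, long windowEnd) {
        this.partition = partition;
        this.windowEnd = windowEnd;
    }
}

// Runs with parallelism 1 (e.g. after keyBy(r -> 0)) so every window result
// and every watermark passes through this single task.
public class EndOfWindowMarkerFunction
        extends KeyedProcessFunction<Integer, WindowResult, EndOfWindowMarker> {

    private final int numOutputPartitions;

    public EndOfWindowMarkerFunction(int numOutputPartitions) {
        this.numOutputPartitions = numOutputPartitions;
    }

    @Override
    public void processElement(WindowResult result, Context ctx,
                               Collector<EndOfWindowMarker> out) {
        // Timers are deduplicated per key and timestamp, so registering one
        // for every result of the same window is harmless.
        ctx.timerService().registerEventTimeTimer(result.windowEnd);
    }

    @Override
    public void onTimer(long windowEnd, OnTimerContext ctx,
                        Collector<EndOfWindowMarker> out) {
        // The watermark has passed the window end: no more results for this
        // window will reach this task, so emit one marker per partition.
        for (int p = 0; p < numOutputPartitions; p++) {
            out.collect(new EndOfWindowMarker(p, windowEnd));
        }
    }
}

It could be wired in roughly as results.keyBy(r -> 0).process(new EndOfWindowMarkerFunction(numPartitions)) followed by a sink that writes each marker to its target partition.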
>> On Wed, 28 Aug 2019, 02:54 Padarn Wilson <pad...@gmail.com> wrote:
>>
>>> Hi again Fabian,
>>>
>>> Thanks for pointing this out to me. In my case there is no need for keyed writing - but I do wonder whether having each Kafka sink task write to only a single partition would significantly affect performance.
>>>
>>> Actually, now that I think about it, the approach of just waiting for the first records of the next window is also subject to the problem you mention above: a producer lagging behind the rest could end up with a partition containing elements out of 'window order'.
>>>
>>> I was also thinking that this problem is very similar to that of checkpoint barriers. I intend to dig into the details of the exactly-once Kafka sink for some inspiration.
>>>
>>> Padarn
>>>
>>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Padarn,
>>>>
>>>> Yes, this is quite tricky. The "problem" with watermarks is that you need to consider how you write to Kafka.
>>>>
>>>> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is written by multiple producers), you need to broadcast the watermarks to each partition, i.e., each partition would receive watermarks from each parallel sink task. So in order to reason about the current watermark of a partition, you need to observe them and take the minimum WM across all current sink task WMs.
>>>>
>>>> Things become much easier if each partition is only written by a single task, but this also means that the data is not key-partitioned in Kafka. In that case, the sink task only needs to write a WM message to each of its assigned partitions.
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson <pad...@gmail.com>:
>>>>
>>>>> Hi Fabian, thanks for your input.
>>>>>
>>>>> Exactly. Actually my first instinct was to see if it was possible to publish the watermarks somehow - my initial idea was to insert regular watermark messages into each partition of the stream, but exposing this seemed quite troublesome.
>>>>>
>>>>> > In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the result to a side output when the watermark passes. All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>>>
>>>>> I understand the idea here (and exactly-once semantics are probably fine for my use case), but counting events seems a bit fragile. I'm not totally confident the consumer can guarantee it won't read duplicates (it's a Golang Kafka library that seems to have some quirks).
>>>>>
>>>>> I think ideally each partition of the Kafka topic would carry some regular information about watermarks. Perhaps the Kafka producer can be modified to support this.
>>>>>
>>>>> Padarn
>>>>>
>>>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Padarn,
>>>>>>
>>>>>> What you describe is essentially publishing Flink's watermarks to an outside system. Flink processes time windows by waiting for a watermark that is past the window end time. When it receives such a WM, it processes and emits all ended windows and forwards the watermark. When a sink receives a WM for, say, 12:45:15, you know that all window results up until 12:45:00 have been emitted. Hence, the watermark tells you about the completeness of the data.
>>>>>>
>>>>>> However, using this information is not so easy, mostly because of the failure semantics. Things become much easier if you produce to Kafka with exactly-once semantics.
>>>>>>
>>>>>> In that case, you could have a ProcessFunction that is chained before the sink and which counts the window results per time slice and emits the count to a side output when the watermark passes. All side output messages are collected by a single task and can be published to a Kafka topic or even be made available via Queryable State.
>>>>>>
>>>>>> For at-least-once output, it's much harder because you'll have duplicates in the output after a job recovers.
>>>>>>
>>>>>> Best,
>>>>>> Fabian
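A rough sketch of the counting function Fabian describes, again with the Flink Java DataStream API. This is a variation rather than his exact proposal: instead of chaining it before the sink, it keys the window results by their window end timestamp so that event-time timers are available and each window end yields a single total count on a side output. It reuses the hypothetical WindowResult type from the sketch further up the thread; the output tag name is made up.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Counts window results per window-end timestamp (the key) and emits
// (windowEnd, count) to a side output once the watermark passes the window end.
public class WindowResultCounter
        extends KeyedProcessFunction<Long, WindowResult, WindowResult> {

    // Side output carrying the (windowEnd, count) completeness messages.
    public static final OutputTag<Tuple2<Long, Long>> COUNTS =
            new OutputTag<Tuple2<Long, Long>>("window-counts") {};

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(WindowResult result, Context ctx,
                               Collector<WindowResult> out) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1L);
        // Fire once the watermark reaches the window end (the current key).
        ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey());
        // Forward the result unchanged to the real sink.
        out.collect(result);
    }

    @Override
    public void onTimer(long windowEnd, OnTimerContext ctx,
                        Collector<WindowResult> out) throws Exception {
        ctx.output(COUNTS, Tuple2.of(windowEnd, count.value()));
        count.clear();
    }
}

The side output (getSideOutput(WindowResultCounter.COUNTS)) could then be funneled through a single task, e.g. with parallelism 1, and written to a separate Kafka topic or exposed via Queryable State, which is the part Fabian describes as collecting the messages in one task.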
>>>>>>
>>>>>> Am Mo., 12. Aug. 2019 um 14:33 Uhr schrieb Padarn Wilson <pad...@gmail.com>:
>>>>>>
>>>>>>> Hello Users,
>>>>>>>
>>>>>>> I have a question that is perhaps not best solved within Flink: it has to do with notifying a downstream application that a Flink window has completed.
>>>>>>>
>>>>>>> The (simplified) scenario is this:
>>>>>>> - We have a Flink job that consumes from Kafka, does some preprocessing, and then has a sliding window of 10 minutes with a slide of 1 minute.
>>>>>>> - The number of keys in each slide is not fixed.
>>>>>>> - The output of the window is written to Kafka, which is read by a downstream application.
>>>>>>>
>>>>>>> What I want to achieve is that the downstream application can somehow know when it has read all of the data for a single window, without waiting for the next window to arrive.
>>>>>>>
>>>>>>> Some options I've considered:
>>>>>>> - Producing a second window over the window results that counts the output size, which can then be used by the downstream application to see when it has received the same number (a rough sketch of this option appears below): this seems fragile, as it relies on there being no loss or duplication of data. It's also an extra window and Kafka stream, which is a tad messy.
>>>>>>> - Somehow adding an 'end of window' element to each partition of the Kafka topic which can be read by the consumer: this seems a bit messy because it mixes different types of events into the same Kafka stream, and there is no really simple way to do this in Flink.
>>>>>>> - Packaging the whole window output into a single message and making this the unit of transaction: this is possible, but the message would then be quite large (at least tens of MB), as the volume of this stream is quite large.
>>>>>>> - Assuming that if the consumer has no elements to read, or if the next window has started to be read, then it has read the whole window: this seems reasonable, and if it weren't for the fact that my consumer on the application side is a bit inflexible right now, it is probably the solution I would use.
>>>>>>>
>>>>>>> Any further/better ideas?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Padarn
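For completeness, a minimal sketch of the first option above (a second window that counts the output size), assuming the Flink Java DataStream API and the same hypothetical WindowResult type as the earlier sketches. All results of one firing of the 10-minute sliding window share the same timestamp (window end minus 1 ms), so a 1-minute tumbling windowAll over the results groups and counts them per original window.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowOutputCounts {

    // Counts how many window results were produced per 1-minute slide.
    // windowAll runs with parallelism 1, so one task sees all results.
    static DataStream<Long> countsPerSlide(DataStream<WindowResult> results) {
        return results
                .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
                .aggregate(new CountPerSlide());
    }

    static class CountPerSlide implements AggregateFunction<WindowResult, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(WindowResult value, Long acc) { return acc + 1L; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}

In practice you would likely use the aggregate overload that also takes an AllWindowFunction to attach the window end timestamp to each count, and write the counts to a separate (hypothetical) "window-counts" Kafka topic that the downstream application compares against what it has read.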