Hi Fabian,

> but each partition may only be written by a single task

Sorry, I think I'm misunderstanding something here then: if I have a topic with
one partition but multiple sink tasks (i.e. parallelism > 1), does this mean
the data must all be shuffled to the single task writing that partition?

Padarn

On Mon, Sep 2, 2019 at 9:52 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Padarn,
>
> Regarding your throughput concerns: A sink task may write to multiple
> partitions, but each partition may only be written by a single task.
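> To make that mapping concrete, here is a rough sketch of a custom
> FlinkKafkaPartitioner in the spirit of the bundled FlinkFixedPartitioner
> (the class name and the modulo scheme are illustrative, not the exact
> built-in code):
>
> import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
>
> public class OneTaskPerPartitionPartitioner<T> extends FlinkKafkaPartitioner<T> {
>
>     private int subtaskIndex;
>
>     @Override
>     public void open(int parallelInstanceId, int parallelInstances) {
>         // Called once per sink subtask; remember which subtask we are.
>         this.subtaskIndex = parallelInstanceId;
>     }
>
>     @Override
>     public int partition(T record, byte[] key, byte[] value,
>                          String targetTopic, int[] partitions) {
>         // Pin this subtask to one partition. As long as the sink parallelism
>         // does not exceed the number of partitions, every partition is
>         // written by at most one sink task.
>         return partitions[subtaskIndex % partitions.length];
>     }
> }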
>
> @Eduardo: Thanks for sharing your approach! Not sure if I understood it
> correctly, but I think that the approach does not guarantee that all
> results of a window are emitted before the end-of-window marker is written.
> Since the sink operator and the single-task-operator are separate
> operators, the output records might get stuck (or be buffered) in one of
> the sink tasks while the single-task operator would still emit an end-of-window
> marker record, because it doesn't know about the sink tasks.
>
> Best,
> Fabian
>
> On Thu, 29 Aug 2019 at 18:42, Eduardo Winpenny Tejedor <
> eduardo.winpe...@gmail.com> wrote:
>
>> Hi,
>>
>> I'll chip in with an approach I'm trying at the moment that seems to
>> work, and I say 'seems' because I'm only running it on a personal project.
>>
>> Personally, I don't have anything against end-of-message markers per
>> partition; Padarn, you seem not to prefer this option as it overloads the
>> meaning of the output payload. My approach is equally valid when producing
>> watermarks/end-of-message markers on a side output, though.
>>
>> The main problem of both approaches is knowing when the window has
>> finished across all partitions without having to wait for the start of the
>> next window.
>>
>> I've taken the approach of sending all output messages of the window to
>> (1) the sink and also (2) a single-task operator. The single-task operator
>> registers an event-time timer for the end of the window. You can be
>> confident that the task's timer triggers only once, at the right time,
>> because all the post-window watermarks flow through that same task. At that
>> point I make the task send an end-of-message marker to every partition. I
>> don't need to send a count because Kafka messages are ordered within a
>> partition. And if you prefer not to overload the semantics of your original
>> Kafka topic, you can post the marker to a separate location of your choice.
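>> Very roughly, the single-task operator in my project looks something like
>> the sketch below (WindowResult, its windowEnd field and the marker side
>> output are placeholder names from my code, and I key everything to a
>> constant so that one task sees every record; imports come from
>> flink-streaming-java: KeyedProcessFunction, Collector, OutputTag):
>>
>> OutputTag<String> markerTag = new OutputTag<String>("end-of-window-marker") {};
>>
>> windowResults
>>     .keyBy(r -> 0)  // constant key => a single task sees all window results
>>     .process(new KeyedProcessFunction<Integer, WindowResult, WindowResult>() {
>>
>>         @Override
>>         public void processElement(WindowResult value, Context ctx,
>>                                    Collector<WindowResult> out) {
>>             // Register a timer for the end of the window this record belongs to.
>>             // It only fires once the watermark of this single task passes it.
>>             ctx.timerService().registerEventTimeTimer(value.windowEnd);
>>             out.collect(value);
>>         }
>>
>>         @Override
>>         public void onTimer(long timestamp, OnTimerContext ctx,
>>                             Collector<WindowResult> out) {
>>             // All results for this window end have passed through this task,
>>             // so emit an end-of-message marker (here via a side output).
>>             ctx.output(markerTag, "end-of-window:" + timestamp);
>>         }
>>     });
>>
>> The marker stream then comes from getSideOutput(markerTag) on the result of
>> process() and can be routed to every partition (or to a separate topic).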
>>
>> While this does mean that the end-of-message marker only gets sent
>> through once the window has finished across all substreams (as opposed to
>> per substream), it also means you don't need to wait for the next window to
>> start, and the watermark gap between substreams should never grow that much
>> anyway.
>>
>> This approach should be particularly useful when the number of partitions
>> or keying mechanism is different between the input and output topics.
>>
>> Hopefully that doesn't sound like a terrible idea.
>>
>> eduardo
>>
>>
>>
>>
>> On Wed, 28 Aug 2019, 02:54 Padarn Wilson, <pad...@gmail.com> wrote:
>>
>>> Hi again Fabian,
>>>
>>> Thanks for pointing this out to me. In my case there is no need for
>>> keyed writing, but I do wonder whether having each Kafka sink task write
>>> only to a single partition would significantly affect performance.
>>>
>>> Actually, now that I think about it, the approach of just waiting for the
>>> first records of the next window is also subject to the problem you mention
>>> above: a producer lagging behind the rest could end up with a partition
>>> containing elements out of ‘window order’.
>>>
>>> I was also thinking that this problem is very similar to that of checkpoint
>>> barriers. I intend to dig into the details of the exactly-once Kafka sink
>>> for some inspiration.
>>>
>>> Padarn
>>>
>>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Padarn,
>>>>
>>>> Yes, this is quite tricky.
>>>> The "problem" with watermarks is that you need to consider how you
>>>> write to Kafka.
>>>> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition
>>>> is written by multiple producers), you need to broadcast the watermarks to
>>>> each partition, i.e., each partition would receive watermarks from each
>>>> parallel sink task. So in order to reason about the current watermark of a
>>>> partition, you need to observe them and take the minimum WM across all
>>>> current sink task WMs.
>>>> Things become much easier if each partition is only written by a single
>>>> task, but this also means that data is not key-partitioned in Kafka.
>>>> In that case, the sink task only needs to write a WM message to each of
>>>> its assigned partitions.
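>>>> To illustrate the keyed case, a consumer-side sketch could look roughly
>>>> like this (pure illustration: it assumes every sink task periodically
>>>> writes a (sinkTaskId, watermark) control record into every partition,
>>>> which is not something the connector does out of the box):
>>>>
>>>> import java.util.HashMap;
>>>> import java.util.Map;
>>>>
>>>> // Tracks the effective watermark of one Kafka partition. It is the minimum
>>>> // across all sink tasks, because a lagging task may still write older results.
>>>> public class PartitionWatermarkTracker {
>>>>
>>>>     private final Map<Integer, Long> wmPerSinkTask = new HashMap<>();
>>>>     private final int expectedSinkTasks;
>>>>
>>>>     public PartitionWatermarkTracker(int expectedSinkTasks) {
>>>>         this.expectedSinkTasks = expectedSinkTasks;
>>>>     }
>>>>
>>>>     public void onWatermarkMessage(int sinkTaskId, long watermark) {
>>>>         wmPerSinkTask.merge(sinkTaskId, watermark, Math::max);
>>>>     }
>>>>
>>>>     /** Everything up to this timestamp is complete in this partition. */
>>>>     public long currentWatermark() {
>>>>         if (wmPerSinkTask.size() < expectedSinkTasks) {
>>>>             return Long.MIN_VALUE; // not all sink tasks have reported yet
>>>>         }
>>>>         return wmPerSinkTask.values().stream()
>>>>                 .mapToLong(Long::longValue).min().getAsLong();
>>>>     }
>>>> }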
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>>
>>>> On Sat, 17 Aug 2019 at 05:48, Padarn Wilson <
>>>> pad...@gmail.com> wrote:
>>>>
>>>>> Hi Fabian, thanks for your input
>>>>>
>>>>> Exactly. Actually, my first instinct was to see if it was possible to
>>>>> publish the watermarks somehow; my initial idea was to insert regular
>>>>> watermark messages into each partition of the stream, but exposing this
>>>>> seemed quite troublesome.
>>>>>
>>>>> > In that case, you could have a ProcessFunction that is chained
>>>>> before the sink and which counts the window results per time slice and
>>>>> emits the result to a side output when the watermark passes.
>>>>> All side output messages are collected by a single task and can be
>>>>> published to a Kafka topic or even be made available via Queryable State.
>>>>>
>>>>> I understand the idea here (and exactly-once semantics are probably
>>>>> fine for my use case), but counting events seems a bit fragile. I'm not
>>>>> totally confident the consumer can guarantee it won't read duplicates (it's
>>>>> a Go Kafka library that seems to have some quirks).
>>>>>
>>>>> I think ideally each partition of the Kafka topic would carry some
>>>>> regular information about watermarks. Perhaps the Kafka producer can be
>>>>> modified to support this.
>>>>>
>>>>> Padarn
>>>>>
>>>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Padarn,
>>>>>>
>>>>>> What you describe is essentially publishing Flink's watermarks to an
>>>>>> outside system.
>>>>>> Flink processes time windows by waiting for a watermark that's past
>>>>>> the window end time. When it receives such a WM, it processes and emits
>>>>>> all ended windows and forwards the watermark.
>>>>>> When a sink receives a WM for, say, 12:45:15, you know that all window
>>>>>> results up to 12:45:00 have been emitted.
>>>>>> Hence, the watermark tells you about the completeness of data.
>>>>>>
>>>>>> However, using this information is not so easy, mostly because of the
>>>>>> failure semantics.
>>>>>> Things become much easier if you produce to Kafka with exactly-once
>>>>>> semantics.
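>>>>>> (For illustration only: the universal Kafka connector lets you request
>>>>>> this via FlinkKafkaProducer.Semantic.EXACTLY_ONCE; the topic and schema
>>>>>> below are placeholders, and transaction.timeout.ms typically needs to be
>>>>>> aligned with the broker's transaction.max.timeout.ms.)
>>>>>>
>>>>>> Properties props = new Properties();
>>>>>> props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
>>>>>> props.setProperty("transaction.timeout.ms", "900000");
>>>>>>
>>>>>> FlinkKafkaProducer<WindowResult> sink = new FlinkKafkaProducer<>(
>>>>>>         "window-results",                          // placeholder topic
>>>>>>         new WindowResultSerializationSchema(),     // placeholder KafkaSerializationSchema
>>>>>>         props,
>>>>>>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);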
>>>>>>
>>>>>> In that case, you could have a ProcessFunction that is chained before
>>>>>> the sink and which counts the window results per time slice and emits the
>>>>>> result to a side output when the watermark passes.
>>>>>> All side output messages are collected by a single task and can be
>>>>>> published to a Kafka topic or even be made available via Queryable State.
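>>>>>> A rough sketch of such a ProcessFunction (WindowResult and its windowEnd
>>>>>> field are placeholders; for brevity the counts live in a plain map rather
>>>>>> than in checkpointed state, and they are flushed on the next incoming
>>>>>> record instead of via timers):
>>>>>>
>>>>>> // imports: org.apache.flink.streaming.api.functions.ProcessFunction,
>>>>>> //          org.apache.flink.util.OutputTag, org.apache.flink.util.Collector,
>>>>>> //          org.apache.flink.api.java.tuple.Tuple2, java.util.TreeMap, java.util.Map
>>>>>>
>>>>>> public class WindowCountEmitter extends ProcessFunction<WindowResult, WindowResult> {
>>>>>>
>>>>>>     public static final OutputTag<Tuple2<Long, Long>> COUNTS =
>>>>>>             new OutputTag<Tuple2<Long, Long>>("window-counts") {};
>>>>>>
>>>>>>     // windowEnd -> number of results seen so far (not checkpointed in this sketch)
>>>>>>     private final TreeMap<Long, Long> counts = new TreeMap<>();
>>>>>>
>>>>>>     @Override
>>>>>>     public void processElement(WindowResult value, Context ctx,
>>>>>>                                Collector<WindowResult> out) {
>>>>>>         counts.merge(value.windowEnd, 1L, Long::sum);
>>>>>>         out.collect(value); // pass the result straight through to the sink
>>>>>>
>>>>>>         // Once the watermark has passed a window end, its count is final:
>>>>>>         // emit (windowEnd, count) to the side output and forget it.
>>>>>>         long wm = ctx.timerService().currentWatermark();
>>>>>>         while (!counts.isEmpty() && counts.firstKey() <= wm) {
>>>>>>             Map.Entry<Long, Long> done = counts.pollFirstEntry();
>>>>>>             ctx.output(COUNTS, Tuple2.of(done.getKey(), done.getValue()));
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> The COUNTS side output can then be collected (with parallelism 1) and
>>>>>> written to a Kafka topic or exposed via Queryable State, as described above.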
>>>>>>
>>>>>> For at-least-once output, it's much harder because you'll have
>>>>>> duplicates in the output after a job has recovered.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>>
>>>>>> On Mon, 12 Aug 2019 at 14:33, Padarn Wilson <
>>>>>> pad...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Users,
>>>>>>>
>>>>>>> I have a question that is perhaps not best solved within Flink: it
>>>>>>> has to do with notifying a downstream application that a Flink window
>>>>>>> has completed.
>>>>>>>
>>>>>>> The (simplified) scenario is this:
>>>>>>> - We have a Flink job that consumes from Kafka, does some
>>>>>>> preprocessing, and then applies a sliding window of 10 minutes with a
>>>>>>> slide of 1 minute.
>>>>>>> - The number of keys in each slide is not fixed.
>>>>>>> - The output of each window is then written to Kafka, which is read by
>>>>>>> a downstream application (roughly sketched below).
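>>>>>>> For reference, the shape of the job is roughly this (topic names, types
>>>>>>> and the user functions are placeholders, not the real job):
>>>>>>>
>>>>>>> Properties props = new Properties();
>>>>>>> props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
>>>>>>> props.setProperty("group.id", "my-flink-job");           // placeholder
>>>>>>>
>>>>>>> env.addSource(new FlinkKafkaConsumer<>("input-topic",
>>>>>>>             new EventDeserializationSchema(), props))
>>>>>>>    .assignTimestampsAndWatermarks(new EventTimestampAssigner()) // event time
>>>>>>>    .keyBy(event -> event.key)
>>>>>>>    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
>>>>>>>    .aggregate(new MyWindowAggregate())
>>>>>>>    .addSink(new FlinkKafkaProducer<>("broker:9092", "output-topic",
>>>>>>>             new ResultSerializationSchema()));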
>>>>>>>
>>>>>>> What I want to achieve is that the downstream application can
>>>>>>> somehow know when it has read all of the data for a single window,
>>>>>>> without waiting for the next window to arrive.
>>>>>>>
>>>>>>> Some options I've considered:
>>>>>>> - Producing a second window over the window results that counts the
>>>>>>> output size, which can then be used by the downstream application to see
>>>>>>> when it has received the same number: This seems fragile, as it relies
>>>>>>> on there being no loss or duplication of data. It's also an extra
>>>>>>> window and Kafka stream, which is a tad messy.
>>>>>>> - Somehow adding an 'end of window' element to each partition of
>>>>>>> the Kafka topic which can be read by the consumer: This seems a bit messy
>>>>>>> because it mixes different types of events into the same Kafka stream, and
>>>>>>> there is no really simple way to do this in Flink.
>>>>>>> - Packaging the whole window output into a single message and making
>>>>>>> this the unit of transaction: This is possible, but the message would then
>>>>>>> be quite large (at least tens of MB), as the volume of this stream is
>>>>>>> quite large.
>>>>>>> - Assuming that if the consumer has no elements to read, or if the
>>>>>>> next window has started to be read, then it has read the whole window: This
>>>>>>> seems reasonable, and if it weren't for the fact that my consumer on the
>>>>>>> application end is a bit inflexible right now, it is probably the solution
>>>>>>> I would use.
>>>>>>>
>>>>>>> Any further/better ideas?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Padarn
>>>>>>>
>>>>>>
