Hi Padarn,

Regarding your throughput concerns: A sink task may write to multiple
partitions, but each partition may only be written by a single task.
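
If you need each sink task to feed several partitions while still keeping a
single writer per partition, one way is a custom FlinkKafkaPartitioner along
these lines (a sketch of mine, not tested; the class name is illustrative and
it assumes the topic has at least as many partitions as the sink has
subtasks):

import java.util.stream.IntStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Subtask i "owns" every partition p with p % numSubtasks == i and
// round-robins its records across them: each partition has exactly one
// writer, but one sink task may write to several partitions.
public class OwnedPartitionsPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int subtask;
    private int numSubtasks;
    private long counter;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.subtask = parallelInstanceId;
        this.numSubtasks = parallelInstances;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Partitions owned by this subtask (non-empty as long as
        // partitions.length >= numSubtasks).
        int[] owned = IntStream.of(partitions)
                .filter(p -> p % numSubtasks == subtask)
                .toArray();
        // Round-robin across the owned partitions.
        return owned[(int) (counter++ % owned.length)];
    }
}

Such a partitioner is passed to the FlinkKafkaProducer constructor in place
of the default FlinkFixedPartitioner, which maps each subtask to exactly one
partition.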

@Eduardo: Thanks for sharing your approach! Not sure if I understood it
correctly, but I think that the approach does not guarantee that all
results of a window are emitted before the end-of-window marker is written.
Since the sink operator and the single-task operator are separate
operators, the output records might get stuck (or be buffered) in one of
the sink tasks while the single-task operator emits the end-of-window
marker record, because it doesn't know about the state of the sink tasks.

Best,
Fabian

On Thu, Aug 29, 2019 at 6:42 PM Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com> wrote:

> Hi,
>
> I'll chip in with an approach I'm trying at the moment that seems to work,
> and I say "seems" because I'm only running it on a personal project.
>
> Personally, I don't have anything against end-of-message markers per
> partition; Padarn, you seem not to prefer this option because it overloads
> the meaning of the output payload. My approach works equally well when
> producing watermarks/end-of-message markers on a side output, though.
>
> The main problem with both approaches is knowing when the window has
> finished across all partitions without having to wait for the start of the
> next window.
>
> I've taken the approach of sending all output messages of the window to (1)
> the sink, but also (2) a single-task operator. The single-task operator
> registers an event-time timer for the end of the window. You can be
> confident that the task's timer triggers only once, at the right time,
> because all the post-window watermarks flow to that same task. At that
> point I make the task send an end-of-window marker to every partition. I
> don't need to send a count because Kafka messages are ordered. And if you
> prefer not to overload the semantics of your original Kafka topic, you can
> post the marker to a separate location of your choice.
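>
> In case it helps, the single-task operator looks roughly like the sketch
> below (WindowResult, EndOfWindowMarker and NUM_PARTITIONS are placeholders
> of mine, and it assumes each window result carries its window-end
> timestamp as the record timestamp):
>
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> results
>     .keyBy(r -> 0)  // constant key: forces all results through one task
>     .process(new KeyedProcessFunction<Integer, WindowResult, EndOfWindowMarker>() {
>
>         @Override
>         public void processElement(WindowResult value, Context ctx,
>                                    Collector<EndOfWindowMarker> out) {
>             // Timers are deduplicated per timestamp, so registering the
>             // window end for every result is cheap. The timer fires only
>             // once the watermark passes the window end, i.e. once all
>             // upstream tasks are done with this window.
>             ctx.timerService().registerEventTimeTimer(ctx.timestamp());
>         }
>
>         @Override
>         public void onTimer(long timestamp, OnTimerContext ctx,
>                             Collector<EndOfWindowMarker> out) {
>             // One marker per output partition; the sink routes marker p
>             // to partition p.
>             for (int p = 0; p < NUM_PARTITIONS; p++) {
>                 out.collect(new EndOfWindowMarker(p, timestamp));
>             }
>         }
>     });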
>
> While this does mean that the end-of-window marker only gets sent once the
> window has finished across all substreams (as opposed to per substream),
> it also means you don't need to wait for the next window to start, and the
> watermark gap between substreams should never grow very large anyway.
>
> This approach should be particularly useful when the number of partitions
> or keying mechanism is different between the input and output topics.
>
> Hopefully that doesn't sound like a terrible idea.
>
> eduardo
>
>
>
>
> On Wed, 28 Aug 2019, 02:54 Padarn Wilson, <pad...@gmail.com> wrote:
>
>> Hi again Fabian,
>>
>> Thanks for pointing this out to me. In my case there is no need for keyed
>> writing, but I do wonder whether having each Kafka sink task write to only
>> a single partition would significantly affect performance.
>>
>> Actually, now that I think about it, the approach of just waiting for the
>> first records of the next window is also subject to the problem you
>> mention above: a producer lagging behind the rest could end up with a
>> partition containing elements out of ‘window order’.
>>
>> I was also thinking that this problem is very similar to that of
>> checkpoint barriers. I intend to dig into the details of the exactly-once
>> Kafka sink for some inspiration.
>>
>> Padarn
>>
>> On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Padarn,
>>>
>>> Yes, this is quite tricky.
>>> The "problem" with watermarks is that you need to consider how you write
>>> to Kafka.
>>> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition
>>> is written by multiple producers), you need to broadcast the watermarks
>>> to each partition, i.e., each partition would receive watermarks from
>>> each parallel sink task. So in order to reason about the current
>>> watermark of a partition, you need to observe them and take the minimum
>>> WM across all current sink task WMs.
>>> Things become much easier if each partition is only written by a single
>>> task, but this also means that the data is not key-partitioned in Kafka.
>>> In that case, a sink task only needs to write a WM message to each of
>>> its assigned partitions.
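>>>
>>> For the keyed case, the consumer-side bookkeeping could look roughly
>>> like this (a sketch; it assumes every WM message carries the ID of the
>>> sink task that produced it, and that the consumer knows when it has
>>> heard from all sink tasks):
>>>
>>> import java.util.HashMap;
>>> import java.util.Map;
>>>
>>> Map<Integer, Long> latestWmPerSinkTask = new HashMap<>();
>>>
>>> void onWatermarkMessage(int sinkTaskId, long wm) {
>>>     // Remember the newest WM seen from each parallel sink task.
>>>     latestWmPerSinkTask.merge(sinkTaskId, wm, Math::max);
>>>     // The partition's effective watermark is the minimum across all
>>>     // sink tasks (only meaningful once every task has reported).
>>>     long effectiveWm = latestWmPerSinkTask.values().stream()
>>>             .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
>>>     // Any window ending at or before effectiveWm is complete in this
>>>     // partition.
>>> }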
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>>
>>> On Sat, Aug 17, 2019 at 5:48 AM Padarn Wilson <
>>> pad...@gmail.com> wrote:
>>>
>>>> Hi Fabian, thanks for your input
>>>>
>>>> Exactly. My first instinct was to see if it was possible to publish the
>>>> watermarks somehow; my initial idea was to insert regular watermark
>>>> messages into each partition of the stream, but exposing this seemed
>>>> quite troublesome.
>>>>
>>>> > In that case, you could have a ProcessFunction that is chained before
>>>> > the sink and which counts the window results per time slice and emits
>>>> > the result to a side output when the watermark passes.
>>>> > All side output messages are collected by a single task and can be
>>>> > published to a Kafka topic or even be made available via Queryable
>>>> > State.
>>>>
>>>> I understand the idea here (and exactly-once semantics are probably
>>>> fine for my use case), but counting events seems a bit fragile. I'm not
>>>> totally confident the consumer can guarantee it won't read duplicates
>>>> (it's a Golang Kafka library that seems to have some quirks).
>>>>
>>>> I think ideally each partition of the Kafka topic would carry some
>>>> regular information about watermarks. Perhaps the Kafka producer can be
>>>> modified to support this.
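>>>>
>>>> One way I can imagine exposing them without touching the producer
>>>> itself (a sketch, untested, class name is illustrative): a custom
>>>> operator in front of the sink that forwards elements and turns every
>>>> watermark into a regular record, which the sink can then write to its
>>>> partitions.
>>>>
>>>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>>>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>>> import org.apache.flink.types.Either;
>>>>
>>>> public class WatermarksAsRecords<T>
>>>>         extends AbstractStreamOperator<Either<T, Long>>
>>>>         implements OneInputStreamOperator<T, Either<T, Long>> {
>>>>
>>>>     @Override
>>>>     public void processElement(StreamRecord<T> element) {
>>>>         // Forward data records unchanged, wrapped as Left.
>>>>         output.collect(element.replace(Either.Left(element.getValue())));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processWatermark(Watermark mark) throws Exception {
>>>>         // Emit the watermark as a regular record (Right), then forward
>>>>         // the watermark itself as usual.
>>>>         output.collect(new StreamRecord<>(
>>>>                 Either.Right(mark.getTimestamp()), mark.getTimestamp()));
>>>>         super.processWatermark(mark);
>>>>     }
>>>> }
>>>>
>>>> Applied via stream.transform(...), the sink then sees in-band watermark
>>>> records alongside the data.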
>>>>
>>>> Padarn
>>>>
>>>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> What you describe is essentially publishing Flink's watermarks to an
>>>>> outside system.
>>>>> Flink processes time windows by waiting for a watermark that is past
>>>>> the window end time. When it receives such a WM, it evaluates and
>>>>> emits all ended windows and forwards the watermark.
>>>>> When a sink receives a WM for, say, 12:45:15, you know that all window
>>>>> results up to 12:45:00 have been emitted.
>>>>> Hence, the watermark tells you about the completeness of the data.
>>>>>
>>>>> However, using this information is not so easy, mostly because of the
>>>>> failure semantics.
>>>>> Things become much easier if you produce to Kafka with exactly-once
>>>>> semantics.
>>>>>
>>>>> In that case, you could have a ProcessFunction that is chained before
>>>>> the sink and which counts the window results per time slice and emits
>>>>> the result to a side output when the watermark passes.
>>>>> All side output messages are collected by a single task and can be
>>>>> published to a Kafka topic or even be made available via Queryable
>>>>> State.
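>>>>>
>>>>> A rough sketch of that counting step (one caveat: a plain chained
>>>>> ProcessFunction does not see watermarks, so the sketch uses a custom
>>>>> operator instead; the names, the OutputTag, and the lack of
>>>>> checkpointed state are simplifications):
>>>>>
>>>>> import java.util.HashMap;
>>>>> import java.util.Iterator;
>>>>> import java.util.Map;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>>>>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>>>> import org.apache.flink.util.OutputTag;
>>>>>
>>>>> public class CountPerWindowEnd<T> extends AbstractStreamOperator<T>
>>>>>         implements OneInputStreamOperator<T, T> {
>>>>>
>>>>>     public static final OutputTag<Tuple2<Long, Long>> COUNTS =
>>>>>             new OutputTag<Tuple2<Long, Long>>("window-counts") {};
>>>>>
>>>>>     // windowEnd -> number of results seen (not checkpointed here).
>>>>>     private final Map<Long, Long> counts = new HashMap<>();
>>>>>
>>>>>     @Override
>>>>>     public void processElement(StreamRecord<T> element) {
>>>>>         counts.merge(element.getTimestamp(), 1L, Long::sum);
>>>>>         output.collect(element);  // forward unchanged to the sink
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void processWatermark(Watermark mark) throws Exception {
>>>>>         // Once the WM passes a window end, its count is final: emit
>>>>>         // (windowEnd, count) to the side output and drop the entry.
>>>>>         Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
>>>>>         while (it.hasNext()) {
>>>>>             Map.Entry<Long, Long> e = it.next();
>>>>>             if (e.getKey() <= mark.getTimestamp()) {
>>>>>                 output.collect(COUNTS, new StreamRecord<>(
>>>>>                         Tuple2.of(e.getKey(), e.getValue()), e.getKey()));
>>>>>                 it.remove();
>>>>>             }
>>>>>         }
>>>>>         super.processWatermark(mark);
>>>>>     }
>>>>> }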
>>>>>
>>>>> For at-least-once output, it's much harder because you'll have
>>>>> duplicates in the output after a job recovers.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> On Mon, Aug 12, 2019 at 2:33 PM Padarn Wilson <
>>>>> pad...@gmail.com> wrote:
>>>>>
>>>>>> Hello Users,
>>>>>>
>>>>>> I have a question that is perhaps not best solved within Flink: It
>>>>>> has to do with notifying a downstream application that a Flink window has
>>>>>> completed.
>>>>>>
>>>>>> The (simplified) scenario is this:
>>>>>> - We have a Flink job that consumes from Kafka, does some
>>>>>> preprocessing, and then has a sliding window of 10 minutes and slide time
>>>>>> of 1 minute.
>>>>>> - The number of keys in each slide is not fixed
>>>>>> - The output of the window is then written to Kafka, which is read by
>>>>>> a downstream application.
>>>>>>
>>>>>> What I want to achieve is that the downstream application can somehow
>>>>>> know when it has read all of the data for a single window, without
>>>>>> waiting for the next window to arrive.
>>>>>>
>>>>>> Some options I've considered:
>>>>>> - Producing a second window over the window results that counts the
>>>>>> output size, which can then be used by the downstream application to
>>>>>> see when it has received the same number: This seems fragile, as it
>>>>>> relies on there being no loss or duplication of data. It's also an
>>>>>> extra window and Kafka stream, which is a tad messy.
>>>>>> - Somehow adding an 'end of window' element to each partition of the
>>>>>> Kafka topic, which can be read by the consumer: This seems a bit messy
>>>>>> because it mixes different types of events into the same Kafka stream,
>>>>>> and there is no really simple way to do this in Flink.
>>>>>> - Packaging the whole window output into a single message and making
>>>>>> this the unit of transaction: This is possible, but the message would
>>>>>> then be quite large (at least tens of MB), as the volume of this
>>>>>> stream is quite large.
>>>>>> - Assuming that if the consumer has no elements to read, or if the
>>>>>> next window has started to be read, then it has read the whole window:
>>>>>> This seems reasonable, and if it weren't for the fact that my consumer
>>>>>> on the application side is a bit inflexible right now, this is
>>>>>> probably the solution I would use.
>>>>>>
>>>>>> Any further/better ideas?
>>>>>>
>>>>>> Thanks
>>>>>> Padarn
>>>>>>
>>>>>
