Hi again Fabian,

Thanks for pointing this out to me. In my case there is no need for keyed
writing - but I do wonder if having each Kafka sink task write only to a
single partition would significantly affect performance.

Actually, now that I think about it, the approach of just waiting for the
first records of the next window is also subject to the problem you mention
above: a producer lagging behind the rest could end up with a partition
containing elements out of ‘window order’.

I was also thinking this problem is very similar to that of checkpoint
barriers. I intend to dig into the details of the exactly-once Kafka sink
for some inspiration.

Padarn

On Tue, 27 Aug 2019 at 11:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Padarn,
>
> Yes, this is quite tricky.
> The "problem" with watermarks is that you need to consider how you write
> to Kafka.
> If your Kafka sink writes to a keyed Kafka stream (each Kafka partition is
> written by multiple producers), you need to broadcast the watermarks to
> each partition, i.e., each partition would receive watermarks from each
> parallel sink task. So in order to reason about the current watermark of a
> partition, you need to observe them and take the minimum WM across all
> current sink task WMs.
> Things become much easier, if each partition is only written by a single
> task but this also means that data is not key-partitioned in Kafka.
> In that case, the sink task only needs to write a WM message to each of
> its assigned partitions.
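A minimal sketch of the min-watermark bookkeeping described above, on the consumer side of a keyed Kafka stream. This is plain Python, not the Flink API; the idea of per-task watermark messages carrying a sink task id is an assumption about the message format, not something Flink provides out of the box:

```python
# Each Kafka partition is written by several parallel sink tasks, and each
# task periodically writes its own watermark into the partition. The
# partition's effective watermark is the minimum over the latest watermark
# seen from each task (a task we haven't heard from recently holds it back).

wms = {}  # sink task id -> latest watermark seen from that task

def on_watermark_message(task_id, wm):
    # keep only the most recent watermark per sink task, then take the
    # minimum across all tasks as the partition's current watermark
    wms[task_id] = max(wm, wms.get(task_id, wm))
    return min(wms.values())
```

Note how a single lagging task (task 1 below) keeps the effective watermark low even while other tasks advance.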
>
> Hope this helps,
> Fabian
>
>
> Am Sa., 17. Aug. 2019 um 05:48 Uhr schrieb Padarn Wilson <pad...@gmail.com
> >:
>
>> Hi Fabian, thanks for your input
>>
>> Exactly. Actually my first instinct was to see if it was possible to
>> publish the watermarks somehow - my initial idea was to insert regular
>> watermark messages into each partition of the stream, but exposing this
>> seemed quite troublesome.
>>
>> > In that case, you could have a ProcessFunction that is chained before
>> the sink and which counts the window results per time slice and emits the
>> result when the watermark passes to a side output.
>> All side output messages are collected by a single task and can be
>> published to a Kafka topic or even be made available via Queryable State.
>>
>> I understand the idea here (and exactly-once semantics are probably fine
>> for my use case), but counting events seems a bit fragile. I'm not totally
>> confident the consumer can guarantee it won't read duplicates (it's a Go
>> Kafka library that seems to have some quirks).
>>
>> I think ideally each partition of the Kafka topic would carry some regular
>> information about watermarks. Perhaps the Kafka producer can be modified to
>> support this.
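As a rough sketch of that idea (plain Python, no real Kafka client; the message shapes and partition assignment are made up for illustration), the sink would interleave control messages carrying its current watermark into every partition it owns:

```python
# Toy model of a sink task that owns a fixed set of partitions. Data records
# go to one partition each; on every watermark, a control message is appended
# to all owned partitions, so each partition carries regular watermark
# information a consumer can use to judge completeness.

partitions = {0: [], 1: []}  # partition id -> ordered log of messages

def write(partition, record):
    partitions[partition].append(("data", record))

def on_watermark(wm):
    # broadcast the watermark to every partition this task writes to
    for p in partitions:
        partitions[p].append(("watermark", wm))
```

This only works cleanly when each partition has a single writer; with multiple writers per partition you are back to the min-across-tasks situation Fabian describes.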
>>
>> Padarn
>>
>> On Fri, Aug 16, 2019 at 3:50 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Padarn,
>>>
>>> What you describe is essentially publishing Flink's watermarks to an
>>> outside system.
>>> Flink processes time windows by waiting for a watermark that's past the
>>> window end time. When it receives such a WM, it processes and emits all
>>> ended windows and forwards the watermark.
>>> When a sink receives a WM for, say, 12:45:15, you know that all window
>>> results up to 12:45:00 have been emitted.
>>> Hence, the watermark tells you about the completeness of data.
>>>
>>> However, using this information is not so easy, mostly because of the
>>> failure semantics.
>>> Things become much easier if you produce to Kafka with exactly-once
>>> semantics.
>>>
>>> In that case, you could have a ProcessFunction that is chained before
>>> the sink and which counts the window results per time slice and emits the
>>> result when the watermark passes to a side output.
>>> All side output messages are collected by a single task and can be
>>> published to a Kafka topic or even be made available via Queryable State.
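A toy simulation of that counting idea, in plain Python rather than the actual ProcessFunction/side-output API: results are counted per window-end timestamp, and a count is emitted only once the watermark passes that timestamp (at which point it is final).

```python
from collections import defaultdict

counts = defaultdict(int)  # window-end timestamp -> number of results so far
emitted = []               # (window_end, final count) pairs, in emission order

def on_result(window_end):
    # a window result passed through, chained before the sink
    counts[window_end] += 1

def on_watermark(wm):
    # once the watermark passes a window end, no more results for that
    # window can arrive, so its count is final: emit it to the "side output"
    for end in sorted(t for t in counts if t <= wm):
        emitted.append((end, counts.pop(end)))
```

A downstream consumer could then compare the emitted count against the number of records it has read for that window end.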
>>>
>>> For at-least-once output, it's much harder because you'll have
>>> duplicates in the output after a job recovers.
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 12. Aug. 2019 um 14:33 Uhr schrieb Padarn Wilson <
>>> pad...@gmail.com>:
>>>
>>>> Hello Users,
>>>>
>>>> I have a question that is perhaps not best solved within Flink: It has
>>>> to do with notifying a downstream application that a Flink window has
>>>> completed.
>>>>
>>>> The (simplified) scenario is this:
>>>> - We have a Flink job that consumes from Kafka, does some
>>>> preprocessing, and then has a sliding window of 10 minutes and slide time
>>>> of 1 minute.
>>>> - The number of keys in each slide is not fixed
>>>> - The output of the window is then output to Kafka, which is read by a
>>>> downstream application.
>>>>
>>>> What I want to achieve is that the downstream application can somehow
>>>> know when it has read all of the data for a single window, without waiting
>>>> for the next window to arrive.
>>>>
>>>> Some options I've considered:
>>>> - Producing a second window over the window results that counts the
>>>> output size, which can then be used by the downstream application to see
>>>> when it has received the same number: This seems fragile, as it relies on
>>>> there being no loss or duplication of data. It's also an extra window and
>>>> Kafka stream, which is a tad messy.
>>>> - Somehow adding an 'end of window' element to each partition of the
>>>> Kafka topic which can be read by the consumer: This seems a bit messy
>>>> because it mixes different types of events into the same Kafka stream, and
>>>> there is no really simple way to do this in Flink.
>>>> - Package the whole window output into a single message and make this
>>>> the unit of transaction: This is possible, but the message would be quite
>>>> large (at least tens of MB), as the volume of this stream is quite large.
>>>> - Assume that if the consumer has no elements to read, or if the next
>>>> window has started to be read, then it has read the whole window: This
>>>> seems reasonable, and if it wasn't for the fact that my consumer on the
>>>> application end was a bit inflexible right now, it is probably the solution
>>>> I would use.
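For what it's worth, the last option can be sketched like this (plain Python; it assumes every record carries its window-end timestamp and that the consumer tracks what it has read per partition, which may not match your setup):

```python
# A window is considered complete once every partition has produced a record
# from a strictly later window, i.e. the per-partition "frontier" has moved
# past it. A partition with no records yet blocks everything (we can't tell
# whether it is empty or just lagging).

def completed_windows(records_per_partition):
    """records_per_partition: dict partition -> list of window-end
    timestamps, in read order. Returns the set of complete window ends."""
    latest = {p: max(ends) for p, ends in records_per_partition.items() if ends}
    if len(latest) < len(records_per_partition):
        return set()  # some partition hasn't produced anything yet
    frontier = min(latest.values())
    seen = {e for ends in records_per_partition.values() for e in ends}
    return {e for e in seen if e < frontier}
```

This shares the weakness mentioned at the top of the thread: a lagging producer writing an old window's records late would make the check report completeness too early.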
>>>>
>>>> Any further/better ideas?
>>>>
>>>> Thanks
>>>> Padarn
>>>>
>>>