You are right, committed offsets cannot be used to track processing progress. I
think setting Kafka offsets with respect to some progress notion other than
"has been consumed" would be highly application-specific and hard to
generalize.
As you said, there might be a window (such as a session window) that stays
open much longer than all other windows and would hold back the
offset. Other applications might not use the built-in windows at all, but
rather custom ProcessFunctions.

Have you considered tracking progress using watermarks?
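To make the watermark-based view of progress concrete, here is a minimal simulation in plain Python. It does not use the Flink API: the class and function names, the out-of-orderness bound of 5, the window size of 100, and the example events are all hypothetical. It mimics the mechanism described in the quoted messages: each partition keeps its own watermark (here assumed to be "max timestamp seen minus a fixed bound"), the consumer emits the minimum across partitions, and an event-time tumbling window buffers "early" records until the watermark passes the window end.

```python
from collections import defaultdict


class PartitionWatermarkTracker:
    """Hypothetical per-partition watermarking (not Flink's API)."""

    def __init__(self, partitions, max_out_of_orderness):
        self.max_out_of_orderness = max_out_of_orderness
        # No watermark yet for any partition: start at "minus infinity".
        self.partition_wm = {p: float("-inf") for p in partitions}

    def on_record(self, partition, timestamp):
        # Assumed bounded-out-of-orderness rule: max timestamp minus a bound.
        candidate = timestamp - self.max_out_of_orderness
        # A partition's watermark only ever moves forward.
        if candidate > self.partition_wm[partition]:
            self.partition_wm[partition] = candidate
        return self.current_watermark()

    def current_watermark(self):
        # The consumer emits the smallest per-partition watermark, so event
        # time advances at the speed of the slowest (lagging) partition.
        return min(self.partition_wm.values())


class TumblingWindowBuffer:
    """Hypothetical event-time tumbling window that buffers early records."""

    def __init__(self, size):
        self.size = size
        self.pending = defaultdict(list)  # window start -> buffered values

    def add(self, timestamp, value):
        start = timestamp - timestamp % self.size
        self.pending[start].append(value)

    def fire(self, watermark):
        fired = []
        for start in sorted(self.pending):  # sorted() copies the keys
            # Window [start, start + size) is complete once the watermark
            # reaches its last timestamp, start + size - 1.
            if start + self.size - 1 <= watermark:
                fired.append((start, self.pending.pop(start)))
        return fired


def simulate(events, partitions, max_out_of_orderness, window_size):
    tracker = PartitionWatermarkTracker(partitions, max_out_of_orderness)
    windows = TumblingWindowBuffer(window_size)
    fired = []
    for partition, ts, value in events:
        windows.add(ts, value)  # buffer the record before the new watermark
        wm = tracker.on_record(partition, ts)
        for start, values in windows.fire(wm):
            fired.append((wm, start, values))
    return fired, tracker.current_watermark(), dict(windows.pending)


# The small topic is read quickly and runs far ahead in event time,
# while the big topic lags behind.
EXAMPLE_EVENTS = [
    ("small_topic-0", 150, "s1"),
    ("small_topic-0", 250, "s2"),
    ("big_topic-0", 40, "b1"),
    ("big_topic-0", 120, "b2"),
    ("big_topic-0", 230, "b3"),
]

if __name__ == "__main__":
    fired, wm, pending = simulate(
        EXAMPLE_EVENTS, ["small_topic-0", "big_topic-0"], 5, 100)
    for at_wm, start, values in fired:
        print(f"watermark={at_wm}: window [{start}, {start + 100}) -> {values}")
    print(f"current watermark={wm}, still buffered={pending}")
```

In this run, the small-topic record `s1` is read early but is only emitted once the lagging big-topic partition pushes the watermark past 199, and `s2` is still buffered at the end rather than dropped. The consumer's current watermark (225 here), rather than the committed offsets, reflects how far event-time processing has progressed.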

2017-12-04 14:42 GMT+01:00 Juho Autio <juho.au...@rovio.com>:

> Thank you, Fabian. Really clear explanation. That indeed matches my
> observation (data is not dropped from either the small or the big topic, but
> the offsets already advance on the Kafka side before the corresponding
> records have been fired by a window operator).
>
> This means that it's a bit harder to meaningfully monitor the job's
> progress solely based on Kafka consumer offsets. Is there a reason why
> Flink couldn't instead commit the offsets after the corresponding records
> have been triggered by downstream windows? I could imagine that this might
> pose a problem if some windows remain open for a very long time, but in
> general it would be useful IMHO. Or Flink could even commit both (read vs.
> triggered) offsets to Kafka for monitoring purposes.
>
> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> the partitions of both topics are consumed independently, i.e., each at its
>> own speed without coordination. With the configuration that Gordon linked,
>> watermarks are generated per partition.
>> Each source task maintains the latest (and highest) watermark per
>> partition and propagates the smallest of these watermarks. The same
>> mechanism is applied to watermarks across tasks (this is what Kien
>> referred to).
>>
>> In the case that you are describing, the partitions of the smaller topic
>> are consumed faster (hence their offsets catch up sooner), but watermarks
>> are emitted "at the speed" of the bigger topic.
>> Therefore, the timestamps of records from the smaller topic can be far
>> ahead of the watermark.
>> In principle, that does not pose a problem. Stateful operators (such as
>> windows) remember the "early" records and process them when they receive a
>> watermark that passes the timestamps of the early records.
>>
>> Regarding your question "Are they committed to Kafka before their
>> watermark has passed on Flink's side?":
>> The offsets of the smaller topic might be checkpointed when all of its
>> partitions have been read to the "end" while the bigger topic is still
>> catching up.
>> The watermarks move at the speed of the bigger topic, but all
>> "early" events of the smaller topic are stored in stateful operators and
>> are checkpointed as well.
>>
>> So, you lose neither early nor late data.
>>
>> Best, Fabian
>>
>>
>>
>> 2017-12-01 13:43 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>
>>> Thanks for the answers. I still don't understand why I can see the
>>> offsets being committed to Kafka so quickly for the "small topic". Are they
>>> committed to Kafka before their watermark has passed on Flink's side? That
>>> would be quite confusing. Of course, since Flink handles the state/offsets
>>> internally, the consumer offsets are committed to Kafka just for reference.
>>>
>>> Otherwise, what you're saying sounds very good to me. The documentation
>>> just doesn't explicitly say anything about how it works across topics.
>>>
>>> Regarding Kien's answer ("When you join multiple stream with different
>>> watermarks"): note that I'm not joining any topics myself; I get them as a
>>> single stream from the Flink Kafka consumer, based on the list of topics
>>> that I asked for.
>>>
>>> Thanks,
>>> Juho
>>>
>>> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> The FlinkKafkaConsumer can handle watermark advancement with
>>>> per-Kafka-partition awareness (across partitions of different topics).
>>>> You can see an example of how to do that here [1].
>>>>
>>>> Basically, it generates watermarks within the Kafka consumer individually
>>>> for each Kafka partition, and the per-partition watermarks are aggregated
>>>> and emitted from the consumer in the same way that watermarks are
>>>> aggregated on a stream shuffle: a watermark is emitted from the consumer
>>>> only when the low watermark advances across all partitions.
>>>>
>>>> Therefore, this helps avoid the problem that you described, in which a
>>>> "big_topic" has subscribed partitions that lag behind others. In this
>>>> case, when the above feature is used, event time advances along with the
>>>> lagging "big_topic" partitions, so messages are not recognized as late
>>>> and discarded.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>
