You are right, offsets cannot be used for tracking processing progress. I think setting Kafka offsets with respect to some progress notion other than "has been consumed" would be highly application-specific and hard to generalize. As you said, there might be a window (such as a session window) that stays open much longer than all other windows and would hold back the offset. Other applications might not use the built-in windows at all but only custom ProcessFunctions.

Have you considered tracking progress using watermarks?
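For example, a pass-through ProcessFunction could expose the watermark it sees as a gauge metric that your monitoring system can read. This is only a rough sketch (the class and metric names below are made up, not something Flink ships), but it illustrates the idea:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical helper: forwards records unchanged and exposes the operator's
// current watermark as a metric, so event-time progress can be monitored
// independently of the committed Kafka offsets.
public class WatermarkGauge<T> extends ProcessFunction<T, T> {

    // volatile because metric reporters may read it from another thread
    private volatile long lastSeenWatermark = Long.MIN_VALUE;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup()
            .gauge("lastSeenWatermark", (Gauge<Long>) () -> lastSeenWatermark);
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // currentWatermark() is this operator's watermark at the time the
        // record is processed
        lastSeenWatermark = ctx.timerService().currentWatermark();
        out.collect(value);
    }
}

You would attach it with something like stream.process(new WatermarkGauge<>()) at the point in the pipeline whose progress you want to observe. Depending on your Flink version, the web UI / metrics system may already expose a similar per-operator watermark, so it is worth checking that first.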
2017-12-04 14:42 GMT+01:00 Juho Autio <juho.au...@rovio.com>:

> Thank you Fabian. Really clear explanation. That matches my observation indeed (data is not dropped from either the small or the big topic, but the offsets are advancing on the Kafka side before the records at those offsets have been triggered by a window operator).
>
> This means that it's a bit harder to meaningfully monitor the job's progress based solely on Kafka consumer offsets. Is there a reason why Flink couldn't instead commit the offsets after they have been triggered by downstream windows? I could imagine that this might pose a problem if there are any windows that remain open for a very long time, but in general it would be useful IMHO. Or Flink could even commit both (read vs. triggered) offsets to Kafka for monitoring purposes.
>
> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> The partitions of both topics are consumed independently, i.e., at their own speed and without coordination. With the configuration that Gordon linked, watermarks are generated per partition. Each source task maintains the latest (and highest) watermark per partition and propagates the smallest watermark. The same mechanism is applied for watermarks across tasks (this is what Kien referred to).
>>
>> In the case that you are describing, the partitions of the smaller topic are consumed faster (hence their offsets advance faster), but watermarks are emitted "at the speed" of the bigger topic. Therefore, the timestamps of records from the smaller topic can be far ahead of the watermark. In principle, that does not pose a problem. Stateful operators (such as windows) remember the "early" records and process them when they receive a watermark that passes the timestamps of those early records.
>>
>> Regarding your question "Are they committed to Kafka before their watermark has passed on Flink's side?": The offsets of the smaller topic might be checkpointed when all of its partitions have been read to the "end" while the bigger topic is still catching up. The watermarks are moving at the speed of the bigger topic, but all "early" events of the smaller topic are stored in stateful operators and are checkpointed as well.
>>
>> So, you lose neither early nor late data.
>>
>> Best, Fabian
>>
>> 2017-12-01 13:43 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>
>>> Thanks for the answers. I still don't understand why I can see the offsets being quickly committed to Kafka for the "small topic". Are they committed to Kafka before their watermark has passed on Flink's side? That would be quite confusing. Indeed, when Flink handles the state/offsets internally, the consumer offsets are committed to Kafka just for reference.
>>>
>>> Otherwise, what you're saying sounds very good to me. The documentation just doesn't explicitly say anything about how it works across topics.
>>>
>>> On Kien's answer, "When you join multiple streams with different watermarks": note that I'm not joining any topics myself, I get them as a single stream from the Flink Kafka consumer based on the list of topics that I asked for.
>>>
>>> Thanks,
>>> Juho
>>>
>>> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> The FlinkKafkaConsumer can handle watermark advancement with per-Kafka-partition awareness (across partitions of different topics). You can see an example of how to do that here [1].
>>>>
>>>> Basically, what this does is generate watermarks within the Kafka consumer individually for each Kafka partition. The per-partition watermarks are aggregated and emitted from the consumer in the same way that watermarks are aggregated on a stream shuffle: only when the low watermark advances across all partitions is a watermark emitted from the consumer.
>>>>
>>>> Therefore, this helps avoid the problem that you described, in which the subscribed partitions of "big_topic" lag behind the others. In this case, and when the above feature is used, the event time would advance along with the lagging "big_topic" partitions and would not result in messages being recognized as late and discarded.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
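For completeness, here is roughly what the per-partition setup from [1] looks like in code. This is only a sketch: the broker address, group id, topic names, and the timestamp parsing are placeholders, and the exact consumer class (FlinkKafkaConsumer010 below) depends on your Kafka version.

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PerPartitionWatermarksSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "watermark-example");       // placeholder

        // One consumer subscribed to both topics.
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
                Arrays.asList("small_topic", "big_topic"),
                new SimpleStringSchema(),
                props);

        // Assigning the extractor to the consumer itself (not to the resulting
        // DataStream) makes the consumer generate watermarks per Kafka partition
        // and emit only the minimum across all partitions it reads.
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(String record) {
                        // placeholder: parse the event timestamp from your record format
                        return Long.parseLong(record.split(",")[0]);
                    }
                });

        env.addSource(consumer)
                .print();

        env.execute("per-partition watermarks sketch");
    }
}

With this in place, the watermarks emitted by the consumer only advance as fast as the slowest partition across both topics, which is the behavior Gordon describes above.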