Great. I'd be happy to contribute. I added 2 sub-tasks in https://issues.apache.org/jira/browse/FLINK-5479.
Could someone with the privileges assign this sub-task to me: https://issues.apache.org/jira/browse/FLINK-9183

On Mon, Apr 16, 2018 at 3:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Fully agree Juho!
>
> Do you want to contribute the docs fix?
> If yes, we should update FLINK-5479 to make sure that the warning is
> removed once the bug is fixed.
>
> Thanks, Fabian
>
> 2018-04-12 9:32 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>
>> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479 is
>> entirely preventing this feature from being used if there are any idle
>> partitions. It would be nice to mention in the documentation that
>> currently this requires all subscribed partitions to have a constant
>> stream of data with growing timestamps. When the watermark gets stalled
>> on an idle partition, it blocks everything.
>>
>> Link to current documentation:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> You are right, offsets cannot be used for tracking processing progress.
>>> I think setting Kafka offsets with respect to some progress notion other
>>> than "has been consumed" would be highly application specific and hard
>>> to generalize.
>>> As you said, there might be a window (such as a session window) that is
>>> open much longer than all other windows and which would hold back the
>>> offset. Other applications might not use the built-in windows at all but
>>> custom ProcessFunctions.
>>>
>>> Have you considered tracking progress using watermarks?
>>>
>>> 2017-12-04 14:42 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>>
>>>> Thank you Fabian. Really clear explanation.
>>>> That matches my observation indeed (data is not dropped from either
>>>> the small or the big topic, but the offsets are advancing on the Kafka
>>>> side already before those offsets have been triggered from a window
>>>> operator).
>>>>
>>>> This means that it's a bit harder to meaningfully monitor the job's
>>>> progress solely based on Kafka consumer offsets. Is there a reason why
>>>> Flink couldn't instead commit the offsets after they have been
>>>> triggered from downstream windows? I could imagine that this might pose
>>>> a problem if there are any windows that remain open for a very long
>>>> time, but in general it would be useful IMHO. Or Flink could even
>>>> commit both (read vs. triggered) offsets to Kafka for monitoring
>>>> purposes.
>>>>
>>>> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> the partitions of both topics are independently consumed, i.e., at
>>>>> their own speed without coordination. With the configuration that
>>>>> Gordon linked, watermarks are generated per partition.
>>>>> Each source task maintains the latest (and highest) watermark per
>>>>> partition and propagates the smallest watermark. The same mechanism is
>>>>> applied for watermarks across tasks (this is what Kien referred to).
>>>>>
>>>>> In the case that you are describing, the partitions of the smaller
>>>>> topic are consumed faster (hence the offsets are aligned faster) but
>>>>> watermarks are emitted "at the speed" of the bigger topic.
>>>>> Therefore, the timestamps of records from the smaller topic can be
>>>>> much ahead of the watermark.
>>>>> In principle, that does not pose a problem. Stateful operators (such
>>>>> as windows) remember the "early" records and process them when they
>>>>> receive a watermark that passes the timestamps of the early records.
>>>>>
>>>>> Regarding your question "Are they committed to Kafka before their
>>>>> watermark has passed on Flink's side?":
>>>>> The offsets of the smaller topic might be checkpointed when all
>>>>> partitions have been read to the "end" and the bigger topic is still
>>>>> catching up.
>>>>> The watermarks are moving at the speed of the bigger topic, but all
>>>>> "early" events of the smaller topic are stored in stateful operators
>>>>> and are checkpointed as well.
>>>>>
>>>>> So, you lose neither early nor late data.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-01 13:43 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>>>>
>>>>>> Thanks for the answers. I still don't understand why I can see the
>>>>>> offsets being quickly committed to Kafka for the "small topic". Are
>>>>>> they committed to Kafka before their watermark has passed on Flink's
>>>>>> side? That would be quite confusing. Indeed, when Flink handles the
>>>>>> state/offsets internally, the consumer offsets are committed to Kafka
>>>>>> just for reference.
>>>>>>
>>>>>> Otherwise, what you're saying sounds very good to me. The
>>>>>> documentation just doesn't explicitly say anything about how it works
>>>>>> across topics.
>>>>>>
>>>>>> On Kien's answer: "When you join multiple stream with different
>>>>>> watermarks", note that I'm not joining any topics myself, I get them
>>>>>> as a single stream from the Flink Kafka consumer based on the list of
>>>>>> topics that I asked for.
>>>>>>
>>>>>> Thanks,
>>>>>> Juho
>>>>>>
>>>>>> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> The FlinkKafkaConsumer can handle watermark advancement with
>>>>>>> per-Kafka-partition awareness (across partitions of different
>>>>>>> topics).
>>>>>>> You can see an example of how to do that here [1].
>>>>>>>
>>>>>>> Basically, this generates watermarks within the Kafka consumer
>>>>>>> individually for each Kafka partition, and the per-partition
>>>>>>> watermarks are aggregated and emitted from the consumer in the same
>>>>>>> way that watermarks are aggregated on a stream shuffle: only when
>>>>>>> the low watermark advances across all partitions should a watermark
>>>>>>> be emitted from the consumer.
>>>>>>>
>>>>>>> Therefore, this helps avoid the problem that you described, in which
>>>>>>> a "big_topic" has subscribed partitions that lag behind others. In
>>>>>>> this case, and when the above feature is used, the event time would
>>>>>>> advance along with the lagging "big_topic" partitions and would not
>>>>>>> result in messages being recognized as late and discarded.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gordon
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>>>>>>
>>>>>>> --
>>>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
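
The per-partition watermark behaviour described in this thread (keep the highest watermark per partition, emit the smallest across partitions) can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink's actual implementation: the class `PartitionWatermarkTracker` and its method `onPartitionWatermark` are hypothetical names, and `Long.MIN_VALUE` is assumed here to stand for "no watermark seen yet". It also shows why an idle partition, as in FLINK-5479, holds everything back.

```java
import java.util.Arrays;

/** Hypothetical helper (not part of Flink): tracks one watermark per Kafka partition. */
final class PartitionWatermarkTracker {
    // Highest watermark seen so far for each partition; MIN_VALUE means "none yet" (assumption).
    private final long[] perPartition;
    // Last watermark emitted downstream; it never moves backwards.
    private long lastEmitted = Long.MIN_VALUE;

    PartitionWatermarkTracker(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        perPartition = new long[numPartitions];
        Arrays.fill(perPartition, Long.MIN_VALUE);
    }

    /** Records a watermark for one partition and returns the watermark emitted downstream. */
    long onPartitionWatermark(int partition, long watermark) {
        // Keep the latest (highest) watermark per partition.
        perPartition[partition] = Math.max(perPartition[partition], watermark);
        // Propagate the smallest watermark across all partitions.
        long min = Arrays.stream(perPartition).min().getAsLong();
        lastEmitted = Math.max(lastEmitted, min);
        return lastEmitted;
    }

    public static void main(String[] args) {
        // Partitions 0 and 1 model the fast "small topic", partition 2 the lagging "big topic".
        PartitionWatermarkTracker tracker = new PartitionWatermarkTracker(3);
        System.out.println(tracker.onPartitionWatermark(0, 1000L)); // partition 2 still silent
        System.out.println(tracker.onPartitionWatermark(1, 1200L)); // partition 2 still silent
        System.out.println(tracker.onPartitionWatermark(2, 300L));  // slowest partition decides
        System.out.println(tracker.onPartitionWatermark(0, 5000L)); // fast partition cannot advance it
        System.out.println(tracker.onPartitionWatermark(2, 400L));  // advances with the slow partition
    }
}
```

In this sketch the emitted watermark stays at `Long.MIN_VALUE` until every partition has reported at least once, so a partition that never receives data stalls the watermark indefinitely. Records from the fast partitions are not lost in the meantime; as explained above, they are simply "early" relative to the watermark.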