Hi Shikhar! What you are seeing is that several streams (here, the different Kafka partitions read by one source) get merged in the source task, and that merge happens before watermarks are generated. In such a case, records are out of order when they arrive at the timestamp extractor / watermark generator. The watermark generator therefore needs to be implemented so that it is aware of these out-of-order records and uses some heuristic to generate watermarks. This is actually the general case, which one also has when timestamps are not ascending inside a single Kafka partition.
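To illustrate one such heuristic, here is a minimal sketch in plain Java. It is not Flink's API: the class name, the method names, and the fixed lateness bound are all assumptions made for the example. It tracks the highest timestamp seen so far and trails the watermark behind it by the assumed bound.

```java
// Sketch only: plain Java, not Flink's API. All names are hypothetical.
final class BoundedLatenessWatermarks {
    // Assumption: no record arrives more than this many ms behind the newest one.
    private final long maxOutOfOrderMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessWatermarks(long maxOutOfOrderMillis) {
        this.maxOutOfOrderMillis = maxOutOfOrderMillis;
    }

    /** Records the timestamp of an incoming record, which may be out of order. */
    void onRecord(long timestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
    }

    /** Highest timestamp seen minus the allowed lateness; MIN_VALUE before any record. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrderMillis;
    }

    public static void main(String[] args) {
        BoundedLatenessWatermarks gen = new BoundedLatenessWatermarks(5);
        // Records from two merged partitions, interleaved out of order.
        for (long ts : new long[] {10, 7, 12, 9}) {
            gen.onRecord(ts);
        }
        System.out.println(gen.currentWatermark()); // prints 7
    }
}
```

The bound trades latency for completeness: a larger value tolerates more disorder but delays every window that waits on the watermark.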
You probably want to make use of the simple case, where timestamps are ascending inside one Kafka partition, and use the ascending-timestamp extractor that auto-generates watermarks. With Kafka, that extractor only works when there is a 1:1 mapping of parallel source tasks to partitions.

I think we can add some tooling that makes it possible to use the simple ascending timestamp extraction also in cases where one parallel source task reads multiple Kafka partitions. Effectively, the Kafka source has to generate the watermarks internally and use the same "watermark union" technique as, for example, the join operator.

Here is the issue to track this: https://issues.apache.org/jira/browse/FLINK-3375

Greetings,
Stephan

On Mon, Feb 8, 2016 at 9:51 PM, shikhar <shik...@schmizz.net> wrote:

> Stephan explained in that thread that we're picking the min watermark when
> doing operations that join streams from multiple sources. If we have m:n
> partition-source assignment where m>n, the source is going to end up with
> the max watermark. Having m<=n ensures that the lowest watermark is used.
>
> Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition
> on a source should require opt-in, e.g. allowOversubscription()
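P.S. For reference, the "watermark union" idea mentioned above can be sketched in plain Java as follows. This is only an illustration under stated assumptions, not the Flink implementation and not what FLINK-3375 will necessarily look like: the class and method names are hypothetical, timestamps are assumed ascending within each partition, and the last timestamp of a partition is used directly as that partition's watermark for simplicity.

```java
import java.util.Arrays;

// Sketch only: plain Java, not Flink's API. All names are hypothetical.
final class PartitionWatermarkUnion {
    // One watermark per Kafka partition handled by this source task.
    private final long[] partitionWatermarks;

    PartitionWatermarkUnion(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("need at least one partition");
        }
        partitionWatermarks = new long[numPartitions];
        // Until a partition has delivered a record, it holds the union back.
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    /** Advances one partition's watermark and returns the source-level watermark. */
    long onRecord(int partition, long timestampMillis) {
        partitionWatermarks[partition] =
                Math.max(partitionWatermarks[partition], timestampMillis);
        return currentWatermark();
    }

    /** The union is the minimum over all partitions, as with multi-input operators. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : partitionWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarkUnion union = new PartitionWatermarkUnion(2);
        union.onRecord(0, 100);
        union.onRecord(1, 50);
        System.out.println(union.onRecord(1, 200)); // prints 100
    }
}
```

Taking the minimum means the slowest partition decides how far event time may advance, so a record from a lagging partition is never behind the emitted watermark.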