Okay, so I think there is a lot of agreement here about (a) this being a real issue for people and (b) an ideal long-term approach to solving it.
As Aljoscha and Elias said, a full solution would be to also redesign the source interface so that individual partitions are exposed in the API rather than hidden inside sources as they are now; then we could be much smarter about the way we read from the individual partitions. To solve the full problem, we would also have to modify the stream task code so that it reads in a time-aligned way throughout the data flow, or else use some shared state between sources to keep them time-aligned across sub-tasks, just at the source.

With regard to this question of sharing state between source sub-tasks versus modifying Flink to do time-aligned reads throughout the system: does anybody have a strong opinion on this? We're basically looking for a way forward. Our initial approach, though ugly because it requires modifying all of the sources we use, was going to be to share state between source sub-tasks in order to keep them time-aligned, with no further modifications required to Flink's core. However, if it seems reasonable to do and there is consensus on the best way forward, maybe we should be introducing time alignment properly instead of hacking the sources.

On Tue, Oct 9, 2018 at 12:01 PM Elias Levy <fearsome.lucid...@gmail.com> wrote:

> On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > @Elias Do you know if Kafka Consumers do this alignment across multiple
> > consumers or only within one Consumer across the partitions that it reads
> > from?
> >
>
> The behavior is part of Kafka Streams
> <https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65>,
> not the Kafka consumer. The alignment does not occur across Kafka
> consumers, but that is because Kafka Streams, unlike Flink, uses a single
> consumer to fetch records from multiple sources / topics.
> The alignment occurs within the stream task. Stream tasks keep queues per
> topic-partition (which may be from different topics), and select the next
> record to process by selecting the queue with the lowest timestamp.
>
> The equivalent in Flink would be for the Kafka connector source to select
> the message among partitions with the lowest timestamp to emit next, and
> for multiple input stream operators to select the message among inputs with
> the lowest timestamp to process.
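The per-partition alignment Elias describes (one queue per topic-partition, always take the next record from the queue whose head has the lowest timestamp) can be sketched in Java roughly as follows. This is a minimal illustration under stated assumptions, not Kafka Streams' or Flink's actual code: the class `TimeAlignedPartitionReader` and the `TimestampedRecord` type are hypothetical, records within a partition are assumed to arrive in timestamp order, and empty queues are simply skipped, whereas a real implementation would have to decide whether to wait for data on an idle partition before emitting from the others.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: buffers records per partition and always emits the
// buffered record with the lowest timestamp across all partitions.
public class TimeAlignedPartitionReader {

    // Hypothetical record type: source partition index, event timestamp, payload.
    public record TimestampedRecord(int partition, long timestamp, String value) {}

    // One FIFO queue per partition; records within a partition are assumed
    // to arrive in timestamp order.
    private final List<Deque<TimestampedRecord>> queues = new ArrayList<>();

    // Registers a new partition and returns its index.
    public int addPartition() {
        queues.add(new ArrayDeque<>());
        return queues.size() - 1;
    }

    // Buffers a record fetched from its partition.
    public void enqueue(TimestampedRecord record) {
        queues.get(record.partition()).addLast(record);
    }

    // Returns the head record with the lowest timestamp among all non-empty
    // queues (ties go to the lower partition index), or null if all are empty.
    // Simplification: empty partitions are skipped rather than waited for.
    public TimestampedRecord next() {
        Deque<TimestampedRecord> best = null;
        long bestTimestamp = Long.MAX_VALUE;
        for (Deque<TimestampedRecord> queue : queues) {
            TimestampedRecord head = queue.peekFirst();
            if (head != null && (best == null || head.timestamp() < bestTimestamp)) {
                best = queue;
                bestTimestamp = head.timestamp();
            }
        }
        return best == null ? null : best.pollFirst();
    }
}
```

A linear scan over the queue heads is enough for a handful of partitions; with many partitions, a priority queue keyed on each queue's head timestamp would avoid rescanning on every call. The same selection rule is what a multiple-input operator would apply when choosing among its inputs.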