Yes, I think this is the way to go.

This would also go well with a redesign of the source interface that has been 
floated for a while now. I also created a prototype a while back: 
https://github.com/aljoscha/flink/tree/refactor-source-interface. Just as a 
refresher, the redesign aims at several things:

1. Make partitions/splits explicit in the interface. Currently, the fact that 
there are file splits, Kafka partitions, or Kinesis shards is hidden inside the 
source implementation, even though it would be beneficial for the system to 
know about them and to be able to track watermarks for them. Today, the Kafka 
Consumer has a custom implementation of per-partition watermark tracking that 
this redesign would obviate.

2. Separate split/partition/shard discovery from the reading part. This would 
allow rebalancing work across readers and, again, makes the nature of sources 
more explicit in the interfaces.

3. Move from a push model to a pull model. The problem with the current source 
interface is that the source controls the read loop and has to acquire the 
checkpoint lock for emitting elements and updating state. If we move the loop 
out of the source, Flink has more room to be clever about how it reads from 
sources; see the sketch right after this list.
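
To illustrate the difference, here is roughly what the current push-style 
loop looks like. The SourceFunction API and the checkpoint lock are the real, 
existing API; fetchNext() is a made-up placeholder:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Push model: the source owns the read loop and must hold the checkpoint
    // lock while emitting elements or updating offsets/state.
    public class PushStyleSource implements SourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                // Placeholder for reading from the external system.
                String record = fetchNext();
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(record);
                    // offset/state updates must happen under the same lock
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String fetchNext() {
            return "record";
        }
    }

A pull model inverts this: the runtime drives the loop and asks the source 
for records one at a time (method names made up), so the source never has to 
touch the checkpoint lock:

    boolean advance();  // try to move to the next record
    T current();        // the record at the current position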

The prototype linked above defines three new interfaces: Source, 
SplitEnumerator, and SplitReader, along with a naive example and a working 
Kafka Consumer (with checkpointing, actually).
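
Just to give an idea of the shape, here is a very rough sketch of what the 
three interfaces could look like. The interface names match the prototype, 
but all method names and signatures below are made up for illustration; the 
real code is in the branch linked above:

    import java.io.IOException;
    import java.io.Serializable;
    import java.util.List;

    /** One unit of readable data, e.g. a file split, Kafka partition, or shard. */
    interface SourceSplit extends Serializable {
        String splitId();
    }

    /** Discovers splits, separately from the reading part. */
    interface SplitEnumerator<SplitT extends SourceSplit> {
        List<SplitT> discoverNewSplits() throws IOException;
    }

    /** Reads from assigned splits; the runtime drives the loop (pull model). */
    interface SplitReader<T, SplitT extends SourceSplit> {
        void addSplits(List<SplitT> splits);
        boolean advance() throws IOException; // try to advance to the next record
        T currentRecord();
        String currentSplitId();              // lets the system track per-split watermarks
    }

    /** Ties discovery and reading together for one source. */
    interface Source<T, SplitT extends SourceSplit> extends Serializable {
        SplitEnumerator<SplitT> createEnumerator();
        SplitReader<T, SplitT> createReader();
    }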

If we had this source interface, along with a service for propagating 
watermark information, the code that reads from the splits could de-prioritise 
certain splits and we would get the event-time alignment behaviour for all 
sources that are implemented using the new interface, without requiring 
special code in each source implementation.
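
To make that concrete, here is a small hypothetical sketch of how the reader 
side could pick the next split to read from. None of this exists yet; the 
threshold and the globally propagated minimum watermark are assumptions about 
the watermark service mentioned above:

    import java.util.List;

    // Hypothetical event-time alignment at the reader: splits whose local
    // watermark has run too far ahead of the global minimum are skipped
    // until the slower splits catch up.
    class AlignedSplitChooser {

        /** How far (in ms of event time) a split may run ahead before we pause it. */
        private final long maxAheadMillis;

        AlignedSplitChooser(long maxAheadMillis) {
            this.maxAheadMillis = maxAheadMillis;
        }

        /** Returns the id of the next split to read, or null if all are too far ahead. */
        String chooseNextSplit(List<SplitProgress> splits, long globalMinWatermark) {
            String best = null;
            long bestWatermark = Long.MAX_VALUE;
            for (SplitProgress split : splits) {
                // De-prioritise splits that are too far ahead of the
                // slowest split across the whole job.
                if (split.watermark - globalMinWatermark > maxAheadMillis) {
                    continue;
                }
                // Among the eligible splits, prefer the one furthest behind.
                if (split.watermark < bestWatermark) {
                    bestWatermark = split.watermark;
                    best = split.splitId;
                }
            }
            return best;
        }

        static final class SplitProgress {
            final String splitId;
            final long watermark;

            SplitProgress(String splitId, long watermark) {
                this.splitId = splitId;
                this.watermark = watermark;
            }
        }
    }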

@Elias Do you know if Kafka Consumers do this alignment across multiple 
consumers, or only within one Consumer across the partitions that it reads 
from?

> On 9. Oct 2018, at 00:55, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> 
> Kafka Streams handles this problem, time alignment, by processing records
> from the partitions with the lowest timestamp on a best-effort basis.
> See KIP-353 for the details.  The same could be done within the Kafka
> source and multiple input stream operators.  I opened FLINK-4558
> <https://issues.apache.org/jira/browse/FLINK-4558> a while ago regarding
> this topic.
> 
> On Mon, Oct 8, 2018 at 3:41 PM Jamie Grier <jgr...@lyft.com.invalid> wrote:
> 
>> I'd be very curious to hear others' thoughts on this.  I would expect many
>> people to have run into similar issues.  I also wonder if anybody has
>> already been working on similar issues.  It seems there is room for some
>> core Flink changes to address this as well and I'm guessing people have
>> already thought about it.
>> 
