Hi,

Thanks Aljoscha for this FLIP.

1. I agree with Piotr and Becket that a non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may not be very memory friendly if every `advance` call returns a Future.

    public interface Listener {
        public void notify();
    }

    public interface SplitReader {
        /**
         * Returns false when no element is temporarily available.
         * When elements are available again, the SplitReader can call listener.notify().
         * In addition, the framework would check `advance` periodically.
         * Of course, advance can always return true and ignore the listener
         * argument for simplicity.
         */
        public boolean advance(Listener listener);
    }

2. The FLIP tells us very clearly how to create all Splits and how to create a SplitReader from a Split. But there is no strategy that lets the user choose how the splits are assigned to the tasks. I think we could add an enum to let the user choose.

    public enum SplitsAssignmentPolicy {
        Location, Workload, Random, Average
    }

3. If we merge `advance` and `getCurrent` into one method like `getNext`, then `getNext` would need to return an `ElementWithTimestamp`, because some sources want to add a timestamp to every element. IMO, this is not so memory friendly, so I prefer the current design, with `advance` and `getCurrent` kept separate.

Thanks

On Thu, Nov 1, 2018 at 6:08 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> possible improvements. I have one proposal. Instead of having a method:
>
>     boolean advance() throws IOException;
>
> I would replace it with
>
>     /*
>      * Return a future, which when completed means that source has more data
>      * and getNext() will not block.
>      * If you wish to use benefits of non blocking connectors, please
>      * implement this method appropriately.
>      */
>     default CompletableFuture<?> isBlocked() {
>         return CompletableFuture.completedFuture(null);
>     }
>
> And rename `getCurrent()` to `getNext()`.
>
> Couple of arguments:
> 1. I don’t understand the division of work between `advance()` and
> `getCurrent()`. What should be done in which, especially for connectors
> that handle records in batches (like Kafka) and when should you call
> `advance` and when `getCurrent()`.
> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> future to have asynchronous/non blocking connectors and more efficiently
> handle large number of blocked threads, without busy waiting. While at the
> same time it doesn’t add much complexity, since naive connector
> implementations can be always blocking.
> 3. This also would allow us to use a fixed size thread pool of task
> executors, instead of one thread per task.
>
> Piotrek
>
> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi All,
> >
> > In order to finally get the ball rolling on the new source interface
> > that we have discussed for so long I finally created a FLIP:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> > adding per-partition watermark support to the Kinesis source and because
> > this would enable generic implementation of event-time alignment for all
> > sources. Maybe we need another FLIP for the event-time alignment part,
> > especially the part about information sharing between operations (I'm not
> > calling it state sharing because state has a special meaning in Flink).
> >
> > Please discuss away!
> >
> > Aljoscha
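
For reference, the listener-based `advance(Listener)` idea from point 1 could look roughly like the sketch below. It is only an illustration under assumptions, not the FLIP's interface: `QueueSplitReader`, `offer`, and the callback name `onAvailable` are hypothetical. The callback is named `onAvailable()` instead of `notify()` because `Object.notify()` is final in Java, so an interface cannot declare a method with that signature.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

class ListenerAdvanceSketch {

    /** Callback the reader invokes once elements are available again (hypothetical name). */
    public interface Listener {
        void onAvailable();
    }

    /** Reader contract from the email: advance() never blocks. */
    public interface SplitReader<T> {
        /** Returns false if no element is currently available; the listener is then notified later. */
        boolean advance(Listener listener);

        T getCurrent();
    }

    /** Hypothetical in-memory reader, used only to illustrate the contract. */
    public static class QueueSplitReader<T> implements SplitReader<T> {
        private final Queue<T> buffer = new ArrayDeque<>();
        private Listener pending; // listener waiting for the next element, if any
        private T current;

        @Override
        public synchronized boolean advance(Listener listener) {
            T next = buffer.poll();
            if (next == null) {
                pending = listener; // remember whom to wake up
                return false;
            }
            current = next;
            return true;
        }

        @Override
        public synchronized T getCurrent() {
            return current;
        }

        /** Simulates data arriving from the external system. */
        public void offer(T element) {
            Listener toNotify;
            synchronized (this) {
                buffer.add(element);
                toNotify = pending;
                pending = null;
            }
            // Notify outside the lock so listener code never runs under the reader's monitor.
            if (toNotify != null) {
                toNotify.onAvailable();
            }
        }
    }

    public static void main(String[] args) {
        QueueSplitReader<String> reader = new QueueSplitReader<>();
        AtomicInteger wakeUps = new AtomicInteger();
        Listener listener = wakeUps::incrementAndGet;

        System.out.println(reader.advance(listener)); // false: nothing buffered yet
        reader.offer("a");                            // wakes the registered listener
        System.out.println(wakeUps.get());            // 1
        System.out.println(reader.advance(listener)); // true
        System.out.println(reader.getCurrent());      // a
    }
}
```

Compared with returning a `CompletableFuture` from every call, this variant allocates nothing per `advance` call; the cost is that the caller has to supply and manage the listener itself.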