Thanks for getting the ball rolling on this!

Can the number of splits decrease? Yes, splits can be closed and go away. An example would be a shard merge in Kinesis (two existing shards are closed and replaced with a single new shard).
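To make the split lifecycle concrete, here is a minimal Java sketch. It assumes a hypothetical `SplitTracker` class, which is not part of Flink, FLIP-27, or the Kinesis API, and made-up split IDs such as `shard-0`. A shard merge closes the two parent splits and registers one child, so the number of active splits drops from two to one.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical bookkeeping of a source's active splits; not a Flink or
 * Kinesis API. It only illustrates that the set of splits can shrink.
 */
public class SplitTracker {
    private final Set<String> active = new LinkedHashSet<>();

    /** A new split (for example a newly created shard) becomes readable. */
    public void addSplit(String splitId) {
        active.add(splitId);
    }

    /**
     * Kinesis-style shard merge: the parent splits are closed and go away,
     * and a single child split takes their place.
     */
    public void merge(List<String> parents, String child) {
        // Validate first so a bad call leaves the tracker unchanged.
        if (!active.containsAll(parents)) {
            throw new IllegalStateException("unknown parent split in " + parents);
        }
        active.removeAll(parents);
        active.add(child);
    }

    public int activeCount() {
        return active.size();
    }

    /** Read-only view of the currently active split IDs. */
    public Set<String> activeSplits() {
        return Collections.unmodifiableSet(active);
    }

    public static void main(String[] args) {
        SplitTracker tracker = new SplitTracker();
        tracker.addSplit("shard-0");
        tracker.addSplit("shard-1");
        System.out.println(tracker.activeCount()); // 2
        tracker.merge(List.of("shard-0", "shard-1"), "shard-2");
        System.out.println(tracker.activeCount()); // 1
        System.out.println(tracker.activeSplits()); // [shard-2]
    }
}
```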
Regarding advance/poll/take: IMO the least restrictive approach would be the thread-less IO model (pull-based, non-blocking: the caller retrieves new records when they are available). The current Kinesis API requires the use of threads, but that can be internal to the split reader and does not need to be a source API concern. In fact, that's what we are working on right now as an improvement to the existing consumer: each shard consumer thread will push to a queue, and the consumer main thread will poll the queue(s). It is essentially a mapping from threaded IO to non-blocking IO.

The proposed SplitReader interface would fit the thread-less IO model. Similar to an iterator, we find out whether there is a new element (hasNext) and, if so, move to it (next()). Separate calls deliver the meta information (timestamp, watermark). Perhaps the advance call could offer a timeout option, so that the caller does not end up in a busy wait. On the other hand, a caller processing multiple splits may want to cycle through them quickly, to process elements of other splits as soon as they become available. The nice thing is that this "split merge" logic can now live in Flink and be optimized and shared between different sources.

Thanks,
Thomas

On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> Thanks Aljoscha for this FLIP.
>
> 1. I agree with Piotr and Becket that the non-blocking source is very
> important. But in addition to `Future/poll`, there may be another way to
> achieve this. I think it may not be very memory-friendly if every advance
> call returns a Future.
>
> public interface Listener {
>     public void notify();
> }
>
> public interface SplitReader {
>     /**
>      * When there is temporarily no element, this will return false.
>      * When elements are available again, the SplitReader can call listener.notify().
>      * In addition, the framework would check `advance` periodically.
>      * Of course, advance can always return true and ignore the listener
>      * argument for simplicity.
>      */
>     public boolean advance(Listener listener);
> }
>
> 2. The FLIP tells us very clearly how to create all Splits and how
> to create a SplitReader from a Split. But there is no strategy for the user
> to choose how to assign the splits to the tasks. I think we could add an
> enum to let the user choose.
>
> public enum SplitsAssignmentPolicy {
>     Location,
>     Workload,
>     Random,
>     Average
> }
>
> 3. If we merge `advance` and `getCurrent` into one method like `getNext`,
> then `getNext` would need to return an `ElementWithTimestamp` because some
> sources want to add a timestamp to every element. IMO, this is not so
> memory-friendly, so I prefer this design.
>
> Thanks
>
> Piotr Nowojski <pi...@data-artisans.com> wrote on Thu, Nov 1, 2018 at 6:08 PM:
>
>> Hi,
>>
>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>> possible improvements. I have one proposal. Instead of having a method:
>>
>> boolean advance() throws IOException;
>>
>> I would replace it with
>>
>> /*
>>  * Return a future, which when completed means that the source has more data
>>  * and getNext() will not block.
>>  * If you wish to use the benefits of non-blocking connectors, please
>>  * implement this method appropriately.
>>  */
>> default CompletableFuture<?> isBlocked() {
>>     return CompletableFuture.completedFuture(null);
>> }
>>
>> and rename `getCurrent()` to `getNext()`.
>>
>> A couple of arguments:
>> 1. I don’t understand the division of work between `advance()` and
>> `getCurrent()`. What should be done in which, especially for connectors
>> that handle records in batches (like Kafka), and when should you call
>> `advance` and when `getCurrent()`?
>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>> future to have asynchronous/non-blocking connectors and to more efficiently
>> handle a large number of blocked threads, without busy waiting. At the
>> same time it doesn’t add much complexity, since naive connector
>> implementations can always be blocking.
>> 3. This would also allow us to use a fixed-size thread pool of task
>> executors, instead of one thread per task.
>>
>> Piotrek
>>
>> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
>> >
>> > Hi All,
>> >
>> > In order to finally get the ball rolling on the new source interface
>> > that we have discussed for so long, I have created a FLIP:
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>> >
>> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>> > adding per-partition watermark support to the Kinesis source and because
>> > this would enable a generic implementation of event-time alignment for all
>> > sources. Maybe we need another FLIP for the event-time alignment part,
>> > especially the part about information sharing between operations (I'm not
>> > calling it state sharing because state has a special meaning in Flink).
>> >
>> > Please discuss away!
>> >
>> > Aljoscha
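The threaded-to-non-blocking mapping Thomas describes (per-shard threads push to a queue, one consumer thread polls it) can be combined with Piotr's `isBlocked()` future, as in the following minimal Java sketch. It is an illustration under assumptions, not the FLIP-27 API: `QueueBackedReader` and `push` are hypothetical names, the method names `isBlocked()` and `getNext()` are borrowed from Piotr's proposal, and returning `null` from `getNext()` when nothing is buffered is just one possible convention.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical reader that hands records from per-shard fetcher threads to a
 * single consumer thread without busy waiting. Not a Flink or Kinesis API.
 */
public class QueueBackedReader<T> {
    // Shared, already-completed future returned whenever data is buffered.
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private final Queue<T> queue = new ArrayDeque<>();
    // Pending future handed out while the queue is empty; null otherwise.
    private CompletableFuture<Void> pending;

    /** Called by a fetcher thread (e.g. one per shard). */
    public void push(T record) {
        CompletableFuture<Void> toComplete;
        synchronized (lock) {
            queue.add(record);
            toComplete = pending;
            pending = null;
        }
        // Complete outside the lock so callbacks never run while it is held.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }

    /** Done if a record is buffered; otherwise completes on the next push(). */
    public CompletableFuture<?> isBlocked() {
        synchronized (lock) {
            if (!queue.isEmpty()) {
                return AVAILABLE;
            }
            if (pending == null) {
                pending = new CompletableFuture<>();
            }
            return pending;
        }
    }

    /** Non-blocking: the next buffered record, or null if there is none. */
    public T getNext() {
        synchronized (lock) {
            return queue.poll();
        }
    }

    public static void main(String[] args) {
        QueueBackedReader<String> reader = new QueueBackedReader<>();
        int shards = 2;
        int perShard = 3;
        for (int s = 0; s < shards; s++) {
            final int shard = s;
            new Thread(() -> {
                for (int i = 0; i < perShard; i++) {
                    reader.push("shard-" + shard + "/record-" + i);
                }
            }).start();
        }
        int received = 0;
        while (received < shards * perShard) {
            // Wait for data without spinning; a task executor would register
            // a callback (e.g. thenRun) here instead of joining.
            reader.isBlocked().join();
            String record;
            while ((record = reader.getNext()) != null) {
                System.out.println(record);
                received++;
            }
        }
        System.out.println("received " + received);
    }
}
```

The demo joins on the future only to stay short; with a fixed-size pool of task executors, as Piotr suggests, a caller would instead register a callback and move on to other work. The record order across shards varies between runs, but the last line is always `received 6`. Because one completed future is shared and a new pending future is created only when the queue is found empty, the reader allocates at most one future per empty period rather than one per call, which bears on Guowei's memory concern.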