Thanks for the explanation, Piotr. I agree that the CompletableFuture solution would work for single-threaded readers. From an API perspective, however, returning a CompletableFuture means the reader must have an internal thread to complete that future. I was actually thinking of sources that are "thread-less", such as the Kafka consumer.
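To make the concern concrete, here is a minimal Java sketch of a hypothetical thread-less reader. `ThreadlessReader`, `deliver()` and the in-memory queues are illustrative assumptions, not Kafka or Flink APIs. Its `isBlocked()` future can only be completed from inside its own `poll()`, so data that has already arrived stays invisible to the future until the caller polls again:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical thread-less reader: it owns no threads and only does I/O work
// inside poll(), similar in spirit to how a Kafka consumer makes progress.
class ThreadlessReader<T> {
    // Stands in for data that has arrived on the socket but is not processed yet.
    private final Queue<T> network = new ArrayDeque<>();
    // Records already fetched by poll() and ready to hand out.
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Simulates bytes landing on the wire; nothing reacts to them yet.
    void deliver(T record) {
        network.add(record);
    }

    // Completed future means a record is buffered. Only poll() ever completes it.
    CompletableFuture<Void> isBlocked() {
        if (buffer.isEmpty()) {
            return available;
        }
        return CompletableFuture.completedFuture(null);
    }

    // Non-blocking: processes arrived data, completes the pending future and
    // returns the next record, or null if nothing is available.
    T poll() {
        while (!network.isEmpty()) {
            buffer.add(network.poll());
        }
        if (!buffer.isEmpty()) {
            available.complete(null); // no-op if it is already completed
        }
        T next = buffer.poll();
        if (buffer.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // re-arm for the next wait
        }
        return next;
    }
}
```

In this sketch, whoever holds the future must call `poll()` again before the future can complete, even though the data is already sitting on the "wire".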
The Kafka consumer itself does not have an internal thread, except for the heartbeat thread, which does nothing but send heartbeats. So a Kafka consumer relies on the user thread calling poll() to make progress. Inside poll(), things are asynchronous (with very few exceptions, such as rebalances, which cannot be interrupted): receiving responses, sending FetchRequests and Heartbeats, etc. So, technically speaking, the consumer itself is "thread-less". In that case, if the consumer provided an isBlocked() method returning a CompletableFuture, that CompletableFuture would never be completed unless the user called poll() again, because the consumer has no thread of its own to complete it. Instead, it relies on the user thread, which is holding the future, to complete that same future.

While this looks counterintuitive at first glance, such thread-less readers can be more efficient in some cases. For example, if there are hundreds of readers in a single task, the thread-less readers can all be managed by a single thread, which just needs to call poll() on each reader. Single-threaded readers, on the other hand, need one thread per reader, i.e. hundreds of threads in total. From this perspective, thread-less readers do pretty well at limiting the number of threads. Users can also choose to manage these thread-less readers with a thread pool if they wish, and it is trivial to wrap such a reader to create a single-threaded reader.

BTW, regarding the isBlocked() method, I have a few more questions.

2.1 Is an isReady() method with a boolean return value equivalent? Personally, I find it a little confusing what is supposed to be returned when the future is completed.

2.2 If implementing isBlocked() is optional, how do callers know whether the method is properly implemented or not? Does "not implemented" mean it always returns a completed future?
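A rough sketch of the single-thread setup described above, loosely following the isDone()/poll()/take() idea from my earlier mail. `PollingReader`, `ListReader`, `SingleThreadDriver` and `BlockingReader` are hypothetical names for illustration only; the assumed contract is that `poll()` never blocks and returns null when nothing is ready, and `isDone()` reports exhaustion. One driver thread round-robins over any number of readers, and `BlockingReader` shows how little it takes to turn a thread-less reader into a blocking, single-threaded one:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical non-blocking reader contract (an assumption, not a Flink API).
interface PollingReader<T> {
    T poll();          // never blocks; returns null if no record is ready
    boolean isDone();  // true once the reader will produce no more records
}

// Toy reader over a fixed list, only used to exercise the sketch.
final class ListReader<T> implements PollingReader<T> {
    private final Iterator<T> it;

    ListReader(List<T> items) {
        this.it = items.iterator();
    }

    @Override
    public T poll() {
        return it.hasNext() ? it.next() : null;
    }

    @Override
    public boolean isDone() {
        return !it.hasNext();
    }
}

// A single thread drives any number of thread-less readers by polling each in turn.
final class SingleThreadDriver<T> {
    private final List<PollingReader<T>> readers;

    SingleThreadDriver(List<PollingReader<T>> readers) {
        this.readers = new ArrayList<>(readers);
    }

    // Round-robins until every reader is done; returns the number of records emitted.
    int runToCompletion(Consumer<T> sink) {
        int emitted = 0;
        while (!readers.isEmpty()) {
            boolean progressed = false;
            // Iterate backwards so finished readers can be removed in place.
            for (int i = readers.size() - 1; i >= 0; i--) {
                PollingReader<T> reader = readers.get(i);
                T record = reader.poll();
                if (record != null) {
                    sink.accept(record);
                    emitted++;
                    progressed = true;
                } else if (reader.isDone()) {
                    readers.remove(i);
                }
            }
            if (!progressed) {
                Thread.yield(); // placeholder; a real driver would back off or use a selector
            }
        }
        return emitted;
    }
}

// Wrapping a thread-less reader into a blocking one: take() keeps polling on
// the caller's thread until a record shows up.
final class BlockingReader<T> {
    private final PollingReader<T> inner;

    BlockingReader(PollingReader<T> inner) {
        this.inner = inner;
    }

    // Returns the next record, or null once the inner reader is exhausted.
    T take() {
        while (true) {
            T record = inner.poll();
            if (record != null) {
                return record;
            }
            if (inner.isDone()) {
                return null;
            }
            Thread.yield(); // placeholder back-off
        }
    }
}
```

Handing a pool of workers disjoint subsets of the readers would give the thread-pool variant mentioned above; the busy-polling back-off is the part a real implementation would replace with something like an NIO Selector.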
Thanks,

Jiangjie (Becket) Qin

On Sat, Nov 3, 2018 at 4:30 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hey Becket,
>
> Re 2.
>
> With:
>
> If source is purely single threaded and blocking, then it could be
> implemented in the following way:
>
> /*
>  * Return a future, which when completed means that source has more data
>  * and getNext() will not block.
>  * If you wish to use benefits of non blocking connectors, please implement
>  * this method appropriately.
>  */
> CompletableFuture<?> isBlocked() {
>     // this would be the default behaviour, so user wouldn’t need to override this at all
>     return CompletableFuture.completedFuture(null);
> }
>
> T getNext() {
>     // do some blocking reading operation
>     return result;
> }
>
> Implementing `isBlocked` doesn’t have to be mandatory. It’s more like an
> optional optimisation that some connectors might provide.
>
> Providing a non-blocking `poll` method doesn’t solve the problem of actually
> limiting the number of active threads. One of the potential benefits of
> `CompletableFuture<?> isBlocked()` is that we could have a fixed size pool
> of worker threads. A worker thread could pick a non-blocked task that’s
> waiting to be executed, and for this `CompletableFuture<?>` would be needed
> to juggle between the blocked/active states. Another potential side benefit
> could be reporting in UI/metrics which tasks are blocked (kind of like the
> current back pressure monitoring).
>
> Maybe such an extension could make use of some PoC that would (or would not)
> show some benefits.
>
> Piotrek
>
> > On 1 Nov 2018, at 19:29, Becket Qin <becket....@gmail.com> wrote:
> >
> > Thanks for the FLIP, Aljoscha.
> >
> > The proposal makes sense to me. Separating the split discovery and
> > consumption is very useful as it enables Flink to better manage the
> > sources.
> >
> > Looking at the interface, I have a few questions:
> > 1. *SplitEnumerator*.*discoverNewSplits()* seems to assume that the number
> > of splits can only increase. In your example, the source was Kafka, so the
> > assumption was true. But I am wondering: are there cases where the number
> > of splits can decrease?
> > 2. I agree with Piotr that we need to be careful about potentially blocking
> > implementations. However, it is not clear to me how the completable
> > future would work if the underlying reader does not have its own thread
> > (e.g. a Kafka consumer). In that case, the future will never be completed
> > unless the caller thread touches the reader again. I am wondering if the
> > following interfaces for the reader make sense:
> > boolean isDone(); // Whether the source has no more records.
> > T poll(); // non-blocking read. We can add a timeout if needed.
> > T take(); // blocking read;
> > This seems more intuitive to people who are familiar with the existing
> > convention of poll() and take(). And with the non-blocking poll() we could
> > have an NIO Selector-like API when there are multiple splits.
> >
> > BTW, it would be really helpful if there were some Javadoc describing the
> > behavior of the interfaces in the FLIP.
> >
> > Thanks again for the great proposal.
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <pi...@data-artisans.com>
> > wrote:
> >
> >> Hi,
> >>
> >> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> >> possible improvements. I have one proposal. Instead of having a method:
> >>
> >> boolean advance() throws IOException;
> >>
> >> I would replace it with
> >>
> >> /*
> >>  * Return a future, which when completed means that source has more data
> >>  * and getNext() will not block.
> >>  * If you wish to use benefits of non blocking connectors, please
> >>  * implement this method appropriately.
> >>  */
> >> default CompletableFuture<?> isBlocked() {
> >>     return CompletableFuture.completedFuture(null);
> >> }
> >>
> >> And rename `getCurrent()` to `getNext()`.
> >>
> >> A couple of arguments:
> >> 1. I don’t understand the division of work between `advance()` and
> >> `getCurrent()`. What should be done in which, especially for connectors
> >> that handle records in batches (like Kafka), and when should you call
> >> `advance` and when `getCurrent()`?
> >> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> >> future to have asynchronous/non-blocking connectors and to more
> >> efficiently handle a large number of blocked threads, without busy
> >> waiting. At the same time it doesn’t add much complexity, since naive
> >> connector implementations can always be blocking.
> >> 3. This would also allow us to use a fixed size thread pool of task
> >> executors, instead of one thread per task.
> >>
> >> Piotrek
> >>
> >>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> In order to finally get the ball rolling on the new source interface
> >>> that we have discussed for so long, I created a FLIP:
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>
> >>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> >>> adding per-partition watermark support to the Kinesis source and because
> >>> this would enable generic implementation of event-time alignment for all
> >>> sources. Maybe we need another FLIP for the event-time alignment part,
> >>> especially the part about information sharing between operations (I'm not
> >>> calling it state sharing because state has a special meaning in Flink).
> >>>
> >>> Please discuss away!
> >>>
> >>> Aljoscha
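Piotr's argument in the thread, that `CompletableFuture<?> isBlocked()` would let a fixed-size pool of worker threads juggle blocked and active tasks without busy waiting, could be prototyped roughly as follows. This is only a sketch: `AsyncReader`, `PooledRunner` and the convention that `getNext()` returns null at end of input are assumptions for illustration, not the FLIP-27 interfaces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical reader contract modelled on the isBlocked()/getNext() proposal.
// Assumption for this sketch: getNext() returns null once the input is exhausted.
interface AsyncReader<T> {
    // A completed future means getNext() will not block. The default keeps
    // naive, always-blocking readers working without overriding anything.
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    T getNext();
}

// Runs many readers on a fixed number of worker threads. A reader is handed
// to a worker only after its isBlocked() future completes, so a waiting
// reader does not occupy a thread.
final class PooledRunner {
    private final ExecutorService pool;

    PooledRunner(int threads) {
        // Daemon threads so a forgotten shutdown() does not keep the JVM alive.
        this.pool = Executors.newFixedThreadPool(threads, runnable -> {
            Thread t = new Thread(runnable);
            t.setDaemon(true);
            return t;
        });
    }

    // Returns a future that completes when the reader is exhausted (or fails).
    <T> CompletableFuture<Void> drive(AsyncReader<T> reader, Consumer<T> sink) {
        CompletableFuture<Void> finished = new CompletableFuture<>();
        step(reader, sink, finished);
        return finished;
    }

    private <T> void step(AsyncReader<T> reader, Consumer<T> sink,
                          CompletableFuture<Void> finished) {
        reader.isBlocked()
                .thenRunAsync(() -> {
                    T record = reader.getNext();
                    if (record == null) {
                        finished.complete(null);
                    } else {
                        sink.accept(record);
                        step(reader, sink, finished); // wait on the next isBlocked() future
                    }
                }, pool)
                .exceptionally(error -> {
                    finished.completeExceptionally(error);
                    return null;
                });
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Because each step is re-scheduled only from the reader's `isBlocked()` future, a waiting reader holds no worker thread. With a thread-less reader, though, something still has to call into the reader for that future to complete, which is the concern raised at the top of this mail.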