Hey Becket,

Re 2.

With:

If the source is purely single-threaded and blocking, then it could be implemented in the following way:

    /*
     * Return a future which, when completed, means that the source has more data and getNext() will not block.
     * If you wish to use the benefits of non-blocking connectors, please implement this method appropriately.
     */
    CompletableFuture<?> isBlocked() {
        // This would be the default behaviour, so the user wouldn’t need to override this at all.
        return CompletableFuture.completedFuture(null);
    }

    T getNext() {
        // do some blocking reading operation
        return result;
    }

Implementing `isBlocked` doesn’t have to be mandatory. It’s more like an optional optimisation that some connectors might provide.

Providing a non-blocking `poll` method doesn’t solve the problem of actually limiting the number of active threads. One of the potential benefits of `CompletableFuture<?> isBlocked()` is that we could have a fixed-size pool of worker threads. A worker thread could pick a non-blocked task that’s waiting to be executed, and for this the `CompletableFuture<?>` would be needed to juggle between the blocked and active states. Another potential side benefit could be reporting in the UI/metrics which tasks are blocked (kind of like the current back pressure monitoring).

Maybe such an extension could use some PoC that would (or would not) show some benefits.

Piotrek

> On 1 Nov 2018, at 19:29, Becket Qin <becket....@gmail.com> wrote:
>
> Thanks for the FLIP, Aljoscha.
>
> The proposal makes sense to me. Separating the split discovery and
> consumption is very useful as it enables Flink to better manage the sources.
>
> Looking at the interface, I have a few questions:
> 1. *SplitEnumerator*.*discoverNewSplits()* seems to assume that the number
> of splits can only increase. In your example, the source was Kafka, so the
> assumption was true. But I am wondering, are there cases where the number of
> splits can decrease?
> 2. I agree with Piotr that we need to be careful about potentially blocking
> implementations.
> However, it is not clear to me how the completable
> future works if the underlying reader does not have its own thread (e.g. a
> Kafka consumer). In that case, the future will never be completed unless
> the caller thread touches the reader again. I am wondering if the following
> interfaces for the reader make sense:
>     boolean isDone(); // Whether the source has more records.
>     T poll(); // non-blocking read. We can add a timeout if needed.
>     T take(); // blocking read;
> This seems more intuitive to people who are familiar with the existing
> convention of poll() and take(). And with the non-blocking poll() we could
> have an nio Selector-like API when there are multiple splits.
>
> BTW, it would be really helpful if there is some Java doc describing the
> behavior of the interfaces in the FLIP.
>
> Thanks again for the great proposal.
>
> Jiangjie (Becket) Qin
>
> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>> possible improvements. I have one proposal. Instead of having a method:
>>
>>     boolean advance() throws IOException;
>>
>> I would replace it with
>>
>>     /*
>>      * Return a future, which when completed means that source has more data
>>      * and getNext() will not block.
>>      * If you wish to use benefits of non blocking connectors, please
>>      * implement this method appropriately.
>>      */
>>     default CompletableFuture<?> isBlocked() {
>>         return CompletableFuture.completedFuture(null);
>>     }
>>
>> And rename `getCurrent()` to `getNext()`.
>>
>> Couple of arguments:
>> 1. I don’t understand the division of work between `advance()` and
>> `getCurrent()`. What should be done in which, especially for connectors
>> that handle records in batches (like Kafka), and when should you call
>> `advance()` and when `getCurrent()`?
>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>> future to have asynchronous/non-blocking connectors and to handle a large
>> number of blocked threads more efficiently, without busy waiting. At the
>> same time it doesn’t add much complexity, since naive connector
>> implementations can always be blocking.
>> 3. This would also allow us to use a fixed-size thread pool of task
>> executors, instead of one thread per task.
>>
>> Piotrek
>>
>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Hi All,
>>>
>>> In order to finally get the ball rolling on the new source interface
>>> that we have discussed for so long I finally created a FLIP:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>
>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>> adding per-partition watermark support to the Kinesis source and because
>>> this would enable generic implementation of event-time alignment for all
>>> sources. Maybe we need another FLIP for the event-time alignment part,
>>> especially the part about information sharing between operations (I'm not
>>> calling it state sharing because state has a special meaning in Flink).
>>>
>>> Please discuss away!
>>>
>>> Aljoscha
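
PS. To make the fixed-size worker pool idea a bit more concrete, here is a rough sketch of how a pool could use the future returned by `isBlocked()` so that blocked readers occupy no thread at all. This is only an illustration under my own assumptions, not a proposal for the actual interfaces: the names `SplitReader`, `QueueReader`, `FixedPoolSketch`, `readWhenUnblocked` and `demo` are made up, and only `isBlocked()` and `getNext()` follow the signatures discussed in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical reader interface with the two methods discussed in the thread.
interface SplitReader<T> {
    // A completed future means getNext() will not block. Default: never blocked.
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    T getNext();
}

// Hypothetical non-blocking reader fed from the outside (stands in for a connector).
final class QueueReader<T> implements SplitReader<T> {
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void add(T item) {
        buffer.add(item);
        available.complete(null); // wakes up whoever waits on isBlocked()
    }

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        return available;
    }

    @Override
    public synchronized T getNext() {
        T item = buffer.poll();
        if (buffer.isEmpty()) {
            available = new CompletableFuture<>(); // blocked again until the next add()
        }
        return item;
    }
}

final class FixedPoolSketch {
    // Reads `remaining` items; a worker thread is used only once the reader is unblocked.
    static <T> void readWhenUnblocked(SplitReader<T> reader, ExecutorService workers,
                                      int remaining, Consumer<T> sink) {
        if (remaining == 0) {
            return;
        }
        reader.isBlocked().thenRunAsync(() -> {
            sink.accept(reader.getNext());
            readWhenUnblocked(reader, workers, remaining - 1, sink);
        }, workers);
    }

    // Serves `readerCount` readers with `workerCount` threads; returns the items read.
    static List<String> demo(int readerCount, int workerCount) {
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        Queue<String> received = new ConcurrentLinkedQueue<>();
        CountDownLatch done = new CountDownLatch(readerCount);
        List<QueueReader<String>> readers = new ArrayList<>();
        for (int i = 0; i < readerCount; i++) {
            QueueReader<String> reader = new QueueReader<>();
            readers.add(reader);
            readWhenUnblocked(reader, workers, 1, item -> {
                received.add(item);
                done.countDown();
            });
        }
        // At this point every reader is blocked and no worker thread is busy.
        for (int i = 0; i < readerCount; i++) {
            readers.get(i).add("record-" + i);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workers.shutdown();
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(demo(8, 2).size() + " records read using 2 worker threads");
    }
}
```

A purely blocking connector would simply not override `isBlocked()`, in which case it is always scheduled immediately and `getNext()` blocks a worker thread, which is the naive behaviour I described above.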