Hey Becket,

Re 2.

With:

If the source is purely single-threaded and blocking, then it could be
implemented in the following way:

/*
 * Return a future, which when completed means that source has more data and
 * getNext() will not block.
 * If you wish to use benefits of non blocking connectors, please implement this
 * method appropriately.
 */
CompletableFuture<?> isBlocked() {
        // This would be the default behaviour, so the user wouldn’t need to
        // override this at all.
        return CompletableFuture.completedFuture(null);
}

T getNext() {
        // do some blocking reading operation
        return result;
}

Implementing `isBlocked` doesn’t have to be mandatory. It’s more like an 
optional optimisation that some connectors might provide.
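
For illustration only, a connector that does want to provide this optimisation
might look roughly like the sketch below. The names `SourceReader`,
`QueueBackedReader` and `offer` are made up for this example and are not part
of the FLIP; the sketch assumes the connector has its own I/O thread that hands
records over via `offer`, and that the caller only calls `getNext()` after the
future returned by `isBlocked()` has completed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical interface mirroring the two methods discussed in this thread.
interface SourceReader<T> {
    // Default: never blocked, so simple blocking sources need not override it.
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

    T getNext();
}

// Hypothetical non-blocking reader fed by the connector's own I/O thread.
class QueueBackedReader<T> implements SourceReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Called by the I/O thread (not shown) whenever a record arrives.
    void offer(T record) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            buffer.add(record);
            toComplete = available;
        }
        // Complete outside the lock so callbacks never run while holding it.
        toComplete.complete(null);
    }

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        return available;
    }

    // Must only be called after isBlocked() has completed; throws
    // NoSuchElementException if the buffer is empty.
    @Override
    public synchronized T getNext() {
        T record = buffer.remove();
        if (buffer.isEmpty()) {
            // Nothing left: hand out a fresh, not yet completed future.
            available = new CompletableFuture<>();
        }
        return record;
    }
}
```

A purely blocking source would implement only `getNext()` and inherit the
already completed future from the default method.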

Providing a non-blocking `poll` method doesn’t solve the problem of actually
limiting the number of active threads. One of the potential benefits of
`CompletableFuture<?> isBlocked()` is that we could have a fixed-size pool of
worker threads. A worker thread could pick a non-blocked task that’s waiting to
be executed, and to do this the `CompletableFuture<?>` would be needed to juggle
between the blocked/active states. Another potential side benefit could be
reporting in UI/metrics which tasks are blocked (kind of like the current back
pressure monitoring).
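
To make the fixed-size pool idea a bit more concrete, here is a minimal sketch
of how a task could be parked while its source is blocked and only occupy a
worker thread while it has data. `PooledTask` and its constructor parameters
are hypothetical names for this example, not an existing Flink API; the sketch
assumes `getNext()` does not block once the future has completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: drives one source on a shared, fixed-size pool.
class PooledTask<T> {
    private final Supplier<CompletableFuture<?>> isBlocked;
    private final Supplier<T> getNext;
    private final Consumer<T> output;
    private final ExecutorService pool;
    private volatile boolean cancelled;

    PooledTask(Supplier<CompletableFuture<?>> isBlocked,
               Supplier<T> getNext,
               Consumer<T> output,
               ExecutorService pool) {
        this.isBlocked = isBlocked;
        this.getNext = getNext;
        this.output = output;
        this.pool = pool;
    }

    // Registers a callback instead of parking a thread: no worker is used
    // until the source reports that it has data.
    void schedule() {
        if (cancelled) {
            return;
        }
        isBlocked.get().thenRunAsync(this::processOne, pool);
    }

    void cancel() {
        cancelled = true;
    }

    // Runs on a pool thread: emit one record, then go back to waiting.
    private void processOne() {
        if (cancelled) {
            return;
        }
        output.accept(getNext.get());
        schedule();
    }
}
```

With something like `Executors.newFixedThreadPool(n)` shared by all tasks, the
number of active threads stays at `n` no matter how many sources are blocked.
A source that keeps the default, always completed `isBlocked()` and blocks
inside `getNext()` would still work here, but it would hold a worker thread
while it blocks.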

Maybe such an extension could use a PoC that would show whether or not it
brings any benefits.

Piotrek 

> On 1 Nov 2018, at 19:29, Becket Qin <becket....@gmail.com> wrote:
> 
> Thanks for the FLIP, Aljoscha.
> 
> The proposal makes sense to me. Separating the split discovery and
> consumption is very useful as it enables Flink to better manage the sources.
> 
> Looking at the interface, I have a few questions:
> 1. *SplitEnumerator*.*discoverNewSplits()* seems to assume that the number
> of splits can only increase. In your example, the source was Kafka, so the
> assumption was true. But I am wondering whether there are cases where the
> number of splits can decrease?
> 2. I agree with Piotr that we need to be careful about potentially blocking
> implementations. However, it is not clear to me how the completable
> future works if the underlying reader does not have its own thread (e.g. a
> Kafka consumer). In that case, the future will never be completed unless
> the caller thread touches the reader again. I am wondering if the following
> interfaces for the reader make sense:
>    boolean isDone(); // Whether the source has more records.
>    T poll(); // non-blocking read. We can add a timeout if needed.
>    T take(); // blocking read;
> This seems more intuitive to people who are familiar with existing
> convention of poll() and take(). And with the non-blocking poll() we could
> have an nio Selector-like API when there are multiple splits.
> 
> BTW, it would be really helpful if there were some Javadoc describing the
> behavior of the interfaces in the FLIP.
> 
> Thanks again for the great proposal.
> 
> Jiangjie (Becket) Qin
> 
> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> 
>> Hi,
>> 
>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>> possible improvements. I have one proposal. Instead of having a method:
>> 
>> boolean advance() throws IOException;
>> 
>> I would replace it with
>> 
>> /*
>> * Return a future, which when completed means that source has more data
>> and getNext() will not block.
>> * If you wish to use benefits of non blocking connectors, please
>> implement this method appropriately.
>> */
>> default CompletableFuture<?> isBlocked() {
>>        return CompletableFuture.completedFuture(null);
>> }
>> 
>> And rename `getCurrent()` to `getNext()`.
>> 
>> Couple of arguments:
>> 1. I don’t understand the division of work between `advance()` and
>> `getCurrent()`. What should be done in which, especially for connectors
>> that handle records in batches (like Kafka) and when should you call
>> `advance` and when `getCurrent()`.
>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>> future to have asynchronous/non blocking connectors and more efficiently
>> handle large number of blocked threads, without busy waiting. While at the
>> same time it doesn’t add much complexity, since naive connector
>> implementations can be always blocking.
>> 3. This also would allow us to use a fixed size thread pool of task
>> executors, instead of one thread per task.
>> 
>> Piotrek
>> 
>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi All,
>>> 
>>> In order to finally get the ball rolling on the new source interface
>> that we have discussed for so long I finally created a FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>> 
>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>> adding per-partition watermark support to the Kinesis source and because
>> this would enable generic implementation of event-time alignment for all
>> sources. Maybe we need another FLIP for the event-time alignment part,
>> especially the part about information sharing between operations (I'm not
>> calling it state sharing because state has a special meaning in Flink).
>>> 
>>> Please discuss away!
>>> 
>>> Aljoscha
>>> 
>>> 
>> 
>> 
