Hi,

One more thing. I think the Kafka client would be a good example of a connector 
that could make use of this `isBlocked()`/callback single-threaded API from 
“Pattern 2”.

If we have N threads for N splits, there would be no need for the (N+1)th 
thread. The reader could be implemented as a non-blocking queue that notifies 
the callback/completes the blocked future whenever the queue becomes non-empty. 
The same thread that handles checkpoints, network flushes and resource 
management could then handle reading from this queue.
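To make that concrete, here is a minimal, hypothetical sketch (class and method names are mine, not part of any proposed interface, and some concurrency corner cases are glossed over): an internal fetcher thread offers records into a non-blocking queue, `poll()` never blocks, and `isBlocked()` returns a static `NOT_BLOCKED` future while the queue is non-empty, or a fresh future that the next `offer()` completes.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a queue-backed, non-blocking reader.
class QueueBackedReader<T> {
    private static final CompletableFuture<?> NOT_BLOCKED =
            CompletableFuture.completedFuture(null);

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    // Future handed out while the queue is empty; completed by the next offer().
    private volatile CompletableFuture<Void> blocked = null;

    /** Called by the connector's internal fetcher thread. */
    public void offer(T record) {
        queue.add(record);
        CompletableFuture<Void> toComplete = blocked;
        if (toComplete != null) {
            blocked = null;
            toComplete.complete(null); // wakes up the single task thread
        }
    }

    /** Non-blocking poll, called by the single task thread. */
    public Optional<T> poll() {
        return Optional.ofNullable(queue.poll());
    }

    /** Completed future means "not blocked"; otherwise completed on next offer(). */
    public CompletableFuture<?> isBlocked() {
        if (!queue.isEmpty()) {
            return NOT_BLOCKED;
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        blocked = future;
        // Re-check to avoid missing a record offered between isEmpty() and the store.
        if (!queue.isEmpty()) {
            blocked = null;
            return NOT_BLOCKED;
        }
        return future;
    }
}
```

The single task thread would then poll; when `poll()` comes back empty, it registers a continuation on `isBlocked()` and goes back to handling checkpoints, network flushes and resource management until the future completes.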

Piotrek

> On 15 Nov 2018, at 17:13, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi
> 
> Re: Becket
> 
>> WRT the confusion between advance() / getCurrent(), do you think it would
>> help if we combine them and have something like:
>> 
>> CompletableFuture<T> getNext();
>> long getWatermark();
>> long getCurrentTimestamp();
> 
> I think that technically this would work the same as `CompletableFuture<?> 
> isBlocked()`, `CompletableFuture<?> advance()` or callback/`notify()` options. 
> I see two differences:
> 1. in this case, once the connector unblocks itself and completes the future, 
> Flink’s engine would be responsible for holding the record somewhere, while 
> during this time Flink’s engine can be busy doing other things. Maybe that’s 
> not a big issue, but will slightly complicate the execution engine.
> 2. This might cause some performance overhead, since every record will have 
> to go through the future. As I wrote somewhere before, both `advance()` and 
> `isBlocked()` during full throughput could return a static/const NOT_BLOCKED 
> instance, which should/could behave better.
> 
> Nevertheless, maybe the choice between those options is a secondary one and 
> could be done somewhere else/later or during comparison of some POCs?
> 
> Re: Aljoscha
> 
>> I think it should be as easy as adding a 
>> minimumTimestamp()/maximumTimestamp() method pair to the split interface.
> 
> I think that `minimumTimestamp()/maximumTimestamp()` extension seems 
> reasonable if we want Flink to be aware of that. Since watermark 
> handling/emitting would be a custom logic anyway, maybe `minimum` and 
> `maximum` timestamps of a split could be handled as a private fields of the 
> specific connector implementation? I mean, the current proposal with 
> `getCurrentTimestamp()` method indicates that this logic will be hidden from 
> the Flink’s engine anyway, so there might be no need to expose them via API?
> 
>> I see there has been some good discussion but I don't know if we have 
>> consensus.
> 
> I think we are converging to a point that having some kind of additional 
> notification that the connector is not blocked anymore would be more flexible 
> for us.
> 
> From the perspective of the execution engine, I would be in favour of testing 
> out our ideas and maybe benchmarking them to make sure that we are not 
> omitting something.
> 
> Piotrek
> 
>> On 15 Nov 2018, at 12:43, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi,
>> 
>> I thought I had sent this mail a while ago but I must have forgotten to send 
>> it.
>> 
>> There is another thing we should consider for splits: the range of 
>> timestamps that it can contain. For example, the splits of a file source 
>> would know what the minimum and maximum timestamp in the splits is, roughly. 
>> For infinite splits, such as Kafka partitions, the minimum would be 
>> meaningful but the maximum would be +Inf. If the splits expose the interval 
>> of time that they contain, the readers, or the component that manages the 
>> readers, can make decisions about which splits to forward and read first. And 
>> it can also influence the minimum watermark that a reader forwards: it 
>> should never emit a watermark if it knows there are splits to read that have 
>> a lower minimum timestamp. I think it should be as easy as adding a 
>> minimumTimestamp()/maximumTimestamp() method pair to the split interface.
>> 
>> Another thing we need to resolve is the actual reader interface. I see there 
>> has been some good discussion but I don't know if we have consensus. We 
>> should try and see how specific sources could be implemented with the new 
>> interface. For example, for Kafka I think we need to have N+1 threads per 
>> task (where N is the number of splits that a task is reading from). One 
>> thread is responsible for reading from the splits. And each split has its 
>> own (internal) thread for reading from Kafka and putting messages in an 
>> internal queue to pull from. This is similar to how the current Kafka source 
>> is implemented, which has a separate fetcher thread. The reason for this 
>> separation is that we always need to try reading from Kafka to keep the 
>> throughput up. In the current implementation the internal queue (or 
>> handover) limits the read rate of the reader threads.
>> 
>> @Thomas, what do you think this would look like for Kinesis?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 15. Nov 2018, at 03:56, Becket Qin <becket....@gmail.com> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> Thanks a lot for the detailed reply. All makes sense to me.
>>> 
>>> WRT the confusion between advance() / getCurrent(), do you think it would
>>> help if we combine them and have something like:
>>> 
>>> CompletableFuture<T> getNext();
>>> long getWatermark();
>>> long getCurrentTimestamp();
>>> 
>>> Cheers,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Thanks again for the detailed answer :) Sorry for responding with a delay.
>>>> 
>>>>> Completely agree that in pattern 2, having a callback is necessary for
>>>> that
>>>>> single thread outside of the connectors. And the connectors MUST have
>>>>> internal threads.
>>>> 
>>>> Yes, this thread will have to exists somewhere. In pattern 2 it exists in
>>>> the connector (at least from the perspective of the Flink execution
>>>> engine). In pattern 1 it exists inside the Flink execution engine. With
>>>> completely blocking connectors, like simple reading from files, both of
>>>> those approaches are basically the same. The difference is when user
>>>> implementing a Flink source is already working with non-blocking code with
>>>> some internal threads. In this case, pattern 1 would result in "double
>>>> thread wrapping”, while pattern 2 would allow to skip one layer of
>>>> indirection.
>>>> 
>>>>> If we go that way, we should have something like "void
>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>> CompletableFuture work here, though. If 10 readers return 10 completable
>>>>> futures, will there be 10 additional threads (so 20 threads in total)
>>>>> blocking waiting on them? Or will there be a single thread busy loop
>>>>> checking around?
>>>> 
>>>> To be honest, I haven’t thought this completely through and I haven’t
>>>> tested/POC’ed it. Having said that, I can think of at least couple of
>>>> solutions. First is something like this:
>>>> 
>>>> 
>>>> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>>>> 
>>>> Line:
>>>> 
>>>>                              `blocked = split.process();`
>>>> 
>>>> Is where the execution goes into to the task/sources. This is where the
>>>> returned future is handled:
>>>> 
>>>>                              blocked.addListener(() -> {
>>>>                                  blockedSplits.remove(split);
>>>>                                  // reset the level priority to prevent
>>>> previously-blocked splits from starving existing splits
>>>>                                  split.resetLevelPriority();
>>>>                                  waitingSplits.offer(split);
>>>>                              }, executor);
>>>> 
>>>> Fundamentally callbacks and Futures are more or less interchangeable. You
>>>> can always wrap one into another (creating a callback that completes a
>>>> future and attach a callback once future completes). In this case the
>>>> difference for me is mostly:
>>>> - api with passing callback allows the callback to be fired multiple times
>>>> and to fire it even if the connector is not blocked. This is what I meant
>>>> by saying that api `CompletableFuture<?> isBlocked()` is a bit simpler.
>>>> Connector can only return either “I’m not blocked” or “I’m blocked and I
>>>> will tell you only once when I’m not blocked anymore”.
>>>> 
>>>> But this is not the most important thing for me here. For me important
>>>> thing is to try our best to make Flink task’s control and execution single
>>>> threaded. For that both callback and future APIs should work the same.
>>>> 
>>>>> WRT pattern 1, a single blocking take() API should just work. The good
>>>>> thing is that a blocking read API is usually simpler to implement.
>>>> 
>>>> Yes, they are easier to implement (especially if you are not the one that
>>>> has to deal with the additional threading required around them ;) ). But
>>>> to answer this issue, if we choose pattern 2, we can always provide a
>>>> proxy/wrapper that would, using an internal thread, implement the
>>>> non-blocking API while exposing a blocking API to the user. It would
>>>> implement pattern 2 for the user, exposing to him pattern 1. In other
>>>> words, implementing pattern 1 in the pattern 2 paradigm, while making it
>>>> possible to implement pure pattern 2 connectors.
>>>> 
>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO
>>>> in
>>>>> a method like "isBlocked()". If the method is expected to fetch records
>>>>> (even if not returning them), naming it something more explicit would
>>>> help
>>>>> avoid confusion.
>>>> 
>>>> If we choose so, we could rework it into something like:
>>>> 
>>>> CompletableFuture<?> advance()
>>>> T getCurrent();
>>>> Watermark getCurrentWatermark()
>>>> 
>>>> But as I wrote before, this is more confusing to me for the exact reasons
>>>> you mentioned :) I would be confused what should be done in `advance()` and
>>>> what in `getCurrent()`. However, again this naming issue is not that
>>>> important to me and probably is matter of taste/personal preferences.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com> wrote:
>>>>> 
>>>>> Hi Piotrek,
>>>>> 
>>>>> Thanks for the explanation. We are probably talking about the same thing
>>>>> but in different ways. To clarify a little bit, I think there are two
>>>>> patterns to read from a connector.
>>>>> 
>>>>> Pattern 1: Thread-less connector with a blocking read API. Outside of the
>>>>> connector, there is one IO thread per reader, doing blocking read. An
>>>>> additional thread will interact with all the IO threads.
>>>>> Pattern 2: Connector with internal thread(s) and non-blocking API.
>>>> Outside
>>>>> of the connector, there is one thread for ALL readers, doing IO relying
>>>> on
>>>>> notification callbacks in the reader.
>>>>> 
>>>>> In both patterns, there must be at least one thread per connector, either
>>>>> inside (created by connector writers) or outside (created by Flink) of
>>>> the
>>>>> connector. Ideally there are NUM_CONNECTORS + 1 threads in total, to make
>>>>> sure that 1 thread is fully non-blocking.
>>>>> 
>>>>>> Btw, I don’t know if you understand my point. Having only `poll()` and
>>>>> `take()` is not enough for single threaded task. If our source interface
>>>>> doesn’t provide `notify()` callback nor >`CompletableFuture<?>
>>>>> isBlocked(),`, there is no way to implement single threaded task that
>>>> both
>>>>> reads the data from the source connector and can also react to system
>>>>> events. Ok, non >blocking `poll()` would allow that, but with busy
>>>> looping.
>>>>> 
>>>>> Completely agree that in pattern 2, having a callback is necessary for
>>>> that
>>>>> single thread outside of the connectors. And the connectors MUST have
>>>>> internal threads. If we go that way, we should have something like "void
>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>> CompletableFuture work here, though. If 10 readers return 10 completable
>>>>> futures, will there be 10 additional threads (so 20 threads in total)
>>>>> blocking waiting on them? Or will there be a single thread busy loop
>>>>> checking around?
>>>>> 
>>>>> WRT pattern 1, a single blocking take() API should just work. The good
>>>>> thing is that a blocking read API is usually simpler to implement. An
>>>>> additional non-blocking "T poll()" method here is indeed optional and
>>>> could
>>>>> be used in cases like Flink does not want the thread to block forever.
>>>> They
>>>>> can also be combined to have a "T poll(Timeout)", which is exactly what
>>>>> KafkaConsumer did.
>>>>> 
>>>>> It sounds that you are proposing pattern 2 with something similar to NIO2
>>>>> AsynchronousByteChannel[1]. That API would work, except that the
>>>> signature
>>>>> returning a future seems unnecessary. If that is the case, a minor change
>>>>> on the current FLIP proposal to have "void advance(callback)" should
>>>> work.
>>>>> And this means the connectors MUST have their internal threads.
>>>>> 
>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO
>>>> in
>>>>> a method like "isBlocked()". If the method is expected to fetch records
>>>>> (even if not returning them), naming it something more explicit would
>>>> help
>>>>> avoid confusion.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jiangjie (Becket) Qin
>>>>> 
>>>>> [1]
>>>>> 
>>>> https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
>>>>> 
>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <pi...@data-artisans.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi
>>>>>> 
>>>>>> Good point with select/epoll, however I do not see how they could be used
>>>>>> with Flink if we would like a single task in Flink to be single-threaded
>>>> (and
>>>>>> I believe we should pursue this goal). If your connector blocks on
>>>>>> `select`, then it can not process/handle control messages from Flink,
>>>> like
>>>>>> checkpoints, releasing resources and potentially output flushes. This
>>>> would
>>>>>> require tight integration between connector and Flink’s main event
>>>>>> loop/selects/etc.
>>>>>> 
>>>>>> Looking at it from other perspective. Let’s assume that we have a
>>>>>> connector implemented on top of `select`/`epoll`. In order to integrate
>>>> it
>>>>>> with Flink’s checkpointing/flushes/resource releasing it will have to be
>>>>>> executed in separate thread one way or another. At least if our API will
>>>>>> enforce/encourage non blocking implementations with some kind of
>>>>>> notifications (`isBlocked()` or `notify()` callback), some connectors
>>>> might
>>>>>> skip one layer of wrapping threads.
>>>>>> 
>>>>>> Btw, I don’t know if you understand my point. Having only `poll()` and
>>>>>> `take()` is not enough for single threaded task. If our source interface
>>>>>> doesn’t provide `notify()` callback nor `CompletableFuture<?>
>>>>>> isBlocked(),`, there is no way to implement single threaded task that
>>>> both
>>>>>> reads the data from the source connector and can also react to system
>>>>>> events. Ok, non blocking `poll()` would allow that, but with busy
>>>> looping.
>>>>>> 
>>>>>> Piotrek
>>>>>> 
>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Piotrek,
>>>>>>> 
>>>>>>>> But I don’t see a reason why we should expose both blocking `take()`
>>>> and
>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone (Flink
>>>> engine
>>>>>> or
>>>>>>> connector) would have to do the same busy
>>>>>>>> looping anyway and I think it would be better to have a simpler
>>>>>> connector
>>>>>>> API (that would solve our problems) and force connectors to comply one
>>>>>> way
>>>>>>> or another.
>>>>>>> 
>>>>>>> If we let the block happen inside the connector, the blocking does not
>>>>>> have
>>>>>>> to be a busy loop. For example, to do the block waiting efficiently,
>>>> the
>>>>>>> connector can use Java NIO Selector.select(), which relies on OS syscalls
>>>>>>> like epoll[1] instead of busy looping. But if Flink engine blocks
>>>> outside
>>>>>>> the connector, it pretty much has to do the busy loop. So if there is
>>>>>> only
>>>>>>> one API to get the element, a blocking getNextElement() makes more
>>>> sense.
>>>>>>> In any case, we should avoid ambiguity. It has to be crystal clear
>>>> about
>>>>>>> whether a method is expected to be blocking or non-blocking. Otherwise
>>>> it
>>>>>>> would be very difficult for Flink engine to do the right thing with the
>>>>>>> connectors. At the first glance at getCurrent(), the expected behavior
>>>> is
>>>>>>> not quite clear.
>>>>>>> 
>>>>>>> That said, I do agree that functionality wise, poll() and take() kind
>>>> of
>>>>>>> overlap. But they are actually not quite different from
>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the only
>>>>>>> difference is that poll() also returns the next record if it is
>>>>>> available.
>>>>>>> But I agree that the isBlocked() + getNextElement() is more flexible as
>>>>>>> users can just check the record availability, but not fetch the next
>>>>>>> element.
>>>>>>> 
>>>>>>>> In case of thread-less readers with only non-blocking `queue.poll()`
>>>> (is
>>>>>>> that really a thing? I cannot think of a real implementation that
>>>>>>> enforces such constraints)
>>>>>>> Right, it is pretty much a syntax sugar to allow user combine the
>>>>>>> check-and-take into one method. It could be achieved with isBlocked() +
>>>>>>> getNextElement().
>>>>>>> 
>>>>>>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <
>>>> pi...@data-artisans.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Becket,
>>>>>>>> 
>>>>>>>> With my proposal, both of your examples would have to be solved by the
>>>>>>>> connector and solution to both problems would be the same:
>>>>>>>> 
>>>>>>>> Pretend that the connector is never blocked (`isBlocked() { return
>>>>>>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion
>>>>>> (or
>>>>>>>> semi blocking with return of control from time to time to allow for
>>>>>>>> checkpointing, network flushing and other resource management things
>>>> to
>>>>>>>> happen in the same main thread). In other words, exactly how you would
>>>>>>>> implement `take()` method or how the same source connector would be
>>>>>>>> implemented NOW with current source interface. The difference with
>>>>>> current
>>>>>>>> interface would be only that main loop would be outside of the
>>>>>> connector,
>>>>>>>> and instead of periodically releasing checkpointing lock, periodically
>>>>>>>> `return null;` or `return Optional.empty();` from `getNextElement()`.
>>>>>>>> 
>>>>>>>> In case of thread-less readers with only non-blocking `queue.poll()`
>>>> (is
>>>>>>>> that really a thing? I cannot think of a real implementation that
>>>>>>>> enforces such constraints), we could provide a wrapper that hides the
>>>>>> busy
>>>>>>>> looping. The same applies how to solve forever blocking readers - we
>>>>>> could
>>>>>>>> provider another wrapper running the connector in separate thread.
>>>>>>>> 
>>>>>>>> But I don’t see a reason why we should expose both blocking `take()`
>>>> and
>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone (Flink
>>>>>> engine or
>>>>>>>> connector) would have to do the same busy looping anyway and I think
>>>> it
>>>>>>>> would be better to have a simpler connector API (that would solve our
>>>>>>>> problems) and force connectors to comply one way or another.
>>>>>>>> 
>>>>>>>> Piotrek
>>>>>>>> 
>>>>>>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Piotr,
>>>>>>>>> 
>>>>>>>>> I might have misunderstood you proposal. But let me try to explain my
>>>>>>>>> concern. I am thinking about the following case:
>>>>>>>>> 1. a reader has the following two interfaces,
>>>>>>>>> boolean isBlocked()
>>>>>>>>> T getNextElement()
>>>>>>>>> 2. the implementation of getNextElement() is non-blocking.
>>>>>>>>> 3. The reader is thread-less, i.e. it does not have any internal
>>>>>> thread.
>>>>>>>>> For example, it might just delegate the getNextElement() to a
>>>>>>>> queue.poll(),
>>>>>>>>> and isBlocked() is just queue.isEmpty().
>>>>>>>>> 
>>>>>>>>> How can Flink efficiently implement a blocking reading behavior with
>>>>>> this
>>>>>>>>> reader? Either a tight loop or a backoff interval is needed. Neither
>>>> of
>>>>>>>>> them is ideal.
>>>>>>>>> 
>>>>>>>>> Now let's say the reader mentioned above implements a blocking
>>>>>>>>> getNextElement() method. Because there is no internal thread in the
>>>>>>>> reader,
>>>>>>>>> after isBlocked() returns false, Flink will still have to loop on
>>>>>>>>> isBlocked() to check whether the next record is available. If the
>>>> next
>>>>>>>>> record reaches after 10 min, it is a tight loop for 10 min. You have
>>>>>>>>> probably noticed that in this case, even if isBlocked() returns a
>>>> future,
>>>>>>>> that
>>>>>>>>> future will not be completed if Flink does not call some method
>>>> from
>>>>>>>> the
>>>>>>>>> reader, because the reader has no internal thread to complete that
>>>>>> future
>>>>>>>>> by itself.
>>>>>>>>> 
>>>>>>>>> Due to the above reasons, a blocking take() API would allow Flink to
>>>>>> have
>>>>>>>>> an efficient way to read from a reader. There are many ways to wake
>>>> up
>>>>>>>> the
>>>>>>>>> blocking thread when checkpointing is needed depending on the
>>>>>>>>> implementation. But I think the poll()/take() API would also work in
>>>>>> that
>>>>>>>>> case.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> 
>>>>>>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <
>>>> pi...@data-artisans.com
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi,
>>>>>>>>>> 
>>>>>>>>>> a)
>>>>>>>>>> 
>>>>>>>>>>> BTW, regarding the isBlock() method, I have a few more questions.
>>>> 21,
>>>>>>>> Is
>>>>>>>>>> a method isReady() with boolean as a return value
>>>>>>>>>>> equivalent? Personally I find it a little bit confusing what
>>>>>> is
>>>>>>>>>> supposed to be returned when the future is completed. 22. if
>>>>>>>>>>> the implementation of isBlocked() is optional, how do the callers
>>>>>> know
>>>>>>>>>> whether the method is properly implemented or not?
>>>>>>>>>>> Does not implemented mean it always return a completed future?
>>>>>>>>>> 
>>>>>>>>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
>>>>>>>>>> `boolean hasNext()` which in case of “false” provides some kind of a
>>>>>>>>>> listener/callback that notifies about presence of next element.
>>>> There
>>>>>>>> are
>>>>>>>>>> some minor details, like `CompletableFuture<?>` has a minimal two
>>>>>> state
>>>>>>>>>> logic:
>>>>>>>>>> 
>>>>>>>>>> 1. Future is completed - we have more data
>>>>>>>>>> 2. Future not yet completed - we don’t have data now, but we
>>>> might/we
>>>>>>>> will
>>>>>>>>>> have in the future
>>>>>>>>>> 
>>>>>>>>>> While `boolean hasNext()` and `notify()` callback are a bit more
>>>>>>>>>> complicated/dispersed and can lead to/encourage `notify()` spam.
>>>>>>>>>> 
>>>>>>>>>> b)
>>>>>>>>>> 
>>>>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
>>>>>>>> `getNext`
>>>>>>>>>> the `getNext` would need return a
>>>>>>>>>>> `ElementWithTimestamp` because some sources want to add timestamp
>>>> to
>>>>>>>>>> every element. IMO, this is not so memory friendly
>>>>>>>>>>> so I prefer this design.
>>>>>>>>>> 
>>>>>>>>>> Guowei I don’t quite understand this. Could you elaborate why
>>>> having a
>>>>>>>>>> separate `advance()` help?
>>>>>>>>>> 
>>>>>>>>>> c)
>>>>>>>>>> 
>>>>>>>>>> Regarding advance/poll/take. What’s the value of having two separate
>>>>>>>>>> methods: poll and take? Which one of them should be called and which
>>>>>>>>>> implemented? What’s the benefit of having those methods compared to
>>>>>>>> having
>>>>>>>>>> a one single method `getNextElement()` (or `pollElement() or
>>>> whatever
>>>>>> we
>>>>>>>>>> name it) with following contract:
>>>>>>>>>> 
>>>>>>>>>> CompletableFuture<?> isBlocked();
>>>>>>>>>> 
>>>>>>>>>> /**
>>>>>>>>>> Return next element - will be called only if `isBlocked()` is
>>>>>> completed.
>>>>>>>>>> Try to implement it in non blocking fashion, but if that’s
>>>> impossible
>>>>>> or
>>>>>>>>>> you just don’t need the effort, you can block in this method.
>>>>>>>>>> */
>>>>>>>>>> T getNextElement();
>>>>>>>>>> 
>>>>>>>>>> I mean, if the connector is implemented non-blockingly, Flink should
>>>>>> use
>>>>>>>>>> it that way. If it’s not, then `poll()` will `throw new
>>>>>>>>>> NotImplementedException()`. Implementing both of them and providing
>>>>>>>> both of
>>>>>>>>>> them to Flink wouldn’t make sense, thus why not merge them into a
>>>>>>>> single
>>>>>>>>>> method call that should preferably (but not necessarily need to) be
>>>>>>>>>> non-blocking? It’s not like we are implementing general purpose
>>>>>> `Queue`,
>>>>>>>>>> which users might want to call either of `poll` or `take`. We would
>>>>>>>> always
>>>>>>>>>> prefer to call `poll`, but if it’s blocking, then we still have no
>>>>>>>> choice
>>>>>>>>>> but to call it and block on it.
>>>>>>>>>> 
>>>>>>>>>> d)
>>>>>>>>>> 
>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
>>>> very
>>>>>>>>>>> important. But in addition to `Future/poll`, there may be another
>>>> way
>>>>>>>> to
>>>>>>>>>>> achieve this. I think it may be not very memory friendly if every
>>>>>>>> advance
>>>>>>>>>>> call return a Future.
>>>>>>>>>> 
>>>>>>>>>> I didn’t want to mention this, to not clog my initial proposal, but
>>>>>>>> there
>>>>>>>>>> is a simple solution for the problem:
>>>>>>>>>> 
>>>>>>>>>> public interface SplitReader {
>>>>>>>>>> 
>>>>>>>>>> (…)
>>>>>>>>>> 
>>>>>>>>>> CompletableFuture<?> NOT_BLOCKED =
>>>>>>>>>> CompletableFuture.completedFuture(null);
>>>>>>>>>> 
>>>>>>>>>> /**
>>>>>>>>>> * Returns a future that will be completed when the page source
>>>>>>>> becomes
>>>>>>>>>> * unblocked.  If the page source is not blocked, this method
>>>> should
>>>>>>>>>> return
>>>>>>>>>> * {@code NOT_BLOCKED}.
>>>>>>>>>> */
>>>>>>>>>> default CompletableFuture<?> isBlocked()
>>>>>>>>>> {
>>>>>>>>>>   return NOT_BLOCKED;
>>>>>>>>>> }
>>>>>>>>>> 
>>>>>>>>>> If we are blocked and we are waiting for the IO, then creating a new
>>>>>>>>>> Future is a non-issue. Under full throttle/throughput and not blocked
>>>>>>>> sources
>>>>>>>>>> returning a static `NOT_BLOCKED` constant should also solve the
>>>>>>>> problem.
>>>>>>>>>> 
>>>>>>>>>> One more remark, non-blocking sources might be a necessity in a
>>>> single
>>>>>>>>>> threaded model without a checkpointing lock. (Currently when sources
>>>>>> are
>>>>>>>>>> blocked, they can release checkpointing lock and re-acquire it again
>>>>>>>>>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints
>>>> to
>>>>>>>>>> happen when source is idling. In that case either `notify()` or my
>>>>>>>> proposed
>>>>>>>>>> `isBlocked()` would allow to avoid busy-looping.
>>>>>>>>>> 
>>>>>>>>>> Piotrek
>>>>>>>>>> 
>>>>>>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>> 
>>>>>>>>>>> The iterator-like API was also the first thing that came to me. But
>>>>>> it
>>>>>>>>>>> seems a little confusing that hasNext() does not mean "the stream
>>>> has
>>>>>>>> not
>>>>>>>>>>> ended", but means "the next record is ready", which is repurposing
>>>>>> the
>>>>>>>>>> well
>>>>>>>>>>> known meaning of hasNext(). If we follow the hasNext()/next()
>>>>>> pattern,
>>>>>>>> an
>>>>>>>>>>> additional isNextReady() method to indicate whether the next record
>>>>>> is
>>>>>>>>>>> ready seems more intuitive to me.
>>>>>>>>>>> 
>>>>>>>>>>> Similarly, in poll()/take() pattern, another method of isDone() is
>>>>>>>> needed
>>>>>>>>>>> to indicate whether the stream has ended or not.
>>>>>>>>>>> 
>>>>>>>>>>> Compared with hasNext()/next()/isNextReady() pattern,
>>>>>>>>>>> isDone()/poll()/take() seems more flexible for the reader
>>>>>>>> implementation.
>>>>>>>>>>> When I am implementing a reader, I could have a couple of choices:
>>>>>>>>>>> 
>>>>>>>>>>> - A thread-less reader that does not have any internal thread.
>>>>>>>>>>> - When poll() is called, the same calling thread will perform a
>>>> bunch
>>>>>>>>>> of
>>>>>>>>>>> IO asynchronously.
>>>>>>>>>>> - When take() is called, the same calling thread will perform a
>>>>>>>>>> bunch
>>>>>>>>>>> of IO and wait until the record is ready.
>>>>>>>>>>> - A reader with internal threads performing network IO and put
>>>>>> records
>>>>>>>>>>> into a buffer.
>>>>>>>>>>> - When poll() is called, the calling thread simply reads from the
>>>>>>>>>>> buffer and return empty result immediately if there is no record.
>>>>>>>>>>> - When take() is called, the calling thread reads from the buffer
>>>>>>>>>> and
>>>>>>>>>>> block waiting if the buffer is empty.
>>>>>>>>>>> 
>>>>>>>>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it
>>>> is
>>>>>>>>>> less
>>>>>>>>>>> intuitive for the reader developers to write the thread-less
>>>> pattern.
>>>>>>>>>>> Although technically speaking one can still do the asynchronous IO
>>>> to
>>>>>>>>>>> prepare the record in isNextReady(). But it is inexplicit and seems
>>>>>>>>>>> somewhat hacky.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org>
>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Couple more points regarding discovery:
>>>>>>>>>>>> 
>>>>>>>>>>>> The proposal mentions that discovery could be outside the
>>>> execution
>>>>>>>>>> graph.
>>>>>>>>>>>> Today, discovered partitions/shards are checkpointed. I believe
>>>> that
>>>>>>>>>> will
>>>>>>>>>>>> also need to be the case in the future, even when discovery and
>>>>>>>> reading
>>>>>>>>>> are
>>>>>>>>>>>> split between different tasks.
>>>>>>>>>>>> 
>>>>>>>>>>>> For cases such as resharding of a Kinesis stream, the relationship
>>>>>>>>>> between
>>>>>>>>>>>> splits needs to be considered. Splits cannot be randomly
>>>> distributed
>>>>>>>>>> over
>>>>>>>>>>>> readers in certain situations. An example was mentioned here:
>>>>>>>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>>>>>>>>> 
>>>>>>>>>>>> Thomas
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org>
>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for getting the ball rolling on this!
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Can the number of splits decrease? Yes, splits can be closed and
>>>> go
>>>>>>>>>> away.
>>>>>>>>>>>>> An example would be a shard merge in Kinesis (2 existing shards
>>>>>> will
>>>>>>>> be
>>>>>>>>>>>>> closed and replaced with a new shard).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regarding advance/poll/take: IMO the least restrictive approach would be
>>>>>>>>>>>>> the thread-less IO model (pull based, non-blocking, caller retrieves new
>>>>>>>>>>>>> records when available). The current Kinesis API requires the use of
>>>>>>>>>>>>> threads, but that can be internal to the split reader and does not need
>>>>>>>>>>>>> to be a source API concern. In fact, that's what we are working on right
>>>>>>>>>>>>> now as an improvement to the existing consumer: each shard consumer
>>>>>>>>>>>>> thread will push to a queue, and the consumer main thread will poll the
>>>>>>>>>>>>> queue(s). It is essentially a mapping from threaded to non-blocking IO.
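[Editor's note: the queue-based handover Thomas describes (per-shard fetcher threads pushing, one main thread polling) can be sketched as follows. This is a minimal illustration under assumed names, not the actual Kinesis connector code.]

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of mapping threaded IO to a non-blocking poll:
// per-shard fetcher threads block on put(), while the single consumer
// main thread polls without ever blocking. All names are illustrative.
public class ShardQueueBridge {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);

    // Called from an internal shard consumer thread; blocking here is fine
    // and the bounded queue also provides back-pressure.
    public void push(String record) throws InterruptedException {
        queue.put(record);
    }

    // Called from the consumer main thread; returns empty instead of blocking.
    public Optional<String> poll() {
        return Optional.ofNullable(queue.poll());
    }
}
```

The main thread can then cycle over the bridges of all shards it owns, which is the non-blocking calling pattern described above.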
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The proposed SplitReader interface would fit the thread-less IO model.
>>>>>>>>>>>>> Similar to an iterator, we find out if there is a new element (hasNext)
>>>>>>>>>>>>> and, if so, move to it (next()). Separate calls deliver the meta
>>>>>>>>>>>>> information (timestamp, watermark). Perhaps the advance call could offer
>>>>>>>>>>>>> a timeout option, so that the caller does not end up in a busy wait. On
>>>>>>>>>>>>> the other hand, a caller processing multiple splits may want to cycle
>>>>>>>>>>>>> through fast, to process elements of other splits as soon as they become
>>>>>>>>>>>>> available. The nice thing is that this "split merge" logic can now live
>>>>>>>>>>>>> in Flink and be optimized and shared between different sources.
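[Editor's note: a rough sketch of this iterator-style, thread-less model with an optional timeout. All names are hypothetical, and the timeout handling is simplified to illustrate the calling pattern only.]

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical thread-less reader: advance() reports availability and moves
// to the next element, getCurrent() returns it. A caller handling a single
// split could pass a real timeout; a caller cycling over many splits passes
// 0 and moves on to the next split immediately.
public class InMemorySplitReader {
    private final Queue<String> buffer = new ArrayDeque<>();
    private String current;

    // Simulates data arriving from the external system.
    public void offer(String record) {
        buffer.add(record);
    }

    // Moves to the next record if one is available. In this in-memory sketch
    // the timeout is ignored; a real reader would wait up to timeoutMillis.
    public boolean advance(long timeoutMillis) {
        current = buffer.poll();
        return current != null;
    }

    public String getCurrent() {
        return current;
    }
}
```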
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may be another way to
>>>>>>>>>>>>>> achieve this. I think it may not be very memory friendly if every
>>>>>>>>>>>>>> advance call returns a Future.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public interface Listener {
>>>>>>>>>>>>>>     public void notify();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public interface SplitReader {
>>>>>>>>>>>>>>     /**
>>>>>>>>>>>>>>      * When there is temporarily no element, this will return false.
>>>>>>>>>>>>>>      * When elements are available again, the splitReader can call
>>>>>>>>>>>>>>      * listener.notify().
>>>>>>>>>>>>>>      * In addition, the framework would check `advance` periodically.
>>>>>>>>>>>>>>      * Of course, advance can always return true and ignore the
>>>>>>>>>>>>>>      * listener argument for simplicity.
>>>>>>>>>>>>>>      */
>>>>>>>>>>>>>>     public boolean advance(Listener listener);
>>>>>>>>>>>>>> }
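[Editor's note: a sketch of how such a listener-based reader could be wired up. All names are illustrative; the callback is renamed onDataAvailable() here because Object.notify() is final in Java and cannot be overridden.]

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the callback variant: advance() returns false when no element is
// buffered and remembers the listener; the fetcher side wakes the listener
// once data arrives, so the framework never busy waits.
public class CallbackSplitReader {

    public interface Listener {
        void onDataAvailable();
    }

    private final Queue<String> buffer = new ArrayDeque<>();
    private Listener pendingListener;

    // Framework side: returns false when empty, registering the wake-up hook.
    public synchronized boolean advance(Listener listener) {
        if (buffer.isEmpty()) {
            pendingListener = listener;
            return false;
        }
        return true;
    }

    // Connector side: buffers a record and wakes a blocked caller, if any.
    public synchronized void offer(String record) {
        buffer.add(record);
        if (pendingListener != null) {
            pendingListener.onDataAvailable();
            pendingListener = null;
        }
    }

    public synchronized String poll() {
        return buffer.poll();
    }
}
```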
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 2. The FLIP tells us very clearly how to create all Splits and how to
>>>>>>>>>>>>>> create a SplitReader from a Split, but there is no strategy for the user
>>>>>>>>>>>>>> to choose how to assign the splits to the tasks. I think we could add an
>>>>>>>>>>>>>> enum to let the user choose:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public enum SplitsAssignmentPolicy {
>>>>>>>>>>>>>>     Location,
>>>>>>>>>>>>>>     Workload,
>>>>>>>>>>>>>>     Random,
>>>>>>>>>>>>>>     Average
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 3. If we merge `advance` and `getCurrent` into one method like
>>>>>>>>>>>>>> `getNext`, then `getNext` would need to return an `ElementWithTimestamp`,
>>>>>>>>>>>>>> because some sources want to add a timestamp to every element. IMO, this
>>>>>>>>>>>>>> is not so memory friendly, so I prefer this design.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, Nov 1, 2018 at 6:08 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>>>>>>>>>>>> possible improvements. I have one proposal. Instead of having a method:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> boolean advance() throws IOException;
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would replace it with
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>  * Returns a future which, when completed, means that the source has
>>>>>>>>>>>>>>>  * more data and getNext() will not block.
>>>>>>>>>>>>>>>  * If you wish to use the benefits of non-blocking connectors, please
>>>>>>>>>>>>>>>  * implement this method appropriately.
>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>>>>>>>>>     return CompletableFuture.completedFuture(null);
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> A couple of arguments:
>>>>>>>>>>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>>>>>>>>>>> `getCurrent()`: what should be done in which, especially for connectors
>>>>>>>>>>>>>>> that handle records in batches (like Kafka), and when should you call
>>>>>>>>>>>>>>> `advance()` and when `getCurrent()`?
>>>>>>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>>>>>>>>>>>>>> future to have asynchronous/non-blocking connectors and to more
>>>>>>>>>>>>>>> efficiently handle a large number of blocked threads, without busy
>>>>>>>>>>>>>>> waiting. At the same time it doesn’t add much complexity, since naive
>>>>>>>>>>>>>>> connector implementations can always be blocking.
>>>>>>>>>>>>>>> 3. This would also allow us to use a fixed-size thread pool of task
>>>>>>>>>>>>>>> executors, instead of one thread per task.
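[Editor's note: to illustrate point 3, a sketch of an engine-side driver loop consuming the isBlocked() contract. All names are hypothetical, not the proposed Flink API; a real runtime would hand the continuation back to a shared pool of task executors instead of recursing directly.]

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Sketch: process records while the reader is unblocked; once it reports a
// pending future, release the thread and resume via a callback rather than
// busy waiting.
public class IsBlockedDriver {

    public interface Reader<T> {
        CompletableFuture<?> isBlocked();
        T getNext();
    }

    public static <T> void drive(Reader<T> reader, Consumer<T> downstream) {
        CompletableFuture<?> blocked = reader.isBlocked();
        while (blocked.isDone()) {
            downstream.accept(reader.getNext());
            blocked = reader.isBlocked();
        }
        // Resume when the source has data again; a real engine would schedule
        // this continuation on a fixed-size pool of task executors.
        blocked.thenRun(() -> drive(reader, downstream));
    }
}
```

Note that during full throughput `isBlocked()` can keep returning an already-completed (even static) future, so the hot path never leaves the while loop.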
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> In order to finally get the ball rolling on the new source interface
>>>>>>>>>>>>>>>> that we have discussed for so long, I finally created a FLIP:
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>>>>>>>>>>>>> adding per-partition watermark support to the Kinesis source, and
>>>>>>>>>>>>>>>> because this would enable a generic implementation of event-time
>>>>>>>>>>>>>>>> alignment for all sources. Maybe we need another FLIP for the
>>>>>>>>>>>>>>>> event-time alignment part, especially the part about information
>>>>>>>>>>>>>>>> sharing between operations (I'm not calling it state sharing because
>>>>>>>>>>>>>>>> state has a special meaning in Flink).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Please discuss away!
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Aljoscha