Hi Piotrek,

Thanks a lot for the detailed reply. All makes sense to me.

WRT the confusion between advance() / getCurrent(), do you think it would
help if we combine them and have something like:

CompletableFuture<T> getNext();
long getWatermark();
long getCurrentTimestamp();

Cheers,

Jiangjie (Becket) Qin

On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Thanks again for the detailed answer :) Sorry for responding with a delay.
>
> > Completely agree that in pattern 2, having a callback is necessary for
> that
> > single thread outside of the connectors. And the connectors MUST have
> > internal threads.
>
> Yes, this thread will have to exists somewhere. In pattern 2 it exists in
> the connector (at least from the perspective of the Flink execution
> engine). In pattern 1 it exists inside the Flink execution engine. With
> completely blocking connectors, like simple reading from files, both of
> those approaches are basically the same. The difference is when user
> implementing Flink source is already working with a non blocking code with
> some internal threads. In this case, pattern 1 would result in "double
> thread wrapping”, while pattern 2 would allow to skip one layer of
> indirection.
>
> > If we go that way, we should have something like "void
> > poll(Callback) / void advance(callback)". I am curious how would
> > CompletableFuture work here, though. If 10 readers returns 10 completable
> > futures, will there be 10 additional threads (so 20 threads in total)
> > blocking waiting on them? Or will there be a single thread busy loop
> > checking around?
>
> To be honest, I haven’t thought this completely through and I haven’t
> tested/POC’ed it. Having said that, I can think of at least couple of
> solutions. First is something like this:
>
>
> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
> <
> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
> >
>
> Line:
>
>                                 `blocked = split.process();`
>
> Is where the execution goes into to the task/sources. This is where the
> returned future is handled:
>
>                                 blocked.addListener(() -> {
>                                     blockedSplits.remove(split);
>                                     // reset the level priority to prevent
> previously-blocked splits from starving existing splits
>                                     split.resetLevelPriority();
>                                     waitingSplits.offer(split);
>                                 }, executor);
>
> Fundamentally callbacks and Futures are more or less interchangeable You
> can always wrap one into another (creating a callback that completes a
> future and attach a callback once future completes). In this case the
> difference for me is mostly:
> - api with passing callback allows the callback to be fired multiple times
> and to fire it even if the connector is not blocked. This is what I meant
> by saying that api `CompletableFuture<?> isBlocked()` is a bit simpler.
> Connector can only return either “I’m not blocked” or “I’m blocked and I
> will tell you only once when I’m not blocked anymore”.
>
> But this is not the most important thing for me here. For me important
> thing is to try our best to make Flink task’s control and execution single
> threaded. For that both callback and future APIs should work the same.
>
> > WRT pattern 1, a single blocking take() API should just work. The good
> > thing is that a blocking read API is usually simpler to implement.
>
> Yes, they are easier to implement (especially if you are not the one that
> have to deal with the additional threading required around them ;) ). But
> to answer this issue, if we choose pattern 2, we can always provide a
> proxy/wrapper that would using the internal thread implement the
> non-blocking API while exposing blocking API to the user. It would
> implement pattern 2 for the user exposing to him pattern 1. In other words
> implementing pattern 1 in pattern 2 paradigm, while making it possible to
> implement pure pattern 2 connectors.
>
> > BTW, one thing I am also trying to avoid is pushing users to perform IO
> in
> > a method like "isBlocked()". If the method is expected to fetch records
> > (even if not returning them), naming it something more explicit would
> help
> > avoid confusion.
>
> If we choose so, we could rework it into something like:
>
> CompletableFuture<?> advance()
> T getCurrent();
> Watermark getCurrentWatermark()
>
> But as I wrote before, this is more confusing to me for the exact reasons
> you mentioned :) I would be confused what should be done in `adanvce()` and
> what in `getCurrent()`. However, again this naming issue is not that
> important to me and probably is matter of taste/personal preferences.
>
> Piotrek
>
> > On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Piotrek,
> >
> > Thanks for the explanation. We are probably talking about the same thing
> > but in different ways. To clarify a little bit, I think there are two
> > patterns to read from a connector.
> >
> > Pattern 1: Thread-less connector with a blocking read API. Outside of the
> > connector, there is one IO thread per reader, doing blocking read. An
> > additional thread will interact with all the IO threads.
> > Pattern 2: Connector with internal thread(s) and non-blocking API.
> Outside
> > of the connector, there is one thread for ALL readers, doing IO relying
> on
> > notification callbacks in the reader.
> >
> > In both patterns, there must be at least one thread per connector, either
> > inside (created by connector writers) or outside (created by Flink) of
> the
> > connector. Ideally there are NUM_CONNECTORS + 1 threads in total, to make
> > sure that 1 thread is fully non-blocking.
> >
> >> Btw, I don’t know if you understand my point. Having only `poll()` and
> > `take()` is not enough for single threaded task. If our source interface
> > doesn’t provide `notify()` callback nor >`CompletableFuture<?>
> > isBlocked(),`, there is no way to implement single threaded task that
> both
> > reads the data from the source connector and can also react to system
> > events. Ok, non >blocking `poll()` would allow that, but with busy
> looping.
> >
> > Completely agree that in pattern 2, having a callback is necessary for
> that
> > single thread outside of the connectors. And the connectors MUST have
> > internal threads. If we go that way, we should have something like "void
> > poll(Callback) / void advance(callback)". I am curious how would
> > CompletableFuture work here, though. If 10 readers returns 10 completable
> > futures, will there be 10 additional threads (so 20 threads in total)
> > blocking waiting on them? Or will there be a single thread busy loop
> > checking around?
> >
> > WRT pattern 1, a single blocking take() API should just work. The good
> > thing is that a blocking read API is usually simpler to implement. An
> > additional non-blocking "T poll()" method here is indeed optional and
> could
> > be used in cases like Flink does not want the thread to block forever.
> They
> > can also be combined to have a "T poll(Timeout)", which is exactly what
> > KafkaConsumer did.
> >
> > It sounds that you are proposing pattern 2 with something similar to NIO2
> > AsynchronousByteChannel[1]. That API would work, except that the
> signature
> > returning future seems not necessary. If that is the case, a minor change
> > on the current FLIP proposal to have "void advance(callback)" should
> work.
> > And this means the connectors MUST have their internal threads.
> >
> > BTW, one thing I am also trying to avoid is pushing users to perform IO
> in
> > a method like "isBlocked()". If the method is expected to fetch records
> > (even if not returning them), naming it something more explicit would
> help
> > avoid confusion.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > [1]
> >
> https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
> >
> > On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <pi...@data-artisans.com>
> > wrote:
> >
> >> Hi
> >>
> >> Good point with select/epoll, however I do not see how they couldn’t be
> >> with Flink if we would like single task in Flink to be single-threaded
> (and
> >> I believe we should pursue this goal). If your connector blocks on
> >> `select`, then it can not process/handle control messages from Flink,
> like
> >> checkpoints, releasing resources and potentially output flushes. This
> would
> >> require tight integration between connector and Flink’s main event
> >> loop/selects/etc.
> >>
> >> Looking at it from other perspective. Let’s assume that we have a
> >> connector implemented on top of `select`/`epoll`. In order to integrate
> it
> >> with Flink’s checkpointing/flushes/resource releasing it will have to be
> >> executed in separate thread one way or another. At least if our API will
> >> enforce/encourage non blocking implementations with some kind of
> >> notifications (`isBlocked()` or `notify()` callback), some connectors
> might
> >> skip one layer of wapping threads.
> >>
> >> Btw, I don’t know if you understand my point. Having only `poll()` and
> >> `take()` is not enough for single threaded task. If our source interface
> >> doesn’t provide `notify()` callback nor `CompletableFuture<?>
> >> isBlocked(),`, there is no way to implement single threaded task that
> both
> >> reads the data from the source connector and can also react to system
> >> events. Ok, non blocking `poll()` would allow that, but with busy
> looping.
> >>
> >> Piotrek
> >>
> >>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> wrote:
> >>>
> >>> Hi Piotrek,
> >>>
> >>>> But I don’t see a reason why we should expose both blocking `take()`
> and
> >>> non-blocking `poll()` methods to the Flink engine. Someone (Flink
> engine
> >> or
> >>> connector) would have to do the same busy
> >>>> looping anyway and I think it would be better to have a simpler
> >> connector
> >>> API (that would solve our problems) and force connectors to comply one
> >> way
> >>> or another.
> >>>
> >>> If we let the block happen inside the connector, the blocking does not
> >> have
> >>> to be a busy loop. For example, to do the block waiting efficiently,
> the
> >>> connector can use java NIO selector().select which relies on OS syscall
> >>> like epoll[1] instead of busy looping. But if Flink engine blocks
> outside
> >>> the connector, it pretty much has to do the busy loop. So if there is
> >> only
> >>> one API to get the element, a blocking getNextElement() makes more
> sense.
> >>> In any case, we should avoid ambiguity. It has to be crystal clear
> about
> >>> whether a method is expected to be blocking or non-blocking. Otherwise
> it
> >>> would be very difficult for Flink engine to do the right thing with the
> >>> connectors. At the first glance at getCurrent(), the expected behavior
> is
> >>> not quite clear.
> >>>
> >>> That said, I do agree that functionality wise, poll() and take() kind
> of
> >>> overlap. But they are actually not quite different from
> >>> isBlocked()/getNextElement(). Compared with isBlocked(), the only
> >>> difference is that poll() also returns the next record if it is
> >> available.
> >>> But I agree that the isBlocked() + getNextElement() is more flexible as
> >>> users can just check the record availability, but not fetch the next
> >>> element.
> >>>
> >>>> In case of thread-less readers with only non-blocking `queue.poll()`
> (is
> >>> that really a thing? I can not think about a real implementation that
> >>> enforces such constraints)
> >>> Right, it is pretty much a syntax sugar to allow user combine the
> >>> check-and-take into one method. It could be achieved with isBlocked() +
> >>> getNextElement().
> >>>
> >>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <
> pi...@data-artisans.com>
> >>> wrote:
> >>>
> >>>> Hi Becket,
> >>>>
> >>>> With my proposal, both of your examples would have to be solved by the
> >>>> connector and solution to both problems would be the same:
> >>>>
> >>>> Pretend that connector is never blocked (`isBlocked() { return
> >>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion
> >> (or
> >>>> semi blocking with return of control from time to time to allow for
> >>>> checkpointing, network flushing and other resource management things
> to
> >>>> happen in the same main thread). In other words, exactly how you would
> >>>> implement `take()` method or how the same source connector would be
> >>>> implemented NOW with current source interface. The difference with
> >> current
> >>>> interface would be only that main loop would be outside of the
> >> connector,
> >>>> and instead of periodically releasing checkpointing lock, periodically
> >>>> `return null;` or `return Optional.empty();` from `getNextElement()`.
> >>>>
> >>>> In case of thread-less readers with only non-blocking `queue.poll()`
> (is
> >>>> that really a thing? I can not think about a real implementation that
> >>>> enforces such constraints), we could provide a wrapper that hides the
> >> busy
> >>>> looping. The same applies how to solve forever blocking readers - we
> >> could
> >>>> provider another wrapper running the connector in separate thread.
> >>>>
> >>>> But I don’t see a reason why we should expose both blocking `take()`
> and
> >>>> non-blocking `poll()` methods to the Flink engine. Someone (Flink
> >> engine or
> >>>> connector) would have to do the same busy looping anyway and I think
> it
> >>>> would be better to have a simpler connector API (that would solve our
> >>>> problems) and force connectors to comply one way or another.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:
> >>>>>
> >>>>> Hi Piotr,
> >>>>>
> >>>>> I might have misunderstood you proposal. But let me try to explain my
> >>>>> concern. I am thinking about the following case:
> >>>>> 1. a reader has the following two interfaces,
> >>>>>  boolean isBlocked()
> >>>>>  T getNextElement()
> >>>>> 2. the implementation of getNextElement() is non-blocking.
> >>>>> 3. The reader is thread-less, i.e. it does not have any internal
> >> thread.
> >>>>> For example, it might just delegate the getNextElement() to a
> >>>> queue.poll(),
> >>>>> and isBlocked() is just queue.isEmpty().
> >>>>>
> >>>>> How can Flink efficiently implement a blocking reading behavior with
> >> this
> >>>>> reader? Either a tight loop or a backoff interval is needed. Neither
> of
> >>>>> them is ideal.
> >>>>>
> >>>>> Now let's say in the reader mentioned above implements a blocking
> >>>>> getNextElement() method. Because there is no internal thread in the
> >>>> reader,
> >>>>> after isBlocked() returns false. Flink will still have to loop on
> >>>>> isBlocked() to check whether the next record is available. If the
> next
> >>>>> record reaches after 10 min, it is a tight loop for 10 min. You have
> >>>>> probably noticed that in this case, even isBlocked() returns a
> future,
> >>>> that
> >>>>> future() will not be completed if Flink does not call some method
> from
> >>>> the
> >>>>> reader, because the reader has no internal thread to complete that
> >> future
> >>>>> by itself.
> >>>>>
> >>>>> Due to the above reasons, a blocking take() API would allow Flink to
> >> have
> >>>>> an efficient way to read from a reader. There are many ways to wake
> up
> >>>> the
> >>>>> blocking thread when checkpointing is needed depending on the
> >>>>> implementation. But I think the poll()/take() API would also work in
> >> that
> >>>>> case.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jiangjie (Becket) Qin
> >>>>>
> >>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <
> pi...@data-artisans.com
> >>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> a)
> >>>>>>
> >>>>>>> BTW, regarding the isBlock() method, I have a few more questions.
> 21,
> >>>> Is
> >>>>>> a method isReady() with boolean as a return value
> >>>>>>> equivalent? Personally I found it is a little bit confusing in what
> >> is
> >>>>>> supposed to be returned when the future is completed. 22. if
> >>>>>>> the implementation of isBlocked() is optional, how do the callers
> >> know
> >>>>>> whether the method is properly implemented or not?
> >>>>>>> Does not implemented mean it always return a completed future?
> >>>>>>
> >>>>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
> >>>>>> `boolean hasNext()` which in case of “false” provides some kind of a
> >>>>>> listener/callback that notifies about presence of next element.
> There
> >>>> are
> >>>>>> some minor details, like `CompletableFuture<?>` has a minimal two
> >> state
> >>>>>> logic:
> >>>>>>
> >>>>>> 1. Future is completed - we have more data
> >>>>>> 2. Future not yet completed - we don’t have data now, but we
> might/we
> >>>> will
> >>>>>> have in the future
> >>>>>>
> >>>>>> While `boolean hasNext()` and `notify()` callback are a bit more
> >>>>>> complicated/dispersed and can lead/encourage `notify()` spam.
> >>>>>>
> >>>>>> b)
> >>>>>>
> >>>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> >>>> `getNext`
> >>>>>> the `getNext` would need return a
> >>>>>>> `ElementWithTimestamp` because some sources want to add timestamp
> to
> >>>>>> every element. IMO, this is not so memory friendly
> >>>>>>> so I prefer this design.
> >>>>>>
> >>>>>> Guowei I don’t quite understand this. Could you elaborate why
> having a
> >>>>>> separate `advance()` help?
> >>>>>>
> >>>>>> c)
> >>>>>>
> >>>>>> Regarding advance/poll/take. What’s the value of having two separate
> >>>>>> methods: poll and take? Which one of them should be called and which
> >>>>>> implemented? What’s the benefit of having those methods compared to
> >>>> having
> >>>>>> a one single method `getNextElement()` (or `pollElement() or
> whatever
> >> we
> >>>>>> name it) with following contract:
> >>>>>>
> >>>>>> CompletableFuture<?> isBlocked();
> >>>>>>
> >>>>>> /**
> >>>>>> Return next element - will be called only if `isBlocked()` is
> >> completed.
> >>>>>> Try to implement it in non blocking fashion, but if that’s
> impossible
> >> or
> >>>>>> you just don’t need the effort, you can block in this method.
> >>>>>> */
> >>>>>> T getNextElement();
> >>>>>>
> >>>>>> I mean, if the connector is implemented non-blockingly, Flink should
> >> use
> >>>>>> it that way. If it’s not, then `poll()` will `throw new
> >>>>>> NotImplementedException()`. Implementing both of them and providing
> >>>> both of
> >>>>>> them to Flink wouldn’t make a sense, thus why not merge them into a
> >>>> single
> >>>>>> method call that should preferably (but not necessarily need to) be
> >>>>>> non-blocking? It’s not like we are implementing general purpose
> >> `Queue`,
> >>>>>> which users might want to call either of `poll` or `take`. We would
> >>>> always
> >>>>>> prefer to call `poll`, but if it’s blocking, then still we have no
> >>>> choice,
> >>>>>> but to call it and block on it.
> >>>>>>
> >>>>>> d)
> >>>>>>
> >>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> very
> >>>>>>> important. But in addition to `Future/poll`, there may be another
> way
> >>>> to
> >>>>>>> achieve this. I think it may be not very memory friendly if every
> >>>> advance
> >>>>>>> call return a Future.
> >>>>>>
> >>>>>> I didn’t want to mention this, to not clog my initial proposal, but
> >>>> there
> >>>>>> is a simple solution for the problem:
> >>>>>>
> >>>>>> public interface SplitReader {
> >>>>>>
> >>>>>>  (…)
> >>>>>>
> >>>>>>  CompletableFuture<?> NOT_BLOCKED =
> >>>>>> CompletableFuture.completedFuture(null);
> >>>>>>
> >>>>>>  /**
> >>>>>>   * Returns a future that will be completed when the page source
> >>>> becomes
> >>>>>>   * unblocked.  If the page source is not blocked, this method
> should
> >>>>>> return
> >>>>>>   * {@code NOT_BLOCKED}.
> >>>>>>   */
> >>>>>>  default CompletableFuture<?> isBlocked()
> >>>>>>  {
> >>>>>>      return NOT_BLOCKED;
> >>>>>>  }
> >>>>>>
> >>>>>> If we are blocked and we are waiting for the IO, then creating a new
> >>>>>> Future is non-issue. Under full throttle/throughput and not blocked
> >>>> sources
> >>>>>> returning a static `NOT_BLOCKED` constant  should also solve the
> >>>> problem.
> >>>>>>
> >>>>>> One more remark, non-blocking sources might be a necessity in a
> single
> >>>>>> threaded model without a checkpointing lock. (Currently when sources
> >> are
> >>>>>> blocked, they can release checkpointing lock and re-acquire it again
> >>>>>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints
> to
> >>>>>> happen when source is idling. In that case either `notify()` or my
> >>>> proposed
> >>>>>> `isBlocked()` would allow to avoid busy-looping.
> >>>>>>
> >>>>>> Piotrek
> >>>>>>
> >>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Thomas,
> >>>>>>>
> >>>>>>> The iterator-like API was also the first thing that came to me. But
> >> it
> >>>>>>> seems a little confusing that hasNext() does not mean "the stream
> has
> >>>> not
> >>>>>>> ended", but means "the next record is ready", which is repurposing
> >> the
> >>>>>> well
> >>>>>>> known meaning of hasNext(). If we follow the hasNext()/next()
> >> pattern,
> >>>> an
> >>>>>>> additional isNextReady() method to indicate whether the next record
> >> is
> >>>>>>> ready seems more intuitive to me.
> >>>>>>>
> >>>>>>> Similarly, in poll()/take() pattern, another method of isDone() is
> >>>> needed
> >>>>>>> to indicate whether the stream has ended or not.
> >>>>>>>
> >>>>>>> Compared with hasNext()/next()/isNextReady() pattern,
> >>>>>>> isDone()/poll()/take() seems more flexible for the reader
> >>>> implementation.
> >>>>>>> When I am implementing a reader, I could have a couple of choices:
> >>>>>>>
> >>>>>>> - A thread-less reader that does not have any internal thread.
> >>>>>>> - When poll() is called, the same calling thread will perform a
> bunch
> >>>>>> of
> >>>>>>>   IO asynchronously.
> >>>>>>>   - When take() is called, the same calling thread will perform a
> >>>>>> bunch
> >>>>>>>   of IO and wait until the record is ready.
> >>>>>>> - A reader with internal threads performing network IO and put
> >> records
> >>>>>>> into a buffer.
> >>>>>>>   - When poll() is called, the calling thread simply reads from the
> >>>>>>>   buffer and return empty result immediately if there is no record.
> >>>>>>>   - When take() is called, the calling thread reads from the buffer
> >>>>>> and
> >>>>>>>   block waiting if the buffer is empty.
> >>>>>>>
> >>>>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it
> is
> >>>>>> less
> >>>>>>> intuitive for the reader developers to write the thread-less
> pattern.
> >>>>>>> Although technically speaking one can still do the asynchronous IO
> to
> >>>>>>> prepare the record in isNextReady(). But it is inexplicit and seems
> >>>>>>> somewhat hacky.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jiangjie (Becket) Qin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org>
> wrote:
> >>>>>>>
> >>>>>>>> Couple more points regarding discovery:
> >>>>>>>>
> >>>>>>>> The proposal mentions that discovery could be outside the
> execution
> >>>>>> graph.
> >>>>>>>> Today, discovered partitions/shards are checkpointed. I believe
> that
> >>>>>> will
> >>>>>>>> also need to be the case in the future, even when discovery and
> >>>> reading
> >>>>>> are
> >>>>>>>> split between different tasks.
> >>>>>>>>
> >>>>>>>> For cases such as resharding of a Kinesis stream, the relationship
> >>>>>> between
> >>>>>>>> splits needs to be considered. Splits cannot be randomly
> distributed
> >>>>>> over
> >>>>>>>> readers in certain situations. An example was mentioned here:
> >>>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >>>>>>>>
> >>>>>>>> Thomas
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org>
> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for getting the ball rolling on this!
> >>>>>>>>>
> >>>>>>>>> Can the number of splits decrease? Yes, splits can be closed and
> go
> >>>>>> away.
> >>>>>>>>> An example would be a shard merge in Kinesis (2 existing shards
> >> will
> >>>> be
> >>>>>>>>> closed and replaced with a new shard).
> >>>>>>>>>
> >>>>>>>>> Regarding advance/poll/take: IMO the least restrictive approach
> >> would
> >>>>>> be
> >>>>>>>>> the thread-less IO model (pull based, non-blocking, caller
> >> retrieves
> >>>>>> new
> >>>>>>>>> records when available). The current Kinesis API requires the use
> >> of
> >>>>>>>>> threads. But that can be internal to the split reader and does
> not
> >>>> need
> >>>>>>>> to
> >>>>>>>>> be a source API concern. In fact, that's what we are working on
> >> right
> >>>>>> now
> >>>>>>>>> as improvement to the existing consumer: Each shard consumer
> thread
> >>>>>> will
> >>>>>>>>> push to a queue, the consumer main thread will poll the queue(s).
> >> It
> >>>> is
> >>>>>>>>> essentially a mapping from threaded IO to non-blocking.
> >>>>>>>>>
> >>>>>>>>> The proposed SplitReader interface would fit the thread-less IO
> >>>> model.
> >>>>>>>>> Similar to an iterator, we find out if there is a new element
> >>>> (hasNext)
> >>>>>>>> and
> >>>>>>>>> if so, move to it (next()). Separate calls deliver the meta
> >>>> information
> >>>>>>>>> (timestamp, watermark). Perhaps advance call could offer a
> timeout
> >>>>>>>> option,
> >>>>>>>>> so that the caller does not end up in a busy wait. On the other
> >>>> hand, a
> >>>>>>>>> caller processing multiple splits may want to cycle through fast,
> >> to
> >>>>>>>>> process elements of other splits as soon as they become
> available.
> >>>> The
> >>>>>>>> nice
> >>>>>>>>> thing is that this "split merge" logic can now live in Flink and
> be
> >>>>>>>>> optimized and shared between different sources.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Thomas
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com>
> >>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>> Thanks Aljoscha for this FLIP.
> >>>>>>>>>>
> >>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> >>>> very
> >>>>>>>>>> important. But in addition to `Future/poll`, there may be
> another
> >>>> way
> >>>>>> to
> >>>>>>>>>> achieve this. I think it may be not very memory friendly if
> every
> >>>>>>>> advance
> >>>>>>>>>> call return a Future.
> >>>>>>>>>>
> >>>>>>>>>> public interface Listener {
> >>>>>>>>>>  public void notify();
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> public interface SplitReader() {
> >>>>>>>>>>  /**
> >>>>>>>>>>   * When there is no element temporarily, this will return
> false.
> >>>>>>>>>>   * When elements is available again splitReader can call
> >>>>>>>>>> listener.notify()
> >>>>>>>>>>   * In addition the frame would check `advance` periodically .
> >>>>>>>>>>   * Of course advance can always return true and ignore the
> >>>>>> listener
> >>>>>>>>>> argument for simplicity.
> >>>>>>>>>>   */
> >>>>>>>>>>  public boolean advance(Listener listener);
> >>>>>>>>>> }
> >>>>>>>>>>
> >>>>>>>>>> 2.  The FLIP tells us very clearly that how to create all Splits
> >> and
> >>>>>> how
> >>>>>>>>>> to create a SplitReader from a Split. But there is no strategy
> for
> >>>> the
> >>>>>>>> user
> >>>>>>>>>> to choose how to assign the splits to the tasks. I think we
> could
> >>>> add
> >>>>>> a
> >>>>>>>>>> Enum to let user to choose.
> >>>>>>>>>> /**
> >>>>>>>>>> public Enum SplitsAssignmentPolicy {
> >>>>>>>>>> Location,
> >>>>>>>>>> Workload,
> >>>>>>>>>> Random,
> >>>>>>>>>> Average
> >>>>>>>>>> }
> >>>>>>>>>> */
> >>>>>>>>>>
> >>>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> >>>>>> `getNext`
> >>>>>>>>>> the `getNext` would need return a `ElementWithTimestamp` because
> >>>> some
> >>>>>>>>>> sources want to add timestamp to every element. IMO, this is not
> >> so
> >>>>>>>> memory
> >>>>>>>>>> friendly so I prefer this design.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月1日周四
> 下午6:08写道:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> >>>> other
> >>>>>>>>>>> possible improvements. I have one proposal. Instead of having a
> >>>>>> method:
> >>>>>>>>>>>
> >>>>>>>>>>> boolean advance() throws IOException;
> >>>>>>>>>>>
> >>>>>>>>>>> I would replace it with
> >>>>>>>>>>>
> >>>>>>>>>>> /*
> >>>>>>>>>>> * Return a future, which when completed means that source has
> >> more
> >>>>>>>> data
> >>>>>>>>>>> and getNext() will not block.
> >>>>>>>>>>> * If you wish to use benefits of non blocking connectors,
> please
> >>>>>>>>>>> implement this method appropriately.
> >>>>>>>>>>> */
> >>>>>>>>>>> default CompletableFuture<?> isBlocked() {
> >>>>>>>>>>>     return CompletableFuture.completedFuture(null);
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
> >>>>>>>>>>>
> >>>>>>>>>>> Couple of arguments:
> >>>>>>>>>>> 1. I don’t understand the division of work between `advance()`
> >> and
> >>>>>>>>>>> `getCurrent()`. What should be done in which, especially for
> >>>>>> connectors
> >>>>>>>>>>> that handle records in batches (like Kafka) and when should you
> >>>> call
> >>>>>>>>>>> `advance` and when `getCurrent()`.
> >>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow
> us
> >> in
> >>>>>> the
> >>>>>>>>>>> future to have asynchronous/non blocking connectors and more
> >>>>>>>> efficiently
> >>>>>>>>>>> handle large number of blocked threads, without busy waiting.
> >> While
> >>>>>> at
> >>>>>>>> the
> >>>>>>>>>>> same time it doesn’t add much complexity, since naive connector
> >>>>>>>>>>> implementations can be always blocking.
> >>>>>>>>>>> 3. This also would allow us to use a fixed size thread pool of
> >> task
> >>>>>>>>>>> executors, instead of one thread per task.
> >>>>>>>>>>>
> >>>>>>>>>>> Piotrek
> >>>>>>>>>>>
> >>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <
> aljos...@apache.org
> >>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi All,
> >>>>>>>>>>>>
> >>>>>>>>>>>> In order to finally get the ball rolling on the new source
> >>>> interface
> >>>>>>>>>>> that we have discussed for so long I finally created a FLIP:
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>>>>>>>>>>
> >>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing
> work/discussion
> >>>>>> about
> >>>>>>>>>>> adding per-partition watermark support to the Kinesis source
> and
> >>>>>>>> because
> >>>>>>>>>>> this would enable generic implementation of event-time
> alignment
> >>>> for
> >>>>>>>> all
> >>>>>>>>>>> sources. Maybe we need another FLIP for the event-time
> alignment
> >>>>>> part,
> >>>>>>>>>>> especially the part about information sharing between
> operations
> >>>> (I'm
> >>>>>>>> not
> >>>>>>>>>>> calling it state sharing because state has a special meaning in
> >>>>>> Flink).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please discuss away!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>

Reply via email to