Hi Piotrek,

Thanks a lot for the detailed reply. All makes sense to me.

WRT the confusion between advance() / getCurrent(), do you think it would
help if we combine them and have something like:

    CompletableFuture<T> getNext();
    long getWatermark();
    long getCurrentTimestamp();

Cheers,

Jiangjie (Becket) Qin

On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Thanks again for the detailed answer :) Sorry for responding with a delay.
>
> > Completely agree that in pattern 2, having a callback is necessary for
> > that single thread outside of the connectors. And the connectors MUST
> > have internal threads.
>
> Yes, this thread will have to exist somewhere. In pattern 2 it exists in
> the connector (at least from the perspective of the Flink execution
> engine). In pattern 1 it exists inside the Flink execution engine. With
> completely blocking connectors, like simple reading from files, both of
> those approaches are basically the same. The difference is when the user
> implementing a Flink source is already working with non-blocking code with
> some internal threads. In this case, pattern 1 would result in "double
> thread wrapping", while pattern 2 would allow to skip one layer of
> indirection.
>
> > If we go that way, we should have something like "void
> > poll(Callback) / void advance(callback)". I am curious how would
> > CompletableFuture work here, though. If 10 readers return 10 completable
> > futures, will there be 10 additional threads (so 20 threads in total)
> > blocking waiting on them? Or will there be a single thread busy loop
> > checking around?
>
> To be honest, I haven’t thought this completely through and I haven’t
> tested/POC’ed it. Having said that, I can think of at least a couple of
> solutions.
> First is something like this:
>
> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>
> Line:
>
>     `blocked = split.process();`
>
> is where the execution goes into the task/sources. This is where the
> returned future is handled:
>
>     blocked.addListener(() -> {
>         blockedSplits.remove(split);
>         // reset the level priority to prevent previously-blocked splits
>         // from starving existing splits
>         split.resetLevelPriority();
>         waitingSplits.offer(split);
>     }, executor);
>
> Fundamentally callbacks and Futures are more or less interchangeable. You
> can always wrap one into another (creating a callback that completes a
> future and attach a callback once future completes). In this case the
> difference for me is mostly:
> - api with passing callback allows the callback to be fired multiple times
>   and to fire it even if the connector is not blocked. This is what I meant
>   by saying that api `CompletableFuture<?> isBlocked()` is a bit simpler.
>   Connector can only return either “I’m not blocked” or “I’m blocked and I
>   will tell you only once when I’m not blocked anymore”.
>
> But this is not the most important thing for me here. For me important
> thing is to try our best to make Flink task’s control and execution single
> threaded. For that both callback and future APIs should work the same.
>
> > WRT pattern 1, a single blocking take() API should just work. The good
> > thing is that a blocking read API is usually simpler to implement.
>
> Yes, they are easier to implement (especially if you are not the one that
> has to deal with the additional threading required around them ;) ).
> But to answer this issue, if we choose pattern 2, we can always provide a
> proxy/wrapper that would, using an internal thread, implement the
> non-blocking API while exposing a blocking API to the user. It would
> implement pattern 2 for the user, exposing pattern 1 to them. In other
> words, implementing pattern 1 in the pattern 2 paradigm, while making it
> possible to implement pure pattern 2 connectors.
>
> > BTW, one thing I am also trying to avoid is pushing users to perform IO
> > in a method like "isBlocked()". If the method is expected to fetch
> > records (even if not returning them), naming it something more explicit
> > would help avoid confusion.
>
> If we choose so, we could rework it into something like:
>
>     CompletableFuture<?> advance()
>     T getCurrent();
>     Watermark getCurrentWatermark()
>
> But as I wrote before, this is more confusing to me for the exact reasons
> you mentioned :) I would be confused what should be done in `advance()` and
> what in `getCurrent()`. However, again this naming issue is not that
> important to me and probably is a matter of taste/personal preferences.
>
> Piotrek
>
> > On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Piotrek,
> >
> > Thanks for the explanation. We are probably talking about the same thing
> > but in different ways. To clarify a little bit, I think there are two
> > patterns to read from a connector.
> >
> > Pattern 1: Thread-less connector with a blocking read API. Outside of the
> > connector, there is one IO thread per reader, doing blocking read. An
> > additional thread will interact with all the IO threads.
> > Pattern 2: Connector with internal thread(s) and non-blocking API.
> > Outside of the connector, there is one thread for ALL readers, doing IO
> > relying on notification callbacks in the reader.
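
A minimal sketch of the proxy/wrapper idea mentioned above: a pattern 1
(blocking, thread-less) reader is adapted to pattern 2 by giving it one
internal thread and exposing `isBlocked()` plus a non-blocking `poll()`.
`BlockingReader`, `NonBlockingWrapper` and `WrapperDemo` are hypothetical
names used for illustration only; they are not part of the FLIP or of any
Flink API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Pattern 1: a thread-less reader with a blocking read (hypothetical). */
interface BlockingReader<T> {
    T take() throws InterruptedException;
}

/** Adapts a BlockingReader to a non-blocking API using one internal thread. */
class NonBlockingWrapper<T> {
    static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private final Queue<T> buffer = new ArrayDeque<>();                    // guarded by lock
    private CompletableFuture<Void> available = new CompletableFuture<>(); // guarded by lock
    private final Thread ioThread;

    NonBlockingWrapper(BlockingReader<T> reader) {
        ioThread = new Thread(() -> {
            try {
                while (true) {
                    T record = reader.take();      // blocks only this internal thread
                    synchronized (lock) {
                        buffer.add(record);
                        available.complete(null);  // wakes up whoever waits on isBlocked()
                    }
                }
            } catch (InterruptedException e) {
                // close() was called: let the thread exit
            }
        });
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** Completed future if a record is buffered, otherwise one completed on arrival. */
    CompletableFuture<?> isBlocked() {
        synchronized (lock) {
            if (!buffer.isEmpty()) {
                return NOT_BLOCKED;
            }
            if (available.isDone()) {
                available = new CompletableFuture<>(); // the old one was already used up
            }
            return available;
        }
    }

    /** Non-blocking: returns null if nothing is buffered. */
    T poll() {
        synchronized (lock) {
            return buffer.poll();
        }
    }

    void close() {
        ioThread.interrupt();
    }
}

class WrapperDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        NonBlockingWrapper<String> wrapper = new NonBlockingWrapper<>(source::take);
        CompletableFuture<?> blocked = wrapper.isBlocked(); // nothing buffered yet
        source.put("record-1");
        blocked.get();                      // completes once the IO thread buffered it
        System.out.println(wrapper.poll()); // prints "record-1"
        wrapper.close();
    }
}
```

The caller never blocks in `poll()`; the only blocking call, `reader.take()`,
stays on the wrapper's own thread, which is the one layer of thread wrapping
that a natively non-blocking connector could skip.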
> >
> > In both patterns, there must be at least one thread per connector, either
> > inside (created by connector writers) or outside (created by Flink) of
> > the connector. Ideally there are NUM_CONNECTORS + 1 threads in total, to
> > make sure that 1 thread is fully non-blocking.
> >
> >> Btw, I don’t know if you understand my point. Having only `poll()` and
> >> `take()` is not enough for single threaded task. If our source interface
> >> doesn’t provide `notify()` callback nor `CompletableFuture<?>
> >> isBlocked()`, there is no way to implement single threaded task that
> >> both reads the data from the source connector and can also react to
> >> system events. Ok, non blocking `poll()` would allow that, but with busy
> >> looping.
> >
> > Completely agree that in pattern 2, having a callback is necessary for
> > that single thread outside of the connectors. And the connectors MUST
> > have internal threads. If we go that way, we should have something like
> > "void poll(Callback) / void advance(callback)". I am curious how would
> > CompletableFuture work here, though. If 10 readers return 10 completable
> > futures, will there be 10 additional threads (so 20 threads in total)
> > blocking waiting on them? Or will there be a single thread busy loop
> > checking around?
> >
> > WRT pattern 1, a single blocking take() API should just work. The good
> > thing is that a blocking read API is usually simpler to implement. An
> > additional non-blocking "T poll()" method here is indeed optional and
> > could be used in cases like Flink does not want the thread to block
> > forever. They can also be combined to have a "T poll(Timeout)", which is
> > exactly what KafkaConsumer did.
> >
> > It sounds that you are proposing pattern 2 with something similar to NIO2
> > AsynchronousByteChannel[1]. That API would work, except that the
> > signature returning future seems not necessary.
> > If that is the case, a minor change on the current FLIP proposal to have
> > "void advance(callback)" should work. And this means the connectors MUST
> > have their internal threads.
> >
> > BTW, one thing I am also trying to avoid is pushing users to perform IO
> > in a method like "isBlocked()". If the method is expected to fetch
> > records (even if not returning them), naming it something more explicit
> > would help avoid confusion.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > [1]
> > https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
> >
> > On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <pi...@data-artisans.com>
> > wrote:
> >
> >> Hi
> >>
> >> Good point with select/epoll, however I do not see how they couldn’t be
> >> with Flink if we would like single task in Flink to be single-threaded
> >> (and I believe we should pursue this goal). If your connector blocks on
> >> `select`, then it can not process/handle control messages from Flink,
> >> like checkpoints, releasing resources and potentially output flushes.
> >> This would require tight integration between connector and Flink’s main
> >> event loop/selects/etc.
> >>
> >> Looking at it from other perspective. Let’s assume that we have a
> >> connector implemented on top of `select`/`epoll`. In order to integrate
> >> it with Flink’s checkpointing/flushes/resource releasing it will have to
> >> be executed in separate thread one way or another. At least if our API
> >> will enforce/encourage non blocking implementations with some kind of
> >> notifications (`isBlocked()` or `notify()` callback), some connectors
> >> might skip one layer of wrapping threads.
> >>
> >> Btw, I don’t know if you understand my point. Having only `poll()` and
> >> `take()` is not enough for single threaded task.
> >> If our source interface doesn’t provide `notify()` callback nor
> >> `CompletableFuture<?> isBlocked()`, there is no way to implement single
> >> threaded task that both reads the data from the source connector and can
> >> also react to system events. Ok, non blocking `poll()` would allow that,
> >> but with busy looping.
> >>
> >> Piotrek
> >>
> >>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> wrote:
> >>>
> >>> Hi Piotrek,
> >>>
> >>>> But I don’t see a reason why we should expose both blocking `take()`
> >>>> and non-blocking `poll()` methods to the Flink engine. Someone (Flink
> >>>> engine or connector) would have to do the same busy looping anyway and
> >>>> I think it would be better to have a simpler connector API (that would
> >>>> solve our problems) and force connectors to comply one way or another.
> >>>
> >>> If we let the block happen inside the connector, the blocking does not
> >>> have to be a busy loop. For example, to do the block waiting
> >>> efficiently, the connector can use java NIO selector().select which
> >>> relies on OS syscall like epoll[1] instead of busy looping. But if Flink
> >>> engine blocks outside the connector, it pretty much has to do the busy
> >>> loop. So if there is only one API to get the element, a blocking
> >>> getNextElement() makes more sense. In any case, we should avoid
> >>> ambiguity. It has to be crystal clear about whether a method is expected
> >>> to be blocking or non-blocking. Otherwise it would be very difficult for
> >>> Flink engine to do the right thing with the connectors. At the first
> >>> glance at getCurrent(), the expected behavior is not quite clear.
> >>>
> >>> That said, I do agree that functionality wise, poll() and take() kind
> >>> of overlap. But they are actually not quite different from
> >>> isBlocked()/getNextElement().
> >>> Compared with isBlocked(), the only difference is that poll() also
> >>> returns the next record if it is available. But I agree that the
> >>> isBlocked() + getNextElement() is more flexible as users can just check
> >>> the record availability, but not fetch the next element.
> >>>
> >>>> In case of thread-less readers with only non-blocking `queue.poll()`
> >>>> (is that really a thing? I can not think about a real implementation
> >>>> that enforces such constraints)
> >>>
> >>> Right, it is pretty much a syntax sugar to allow user combine the
> >>> check-and-take into one method. It could be achieved with isBlocked() +
> >>> getNextElement().
> >>>
> >>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <pi...@data-artisans.com>
> >>> wrote:
> >>>
> >>>> Hi Becket,
> >>>>
> >>>> With my proposal, both of your examples would have to be solved by the
> >>>> connector and solution to both problems would be the same:
> >>>>
> >>>> Pretend that connector is never blocked (`isBlocked() { return
> >>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion
> >>>> (or semi blocking with return of control from time to time to allow for
> >>>> checkpointing, network flushing and other resource management things
> >>>> to happen in the same main thread). In other words, exactly how you
> >>>> would implement `take()` method or how the same source connector would
> >>>> be implemented NOW with current source interface. The difference with
> >>>> current interface would be only that main loop would be outside of the
> >>>> connector, and instead of periodically releasing checkpointing lock,
> >>>> periodically `return null;` or `return Optional.empty();` from
> >>>> `getNextElement()`.
> >>>>
> >>>> In case of thread-less readers with only non-blocking `queue.poll()`
> >>>> (is that really a thing?
> >>>> I can not think about a real implementation that enforces such
> >>>> constraints), we could provide a wrapper that hides the busy looping.
> >>>> The same applies how to solve forever blocking readers - we could
> >>>> provide another wrapper running the connector in separate thread.
> >>>>
> >>>> But I don’t see a reason why we should expose both blocking `take()`
> >>>> and non-blocking `poll()` methods to the Flink engine. Someone (Flink
> >>>> engine or connector) would have to do the same busy looping anyway and
> >>>> I think it would be better to have a simpler connector API (that would
> >>>> solve our problems) and force connectors to comply one way or another.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:
> >>>>>
> >>>>> Hi Piotr,
> >>>>>
> >>>>> I might have misunderstood your proposal. But let me try to explain my
> >>>>> concern. I am thinking about the following case:
> >>>>> 1. a reader has the following two interfaces,
> >>>>>      boolean isBlocked()
> >>>>>      T getNextElement()
> >>>>> 2. the implementation of getNextElement() is non-blocking.
> >>>>> 3. The reader is thread-less, i.e. it does not have any internal
> >>>>>    thread. For example, it might just delegate the getNextElement() to
> >>>>>    a queue.poll(), and isBlocked() is just queue.isEmpty().
> >>>>>
> >>>>> How can Flink efficiently implement a blocking reading behavior with
> >>>>> this reader? Either a tight loop or a backoff interval is needed.
> >>>>> Neither of them is ideal.
> >>>>>
> >>>>> Now let's say the reader mentioned above implements a blocking
> >>>>> getNextElement() method. Because there is no internal thread in the
> >>>>> reader, after isBlocked() returns false, Flink will still have to loop
> >>>>> on isBlocked() to check whether the next record is available. If the
> >>>>> next record reaches after 10 min, it is a tight loop for 10 min.
> >>>>> You have probably noticed that in this case, even if isBlocked()
> >>>>> returns a future, that future will not be completed if Flink does not
> >>>>> call some method from the reader, because the reader has no internal
> >>>>> thread to complete that future by itself.
> >>>>>
> >>>>> Due to the above reasons, a blocking take() API would allow Flink to
> >>>>> have an efficient way to read from a reader. There are many ways to
> >>>>> wake up the blocking thread when checkpointing is needed depending on
> >>>>> the implementation. But I think the poll()/take() API would also work
> >>>>> in that case.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jiangjie (Becket) Qin
> >>>>>
> >>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <pi...@data-artisans.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> a)
> >>>>>>
> >>>>>>> BTW, regarding the isBlock() method, I have a few more questions. 21,
> >>>>>>> Is a method isReady() with boolean as a return value equivalent?
> >>>>>>> Personally I found it is a little bit confusing in what is supposed
> >>>>>>> to be returned when the future is completed. 22. if the
> >>>>>>> implementation of isBlocked() is optional, how do the callers know
> >>>>>>> whether the method is properly implemented or not? Does not
> >>>>>>> implemented mean it always return a completed future?
> >>>>>>
> >>>>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
> >>>>>> `boolean hasNext()` which in case of “false” provides some kind of a
> >>>>>> listener/callback that notifies about presence of next element. There
> >>>>>> are some minor details, like `CompletableFuture<?>` has a minimal two
> >>>>>> state logic:
> >>>>>>
> >>>>>> 1. Future is completed - we have more data
> >>>>>> 2. Future not yet completed - we don’t have data now, but we might/we
> >>>>>>    will have in the future
> >>>>>>
> >>>>>> While `boolean hasNext()` and `notify()` callback are a bit more
> >>>>>> complicated/dispersed and can lead/encourage `notify()` spam.
> >>>>>>
> >>>>>> b)
> >>>>>>
> >>>>>>> 3. If merge the `advance` and `getCurrent` to one method like
> >>>>>>> `getNext` the `getNext` would need return a `ElementWithTimestamp`
> >>>>>>> because some sources want to add timestamp to every element. IMO,
> >>>>>>> this is not so memory friendly so I prefer this design.
> >>>>>>
> >>>>>> Guowei I don’t quite understand this. Could you elaborate why having
> >>>>>> a separate `advance()` helps?
> >>>>>>
> >>>>>> c)
> >>>>>>
> >>>>>> Regarding advance/poll/take. What’s the value of having two separate
> >>>>>> methods: poll and take? Which one of them should be called and which
> >>>>>> implemented? What’s the benefit of having those methods compared to
> >>>>>> having a one single method `getNextElement()` (or `pollElement()` or
> >>>>>> whatever we name it) with following contract:
> >>>>>>
> >>>>>>     CompletableFuture<?> isBlocked();
> >>>>>>
> >>>>>>     /**
> >>>>>>      * Return next element - will be called only if `isBlocked()` is
> >>>>>>      * completed. Try to implement it in non blocking fashion, but if
> >>>>>>      * that’s impossible or you just don’t need the effort, you can
> >>>>>>      * block in this method.
> >>>>>>      */
> >>>>>>     T getNextElement();
> >>>>>>
> >>>>>> I mean, if the connector is implemented non-blockingly, Flink should
> >>>>>> use it that way. If it’s not, then `poll()` will `throw new
> >>>>>> NotImplementedException()`. Implementing both of them and providing
> >>>>>> both of them to Flink wouldn’t make a sense, thus why not merge them
> >>>>>> into a single method call that should preferably (but not necessarily
> >>>>>> need to) be non-blocking?
> >>>>>> It’s not like we are implementing general purpose `Queue`, which
> >>>>>> users might want to call either of `poll` or `take`. We would always
> >>>>>> prefer to call `poll`, but if it’s blocking, then still we have no
> >>>>>> choice, but to call it and block on it.
> >>>>>>
> >>>>>> d)
> >>>>>>
> >>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> >>>>>>> very important. But in addition to `Future/poll`, there may be
> >>>>>>> another way to achieve this. I think it may be not very memory
> >>>>>>> friendly if every advance call return a Future.
> >>>>>>
> >>>>>> I didn’t want to mention this, to not clog my initial proposal, but
> >>>>>> there is a simple solution for the problem:
> >>>>>>
> >>>>>>     public interface SplitReader {
> >>>>>>
> >>>>>>         (…)
> >>>>>>
> >>>>>>         CompletableFuture<?> NOT_BLOCKED =
> >>>>>>             CompletableFuture.completedFuture(null);
> >>>>>>
> >>>>>>         /**
> >>>>>>          * Returns a future that will be completed when the page source
> >>>>>>          * becomes unblocked. If the page source is not blocked, this
> >>>>>>          * method should return {@code NOT_BLOCKED}.
> >>>>>>          */
> >>>>>>         default CompletableFuture<?> isBlocked()
> >>>>>>         {
> >>>>>>             return NOT_BLOCKED;
> >>>>>>         }
> >>>>>>
> >>>>>> If we are blocked and we are waiting for the IO, then creating a new
> >>>>>> Future is non-issue. Under full throttle/throughput and not blocked
> >>>>>> sources returning a static `NOT_BLOCKED` constant should also solve
> >>>>>> the problem.
> >>>>>>
> >>>>>> One more remark, non-blocking sources might be a necessity in a
> >>>>>> single threaded model without a checkpointing lock. (Currently when
> >>>>>> sources are blocked, they can release checkpointing lock and
> >>>>>> re-acquire it again later). Non-blocking `poll`/`getNext()` would
> >>>>>> allow for checkpoints to happen when source is idling.
> >>>>>> In that case either `notify()` or my proposed `isBlocked()` would
> >>>>>> allow to avoid busy-looping.
> >>>>>>
> >>>>>> Piotrek
> >>>>>>
> >>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Thomas,
> >>>>>>>
> >>>>>>> The iterator-like API was also the first thing that came to me. But
> >>>>>>> it seems a little confusing that hasNext() does not mean "the stream
> >>>>>>> has not ended", but means "the next record is ready", which is
> >>>>>>> repurposing the well known meaning of hasNext(). If we follow the
> >>>>>>> hasNext()/next() pattern, an additional isNextReady() method to
> >>>>>>> indicate whether the next record is ready seems more intuitive to me.
> >>>>>>>
> >>>>>>> Similarly, in poll()/take() pattern, another method of isDone() is
> >>>>>>> needed to indicate whether the stream has ended or not.
> >>>>>>>
> >>>>>>> Compared with hasNext()/next()/isNextReady() pattern,
> >>>>>>> isDone()/poll()/take() seems more flexible for the reader
> >>>>>>> implementation. When I am implementing a reader, I could have a
> >>>>>>> couple of choices:
> >>>>>>>
> >>>>>>> - A thread-less reader that does not have any internal thread.
> >>>>>>>   - When poll() is called, the same calling thread will perform a
> >>>>>>>     bunch of IO asynchronously.
> >>>>>>>   - When take() is called, the same calling thread will perform a
> >>>>>>>     bunch of IO and wait until the record is ready.
> >>>>>>> - A reader with internal threads performing network IO and put
> >>>>>>>   records into a buffer.
> >>>>>>>   - When poll() is called, the calling thread simply reads from the
> >>>>>>>     buffer and return empty result immediately if there is no record.
> >>>>>>>   - When take() is called, the calling thread reads from the buffer
> >>>>>>>     and block waiting if the buffer is empty.
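
For illustration, a minimal sketch of the second choice in the list above (a
reader with an internal thread that fills a buffer). It assumes a
`java.util.concurrent.BlockingQueue` as the buffer; `BufferedReaderSketch`,
`onRecordFetched` and `PollTakeDemo` are hypothetical names, not something
proposed in the FLIP.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical reader whose internal IO thread puts records into a buffer. */
class BufferedReaderSketch<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();

    /** Called by the reader's internal IO thread whenever a record arrives. */
    void onRecordFetched(T record) {
        buffer.add(record);
    }

    /** Non-blocking: returns null immediately if the buffer is empty. */
    T poll() {
        return buffer.poll();
    }

    /** Blocking: waits until the buffer contains a record. */
    T take() throws InterruptedException {
        return buffer.take();
    }
}

class PollTakeDemo {
    public static void main(String[] args) throws Exception {
        BufferedReaderSketch<String> reader = new BufferedReaderSketch<>();
        System.out.println(reader.poll()); // prints "null": nothing buffered yet

        // Stand-in for the connector's internal network IO thread.
        Thread io = new Thread(() -> reader.onRecordFetched("record-1"));
        io.start();

        System.out.println(reader.take()); // blocks until "record-1" is delivered
        io.join();
    }
}
```

Both calls read from the same buffer; the only difference is whether the
calling thread returns immediately or waits when the buffer is empty.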
> >>>>>>>
> >>>>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is
> >>>>>>> less intuitive for the reader developers to write the thread-less
> >>>>>>> pattern. Although technically speaking one can still do the
> >>>>>>> asynchronous IO to prepare the record in isNextReady(). But it is
> >>>>>>> inexplicit and seems somewhat hacky.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jiangjie (Becket) Qin
> >>>>>>>
> >>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Couple more points regarding discovery:
> >>>>>>>>
> >>>>>>>> The proposal mentions that discovery could be outside the execution
> >>>>>>>> graph. Today, discovered partitions/shards are checkpointed. I
> >>>>>>>> believe that will also need to be the case in the future, even when
> >>>>>>>> discovery and reading are split between different tasks.
> >>>>>>>>
> >>>>>>>> For cases such as resharding of a Kinesis stream, the relationship
> >>>>>>>> between splits needs to be considered. Splits cannot be randomly
> >>>>>>>> distributed over readers in certain situations. An example was
> >>>>>>>> mentioned here:
> >>>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >>>>>>>>
> >>>>>>>> Thomas
> >>>>>>>>
> >>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for getting the ball rolling on this!
> >>>>>>>>>
> >>>>>>>>> Can the number of splits decrease? Yes, splits can be closed and go
> >>>>>>>>> away. An example would be a shard merge in Kinesis (2 existing
> >>>>>>>>> shards will be closed and replaced with a new shard).
> >>>>>>>>>
> >>>>>>>>> Regarding advance/poll/take: IMO the least restrictive approach
> >>>>>>>>> would be the thread-less IO model (pull based, non-blocking, caller
> >>>>>>>>> retrieves new records when available). The current Kinesis API
> >>>>>>>>> requires the use of threads. But that can be internal to the split
> >>>>>>>>> reader and does not need to be a source API concern. In fact,
> >>>>>>>>> that's what we are working on right now as improvement to the
> >>>>>>>>> existing consumer: Each shard consumer thread will push to a queue,
> >>>>>>>>> the consumer main thread will poll the queue(s). It is essentially
> >>>>>>>>> a mapping from threaded IO to non-blocking.
> >>>>>>>>>
> >>>>>>>>> The proposed SplitReader interface would fit the thread-less IO
> >>>>>>>>> model. Similar to an iterator, we find out if there is a new
> >>>>>>>>> element (hasNext) and if so, move to it (next()). Separate calls
> >>>>>>>>> deliver the meta information (timestamp, watermark). Perhaps
> >>>>>>>>> advance call could offer a timeout option, so that the caller does
> >>>>>>>>> not end up in a busy wait. On the other hand, a caller processing
> >>>>>>>>> multiple splits may want to cycle through fast, to process elements
> >>>>>>>>> of other splits as soon as they become available. The nice thing is
> >>>>>>>>> that this "split merge" logic can now live in Flink and be
> >>>>>>>>> optimized and shared between different sources.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Thomas
> >>>>>>>>>
> >>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>> Thanks Aljoscha for this FLIP.
> >>>>>>>>>>
> >>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> >>>>>>>>>> very important.
> >>>>>>>>>> But in addition to `Future/poll`, there may be another way to
> >>>>>>>>>> achieve this. I think it may be not very memory friendly if every
> >>>>>>>>>> advance call return a Future.
> >>>>>>>>>>
> >>>>>>>>>>     public interface Listener {
> >>>>>>>>>>         public void notify();
> >>>>>>>>>>     }
> >>>>>>>>>>
> >>>>>>>>>>     public interface SplitReader {
> >>>>>>>>>>         /**
> >>>>>>>>>>          * When there is no element temporarily, this will return
> >>>>>>>>>>          * false.
> >>>>>>>>>>          * When elements are available again splitReader can call
> >>>>>>>>>>          * listener.notify()
> >>>>>>>>>>          * In addition the frame would check `advance` periodically.
> >>>>>>>>>>          * Of course advance can always return true and ignore the
> >>>>>>>>>>          * listener argument for simplicity.
> >>>>>>>>>>          */
> >>>>>>>>>>         public boolean advance(Listener listener);
> >>>>>>>>>>     }
> >>>>>>>>>>
> >>>>>>>>>> 2. The FLIP tells us very clearly that how to create all Splits
> >>>>>>>>>> and how to create a SplitReader from a Split. But there is no
> >>>>>>>>>> strategy for the user to choose how to assign the splits to the
> >>>>>>>>>> tasks. I think we could add a Enum to let user to choose.
> >>>>>>>>>>
> >>>>>>>>>>     /**
> >>>>>>>>>>     public enum SplitsAssignmentPolicy {
> >>>>>>>>>>         Location,
> >>>>>>>>>>         Workload,
> >>>>>>>>>>         Random,
> >>>>>>>>>>         Average
> >>>>>>>>>>     }
> >>>>>>>>>>     */
> >>>>>>>>>>
> >>>>>>>>>> 3. If merge the `advance` and `getCurrent` to one method like
> >>>>>>>>>> `getNext` the `getNext` would need return a `ElementWithTimestamp`
> >>>>>>>>>> because some sources want to add timestamp to every element. IMO,
> >>>>>>>>>> this is not so memory friendly so I prefer this design.
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski
> >>>>>>>>>> <pi...@data-artisans.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> >>>>>>>>>>> other possible improvements. I have one proposal. Instead of
> >>>>>>>>>>> having a method:
> >>>>>>>>>>>
> >>>>>>>>>>>     boolean advance() throws IOException;
> >>>>>>>>>>>
> >>>>>>>>>>> I would replace it with
> >>>>>>>>>>>
> >>>>>>>>>>>     /*
> >>>>>>>>>>>      * Return a future, which when completed means that source has
> >>>>>>>>>>>      * more data and getNext() will not block.
> >>>>>>>>>>>      * If you wish to use benefits of non blocking connectors,
> >>>>>>>>>>>      * please implement this method appropriately.
> >>>>>>>>>>>      */
> >>>>>>>>>>>     default CompletableFuture<?> isBlocked() {
> >>>>>>>>>>>         return CompletableFuture.completedFuture(null);
> >>>>>>>>>>>     }
> >>>>>>>>>>>
> >>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
> >>>>>>>>>>>
> >>>>>>>>>>> Couple of arguments:
> >>>>>>>>>>> 1. I don’t understand the division of work between `advance()`
> >>>>>>>>>>>    and `getCurrent()`. What should be done in which, especially
> >>>>>>>>>>>    for connectors that handle records in batches (like Kafka) and
> >>>>>>>>>>>    when should you call `advance` and when `getCurrent()`.
> >>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us
> >>>>>>>>>>>    in the future to have asynchronous/non blocking connectors and
> >>>>>>>>>>>    more efficiently handle large number of blocked threads,
> >>>>>>>>>>>    without busy waiting. While at the same time it doesn’t add
> >>>>>>>>>>>    much complexity, since naive connector implementations can be
> >>>>>>>>>>>    always blocking.
> >>>>>>>>>>> 3. This also would allow us to use a fixed size thread pool of
> >>>>>>>>>>>    task executors, instead of one thread per task.
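
A rough sketch of how arguments 2 and 3 above could play out, assuming the
`isBlocked()`/`getNext()` shape from this proposal: one thread serves several
readers and parks on `CompletableFuture.anyOf(...)` instead of busy waiting or
dedicating a thread per reader. `FutureReader`, `QueueReader`, `offer` and
`SingleThreadedLoopDemo` are hypothetical names for illustration; a real task
loop would also handle checkpoints and other control messages between
iterations.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reader contract following the isBlocked()/getNext() proposal. */
interface FutureReader<T> {
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /** Naive (blocking) connectors can keep this default. */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    /** Returns the next record, or null if none is available right now. */
    T getNext();
}

/** In-memory reader; offer() stands in for the connector's internal IO thread. */
class QueueReader<T> implements FutureReader<T> {
    private final Queue<T> records = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void offer(T record) {
        records.add(record);
        available.complete(null);
    }

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        if (!records.isEmpty()) {
            return NOT_BLOCKED;
        }
        if (available.isDone()) {
            available = new CompletableFuture<>();
        }
        return available;
    }

    @Override
    public synchronized T getNext() {
        return records.poll();
    }
}

class SingleThreadedLoopDemo {
    /** Reads `expected` records from a non-empty list of readers on the calling thread. */
    static <T> List<T> readAll(List<? extends FutureReader<T>> readers, int expected) {
        List<T> out = new ArrayList<>();
        while (out.size() < expected) {
            CompletableFuture<?>[] blocked = new CompletableFuture<?>[readers.size()];
            for (int i = 0; i < readers.size() && out.size() < expected; i++) {
                FutureReader<T> reader = readers.get(i);
                blocked[i] = reader.isBlocked();
                if (blocked[i].isDone()) {
                    T record = reader.getNext();
                    if (record != null) {
                        out.add(record);
                    }
                }
            }
            if (out.size() < expected) {
                // Park until at least one reader reports data; no busy loop.
                CompletableFuture.anyOf(blocked).join();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        QueueReader<String> a = new QueueReader<>();
        QueueReader<String> b = new QueueReader<>();
        // Stand-in for connector-internal threads delivering records later.
        new Thread(() -> { a.offer("a1"); b.offer("b1"); a.offer("a2"); }).start();
        System.out.println(readAll(Arrays.asList(a, b), 3).size()); // prints 3
    }
}
```

Note that a reader which keeps the default `isBlocked()` but returns null from
`getNext()` would turn this loop into a busy loop, which matches the point made
elsewhere in the thread that thread-less, non-blocking readers cannot avoid it.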
> >>>>>>>>>>>
> >>>>>>>>>>> Piotrek
> >>>>>>>>>>>
> >>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi All,
> >>>>>>>>>>>>
> >>>>>>>>>>>> In order to finally get the ball rolling on the new source
> >>>>>>>>>>>> interface that we have discussed for so long I finally created a
> >>>>>>>>>>>> FLIP:
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>>>>>>>>>>
> >>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> >>>>>>>>>>>> about adding per-partition watermark support to the Kinesis
> >>>>>>>>>>>> source and because this would enable generic implementation of
> >>>>>>>>>>>> event-time alignment for all sources. Maybe we need another FLIP
> >>>>>>>>>>>> for the event-time alignment part, especially the part about
> >>>>>>>>>>>> information sharing between operations (I'm not calling it state
> >>>>>>>>>>>> sharing because state has a special meaning in Flink).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please discuss away!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Aljoscha