Hi,

Good point about select/epoll, however I do not see how they fit with Flink if we would like a single task in Flink to be single-threaded (and I believe we should pursue this goal). If your connector blocks on `select`, then it cannot process/handle control messages from Flink, like checkpoints, releasing resources and potentially output flushes. Supporting those would require tight integration between the connector and Flink’s main event loop/selects/etc.
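To make this more concrete, here is a rough, purely illustrative sketch of the kind of single-threaded main loop I have in mind (everything except `isBlocked()`/`getNextElement()` is a made-up name for this mail, not an existing Flink API). If the connector blocks inside `select`, a loop like this never gets a chance to handle control messages; if the connector only offers a non-blocking `poll()` without a future or callback, the loop can only busy-spin while waiting for data.

import java.util.concurrent.CompletableFuture;

// All names below are hypothetical, only for illustration in this mail.
public class SingleThreadedTaskSketch {

    // Source side, along the lines of the isBlocked()/getNextElement() proposal.
    interface SourceReader<T> {
        CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

        // Completed future = calling getNextElement() will not block.
        default CompletableFuture<?> isBlocked() { return NOT_BLOCKED; }

        // Non-blocking; may return null if no element is currently available.
        T getNextElement();
    }

    // Engine side control messages: checkpoints, output flushes, releasing resources, ...
    interface Mailbox {
        boolean hasMail();
        void processMail();
        CompletableFuture<?> newMailAvailable(); // completes when a control message arrives
    }

    // The single task thread: interleaves records and control messages without busy looping.
    static <T> void runMainLoop(SourceReader<T> reader, Mailbox mailbox) throws Exception {
        while (true) {
            if (mailbox.hasMail()) {
                mailbox.processMail(); // checkpoints etc. handled in the same thread
            }
            CompletableFuture<?> blocked = reader.isBlocked();
            if (!blocked.isDone()) {
                // Park until the source has data again or a control message arrives.
                CompletableFuture.anyOf(blocked, mailbox.newMailAvailable()).get();
                continue;
            }
            T element = reader.getNextElement();
            if (element != null) {
                emit(element);
            }
        }
    }

    static <T> void emit(T element) {
        System.out.println("emit: " + element);
    }
}

The `anyOf(...)` wait is exactly what an `isBlocked()` future (or a `notify()` callback) enables: the task can park while the source is idle and still wake up for checkpoints.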
Looking at it from another perspective: let’s assume that we have a connector implemented on top of `select`/`epoll`. In order to integrate it with Flink’s checkpointing/flushes/resource releasing, it will have to be executed in a separate thread one way or another. At least, if our API enforces/encourages non-blocking implementations with some kind of notification (an `isBlocked()` future or a `notify()` callback), some connectors might skip one layer of wrapping threads.

Btw, to restate my point: having only `poll()` and `take()` is not enough for a single-threaded task. If our source interface provides neither a `notify()` callback nor `CompletableFuture<?> isBlocked()`, there is no way to implement a single-threaded task that both reads the data from the source connector and can also react to system events. Ok, a non-blocking `poll()` would allow that, but only with busy looping.

Piotrek

> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> wrote: > > Hi Piotrek, > >> But I don’t see a reason why we should expose both blocking `take()` and > non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or > connector) would have to do the same busy >> looping anyway and I think it would be better to have a simpler connector > API (that would solve our problems) and force connectors to comply one way > or another. > > If we let the block happen inside the connector, the blocking does not have > to be a busy loop. For example, to do the block waiting efficiently, the > connector can use java NIO selector().select which relies on OS syscall > like epoll[1] instead of busy looping. But if Flink engine blocks outside > the connector, it pretty much has to do the busy loop. So if there is only > one API to get the element, a blocking getNextElement() makes more sense. > In any case, we should avoid ambiguity. It has to be crystal clear about > whether a method is expected to be blocking or non-blocking. Otherwise it > would be very difficult for Flink engine to do the right thing with the > connectors. At the first glance at getCurrent(), the expected behavior is > not quite clear. > > That said, I do agree that functionality wise, poll() and take() kind of > overlap. But they are actually not quite different from > isBlocked()/getNextElement(). Compared with isBlocked(), the only > difference is that poll() also returns the next record if it is available. > But I agree that the isBlocked() + getNextElement() is more flexible as > users can just check the record availability, but not fetch the next > element. > >> In case of thread-less readers with only non-blocking `queue.poll()` (is > that really a thing? I can not think about a real implementation that > enforces such constraints) > Right, it is pretty much a syntax sugar to allow user combine the > check-and-take into one method. It could be achieved with isBlocked() + > getNextElement().
> > [1] http://man7.org/linux/man-pages/man7/epoll.7.html > > Thanks, > > Jiangjie (Becket) Qin > > On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <pi...@data-artisans.com> > wrote: > >> Hi Becket, >> >> With my proposal, both of your examples would have to be solved by the >> connector and solution to both problems would be the same: >> >> Pretend that connector is never blocked (`isBlocked() { return >> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion (or >> semi blocking with return of control from time to time to allow for >> checkpointing, network flushing and other resource management things to >> happen in the same main thread). In other words, exactly how you would >> implement `take()` method or how the same source connector would be >> implemented NOW with current source interface. The difference with current >> interface would be only that main loop would be outside of the connector, >> and instead of periodically releasing checkpointing lock, periodically >> `return null;` or `return Optional.empty();` from `getNextElement()`. >> >> In case of thread-less readers with only non-blocking `queue.poll()` (is >> that really a thing? I can not think about a real implementation that >> enforces such constraints), we could provide a wrapper that hides the busy >> looping. The same applies how to solve forever blocking readers - we could >> provider another wrapper running the connector in separate thread. >> >> But I don’t see a reason why we should expose both blocking `take()` and >> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or >> connector) would have to do the same busy looping anyway and I think it >> would be better to have a simpler connector API (that would solve our >> problems) and force connectors to comply one way or another. >> >> Piotrek >> >>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote: >>> >>> Hi Piotr, >>> >>> I might have misunderstood you proposal. But let me try to explain my >>> concern. I am thinking about the following case: >>> 1. a reader has the following two interfaces, >>> boolean isBlocked() >>> T getNextElement() >>> 2. the implementation of getNextElement() is non-blocking. >>> 3. The reader is thread-less, i.e. it does not have any internal thread. >>> For example, it might just delegate the getNextElement() to a >> queue.poll(), >>> and isBlocked() is just queue.isEmpty(). >>> >>> How can Flink efficiently implement a blocking reading behavior with this >>> reader? Either a tight loop or a backoff interval is needed. Neither of >>> them is ideal. >>> >>> Now let's say in the reader mentioned above implements a blocking >>> getNextElement() method. Because there is no internal thread in the >> reader, >>> after isBlocked() returns false. Flink will still have to loop on >>> isBlocked() to check whether the next record is available. If the next >>> record reaches after 10 min, it is a tight loop for 10 min. You have >>> probably noticed that in this case, even isBlocked() returns a future, >> that >>> future() will not be completed if Flink does not call some method from >> the >>> reader, because the reader has no internal thread to complete that future >>> by itself. >>> >>> Due to the above reasons, a blocking take() API would allow Flink to have >>> an efficient way to read from a reader. There are many ways to wake up >> the >>> blocking thread when checkpointing is needed depending on the >>> implementation. But I think the poll()/take() API would also work in that >>> case. 
>>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <pi...@data-artisans.com> >>> wrote: >>> >>>> Hi, >>>> >>>> a) >>>> >>>>> BTW, regarding the isBlock() method, I have a few more questions. 21, >> Is >>>> a method isReady() with boolean as a return value >>>>> equivalent? Personally I found it is a little bit confusing in what is >>>> supposed to be returned when the future is completed. 22. if >>>>> the implementation of isBlocked() is optional, how do the callers know >>>> whether the method is properly implemented or not? >>>>> Does not implemented mean it always return a completed future? >>>> >>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to >>>> `boolean hasNext()` which in case of “false” provides some kind of a >>>> listener/callback that notifies about presence of next element. There >> are >>>> some minor details, like `CompletableFuture<?>` has a minimal two state >>>> logic: >>>> >>>> 1. Future is completed - we have more data >>>> 2. Future not yet completed - we don’t have data now, but we might/we >> will >>>> have in the future >>>> >>>> While `boolean hasNext()` and `notify()` callback are a bit more >>>> complicated/dispersed and can lead/encourage `notify()` spam. >>>> >>>> b) >>>> >>>>> 3. If merge the `advance` and `getCurrent` to one method like >> `getNext` >>>> the `getNext` would need return a >>>>> `ElementWithTimestamp` because some sources want to add timestamp to >>>> every element. IMO, this is not so memory friendly >>>>> so I prefer this design. >>>> >>>> Guowei I don’t quite understand this. Could you elaborate why having a >>>> separate `advance()` help? >>>> >>>> c) >>>> >>>> Regarding advance/poll/take. What’s the value of having two separate >>>> methods: poll and take? Which one of them should be called and which >>>> implemented? What’s the benefit of having those methods compared to >> having >>>> a one single method `getNextElement()` (or `pollElement() or whatever we >>>> name it) with following contract: >>>> >>>> CompletableFuture<?> isBlocked(); >>>> >>>> /** >>>> Return next element - will be called only if `isBlocked()` is completed. >>>> Try to implement it in non blocking fashion, but if that’s impossible or >>>> you just don’t need the effort, you can block in this method. >>>> */ >>>> T getNextElement(); >>>> >>>> I mean, if the connector is implemented non-blockingly, Flink should use >>>> it that way. If it’s not, then `poll()` will `throw new >>>> NotImplementedException()`. Implementing both of them and providing >> both of >>>> them to Flink wouldn’t make a sense, thus why not merge them into a >> single >>>> method call that should preferably (but not necessarily need to) be >>>> non-blocking? It’s not like we are implementing general purpose `Queue`, >>>> which users might want to call either of `poll` or `take`. We would >> always >>>> prefer to call `poll`, but if it’s blocking, then still we have no >> choice, >>>> but to call it and block on it. >>>> >>>> d) >>>> >>>>> 1. I agree with Piotr and Becket that the non-blocking source is very >>>>> important. But in addition to `Future/poll`, there may be another way >> to >>>>> achieve this. I think it may be not very memory friendly if every >> advance >>>>> call return a Future. 
>>>> >>>> I didn’t want to mention this, to not clog my initial proposal, but >> there >>>> is a simple solution for the problem: >>>> >>>> public interface SplitReader { >>>> >>>> (…) >>>> >>>> CompletableFuture<?> NOT_BLOCKED = >>>> CompletableFuture.completedFuture(null); >>>> >>>> /** >>>> * Returns a future that will be completed when the page source >> becomes >>>> * unblocked. If the page source is not blocked, this method should >>>> return >>>> * {@code NOT_BLOCKED}. >>>> */ >>>> default CompletableFuture<?> isBlocked() >>>> { >>>> return NOT_BLOCKED; >>>> } >>>> >>>> If we are blocked and we are waiting for the IO, then creating a new >>>> Future is non-issue. Under full throttle/throughput and not blocked >> sources >>>> returning a static `NOT_BLOCKED` constant should also solve the >> problem. >>>> >>>> One more remark, non-blocking sources might be a necessity in a single >>>> threaded model without a checkpointing lock. (Currently when sources are >>>> blocked, they can release checkpointing lock and re-acquire it again >>>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to >>>> happen when source is idling. In that case either `notify()` or my >> proposed >>>> `isBlocked()` would allow to avoid busy-looping. >>>> >>>> Piotrek >>>> >>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote: >>>>> >>>>> Hi Thomas, >>>>> >>>>> The iterator-like API was also the first thing that came to me. But it >>>>> seems a little confusing that hasNext() does not mean "the stream has >> not >>>>> ended", but means "the next record is ready", which is repurposing the >>>> well >>>>> known meaning of hasNext(). If we follow the hasNext()/next() pattern, >> an >>>>> additional isNextReady() method to indicate whether the next record is >>>>> ready seems more intuitive to me. >>>>> >>>>> Similarly, in poll()/take() pattern, another method of isDone() is >> needed >>>>> to indicate whether the stream has ended or not. >>>>> >>>>> Compared with hasNext()/next()/isNextReady() pattern, >>>>> isDone()/poll()/take() seems more flexible for the reader >> implementation. >>>>> When I am implementing a reader, I could have a couple of choices: >>>>> >>>>> - A thread-less reader that does not have any internal thread. >>>>> - When poll() is called, the same calling thread will perform a bunch >>>> of >>>>> IO asynchronously. >>>>> - When take() is called, the same calling thread will perform a >>>> bunch >>>>> of IO and wait until the record is ready. >>>>> - A reader with internal threads performing network IO and put records >>>>> into a buffer. >>>>> - When poll() is called, the calling thread simply reads from the >>>>> buffer and return empty result immediately if there is no record. >>>>> - When take() is called, the calling thread reads from the buffer >>>> and >>>>> block waiting if the buffer is empty. >>>>> >>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is >>>> less >>>>> intuitive for the reader developers to write the thread-less pattern. >>>>> Although technically speaking one can still do the asynchronous IO to >>>>> prepare the record in isNextReady(). But it is inexplicit and seems >>>>> somewhat hacky. >>>>> >>>>> Thanks, >>>>> >>>>> Jiangjie (Becket) Qin >>>>> >>>>> >>>>> >>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote: >>>>> >>>>>> Couple more points regarding discovery: >>>>>> >>>>>> The proposal mentions that discovery could be outside the execution >>>> graph. 
>>>>>> Today, discovered partitions/shards are checkpointed. I believe that >>>> will >>>>>> also need to be the case in the future, even when discovery and >> reading >>>> are >>>>>> split between different tasks. >>>>>> >>>>>> For cases such as resharding of a Kinesis stream, the relationship >>>> between >>>>>> splits needs to be considered. Splits cannot be randomly distributed >>>> over >>>>>> readers in certain situations. An example was mentioned here: >>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809 >>>>>> >>>>>> Thomas >>>>>> >>>>>> >>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote: >>>>>> >>>>>>> Thanks for getting the ball rolling on this! >>>>>>> >>>>>>> Can the number of splits decrease? Yes, splits can be closed and go >>>> away. >>>>>>> An example would be a shard merge in Kinesis (2 existing shards will >> be >>>>>>> closed and replaced with a new shard). >>>>>>> >>>>>>> Regarding advance/poll/take: IMO the least restrictive approach would >>>> be >>>>>>> the thread-less IO model (pull based, non-blocking, caller retrieves >>>> new >>>>>>> records when available). The current Kinesis API requires the use of >>>>>>> threads. But that can be internal to the split reader and does not >> need >>>>>> to >>>>>>> be a source API concern. In fact, that's what we are working on right >>>> now >>>>>>> as improvement to the existing consumer: Each shard consumer thread >>>> will >>>>>>> push to a queue, the consumer main thread will poll the queue(s). It >> is >>>>>>> essentially a mapping from threaded IO to non-blocking. >>>>>>> >>>>>>> The proposed SplitReader interface would fit the thread-less IO >> model. >>>>>>> Similar to an iterator, we find out if there is a new element >> (hasNext) >>>>>> and >>>>>>> if so, move to it (next()). Separate calls deliver the meta >> information >>>>>>> (timestamp, watermark). Perhaps advance call could offer a timeout >>>>>> option, >>>>>>> so that the caller does not end up in a busy wait. On the other >> hand, a >>>>>>> caller processing multiple splits may want to cycle through fast, to >>>>>>> process elements of other splits as soon as they become available. >> The >>>>>> nice >>>>>>> thing is that this "split merge" logic can now live in Flink and be >>>>>>> optimized and shared between different sources. >>>>>>> >>>>>>> Thanks, >>>>>>> Thomas >>>>>>> >>>>>>> >>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> >> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> Thanks Aljoscha for this FLIP. >>>>>>>> >>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is >> very >>>>>>>> important. But in addition to `Future/poll`, there may be another >> way >>>> to >>>>>>>> achieve this. I think it may be not very memory friendly if every >>>>>> advance >>>>>>>> call return a Future. >>>>>>>> >>>>>>>> public interface Listener { >>>>>>>> public void notify(); >>>>>>>> } >>>>>>>> >>>>>>>> public interface SplitReader() { >>>>>>>> /** >>>>>>>> * When there is no element temporarily, this will return false. >>>>>>>> * When elements is available again splitReader can call >>>>>>>> listener.notify() >>>>>>>> * In addition the frame would check `advance` periodically . >>>>>>>> * Of course advance can always return true and ignore the >>>> listener >>>>>>>> argument for simplicity. >>>>>>>> */ >>>>>>>> public boolean advance(Listener listener); >>>>>>>> } >>>>>>>> >>>>>>>> 2. 
The FLIP tells us very clearly that how to create all Splits and >>>> how >>>>>>>> to create a SplitReader from a Split. But there is no strategy for >> the >>>>>> user >>>>>>>> to choose how to assign the splits to the tasks. I think we could >> add >>>> a >>>>>>>> Enum to let user to choose. >>>>>>>> /** >>>>>>>> public Enum SplitsAssignmentPolicy { >>>>>>>> Location, >>>>>>>> Workload, >>>>>>>> Random, >>>>>>>> Average >>>>>>>> } >>>>>>>> */ >>>>>>>> >>>>>>>> 3. If merge the `advance` and `getCurrent` to one method like >>>> `getNext` >>>>>>>> the `getNext` would need return a `ElementWithTimestamp` because >> some >>>>>>>> sources want to add timestamp to every element. IMO, this is not so >>>>>> memory >>>>>>>> friendly so I prefer this design. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks >>>>>>>> >>>>>>>> Piotr Nowojski <pi...@data-artisans.com> 于2018年11月1日周四 下午6:08写道: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of >> other >>>>>>>>> possible improvements. I have one proposal. Instead of having a >>>> method: >>>>>>>>> >>>>>>>>> boolean advance() throws IOException; >>>>>>>>> >>>>>>>>> I would replace it with >>>>>>>>> >>>>>>>>> /* >>>>>>>>> * Return a future, which when completed means that source has more >>>>>> data >>>>>>>>> and getNext() will not block. >>>>>>>>> * If you wish to use benefits of non blocking connectors, please >>>>>>>>> implement this method appropriately. >>>>>>>>> */ >>>>>>>>> default CompletableFuture<?> isBlocked() { >>>>>>>>> return CompletableFuture.completedFuture(null); >>>>>>>>> } >>>>>>>>> >>>>>>>>> And rename `getCurrent()` to `getNext()`. >>>>>>>>> >>>>>>>>> Couple of arguments: >>>>>>>>> 1. I don’t understand the division of work between `advance()` and >>>>>>>>> `getCurrent()`. What should be done in which, especially for >>>> connectors >>>>>>>>> that handle records in batches (like Kafka) and when should you >> call >>>>>>>>> `advance` and when `getCurrent()`. >>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in >>>> the >>>>>>>>> future to have asynchronous/non blocking connectors and more >>>>>> efficiently >>>>>>>>> handle large number of blocked threads, without busy waiting. While >>>> at >>>>>> the >>>>>>>>> same time it doesn’t add much complexity, since naive connector >>>>>>>>> implementations can be always blocking. >>>>>>>>> 3. This also would allow us to use a fixed size thread pool of task >>>>>>>>> executors, instead of one thread per task. >>>>>>>>> >>>>>>>>> Piotrek >>>>>>>>> >>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> >>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Hi All, >>>>>>>>>> >>>>>>>>>> In order to finally get the ball rolling on the new source >> interface >>>>>>>>> that we have discussed for so long I finally created a FLIP: >>>>>>>>> >>>>>> >>>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface >>>>>>>>>> >>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion >>>> about >>>>>>>>> adding per-partition watermark support to the Kinesis source and >>>>>> because >>>>>>>>> this would enable generic implementation of event-time alignment >> for >>>>>> all >>>>>>>>> sources. Maybe we need another FLIP for the event-time alignment >>>> part, >>>>>>>>> especially the part about information sharing between operations >> (I'm >>>>>> not >>>>>>>>> calling it state sharing because state has a special meaning in >>>> Flink). >>>>>>>>>> >>>>>>>>>> Please discuss away! 
>>>>>>>>>> >>>>>>>>>> Aljoscha >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>> >>>> >>>> >> >>