Hi,

One more thing. I think the Kafka client would be a good example of a connector that could make use of this `isBlocked()`/callbacks single threaded API from the “Pattern 2”.
If we have N threads per N splits, there would be no need for the (N+1)th thread. It could be implemented as a non-blocking queue that notifies the callback/completes the blocked future whenever the queue becomes non-empty. The same thread that handles checkpoints, network flushes and resource management could handle reading from this queue.

Piotrek

> On 15 Nov 2018, at 17:13, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> Hi
>
> Re: Becket
>
>> WRT the confusion between advance() / getCurrent(), do you think it would
>> help if we combine them and have something like:
>>
>> CompletableFuture<T> getNext();
>> long getWatermark();
>> long getCurrentTimestamp();
>
> I think that technically this would work the same as `CompletableFuture<?>
> isBlocked()`, `CompletableFuture<?> advance()` or callback/`notify()` options.
> I see two differences:
> 1. In this case, once the connector unblocks itself and completes the future,
> Flink’s engine would be responsible for holding the record somewhere, while
> during this time Flink’s engine can be busy doing other things. Maybe that’s
> not a big issue, but it will slightly complicate the execution engine.
> 2. This might cause some performance overhead, since every record will have
> to go through the future. As I wrote somewhere before, both `advance()` and
> `isBlocked()` during full throughput could return a static/const NOT_BLOCKED
> instance, which should/could behave better.
>
> Nevertheless, maybe the choice between those options is a secondary one and
> could be made somewhere else/later or during comparison of some POCs?
>
> Re: Aljoscha
>
>> I think it should be as easy as adding a
>> minimumTimestamp()/maximumTimestamp() method pair to the split interface.
>
> I think that `minimumTimestamp()/maximumTimestamp()` extension seems
> reasonable if we want Flink to be aware of that.
> Since watermark handling/emitting would be a custom logic anyway, maybe
> `minimum` and `maximum` timestamps of a split could be handled as private
> fields of the specific connector implementation? I mean, the current proposal
> with `getCurrentTimestamp()` method indicates that this logic will be hidden
> from the Flink’s engine anyway, so there might be no need to expose them via API?
>
>> I see there has been some good discussion but I don't know if we have
>> consensus.
>
> I think we are converging to a point that having some kind of additional
> notification that the connector is not blocked anymore would be more flexible
> for us.
>
> From the perspective of the execution engine, I would be in favour of testing
> out our ideas and maybe benchmarking them to make sure that we are not
> omitting something.
>
> Piotrek
>
>> On 15 Nov 2018, at 12:43, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi,
>>
>> I thought I had sent this mail a while ago but I must have forgotten to send
>> it.
>>
>> There is another thing we should consider for splits: the range of
>> timestamps that it can contain. For example, the splits of a file source
>> would know what the minimum and maximum timestamp in the splits is, roughly.
>> For infinite splits, such as Kafka partitions, the minimum would be
>> meaningful but the maximum would be +Inf. If the splits expose the interval
>> of time that they contain, the readers, or the component that manages the
>> readers, can make decisions about which splits to forward and read first. And
>> it can also influence the minimum watermark that a reader forwards: it
>> should never emit a watermark if it knows there are splits to read that have
>> a lower minimum timestamp. I think it should be as easy as adding a
>> minimumTimestamp()/maximumTimestamp() method pair to the split interface.
>>
>> Another thing we need to resolve is the actual reader interface.
>> I see there has been some good discussion but I don't know if we have
>> consensus. We should try and see how specific sources could be implemented
>> with the new interface. For example, for Kafka I think we need to have N+1
>> threads per task (where N is the number of splits that a task is reading
>> from). One thread is responsible for reading from the splits. And each split
>> has its own (internal) thread for reading from Kafka and putting messages in
>> an internal queue to pull from. This is similar to how the current Kafka source
>> is implemented, which has a separate fetcher thread. The reason for this
>> split is that we always need to try reading from Kafka to keep the
>> throughput up. In the current implementation the internal queue (or
>> handover) limits the read rate of the reader threads.
>>
>> @Thomas, what do you think this would look like for Kinesis?
>>
>> Best,
>> Aljoscha
>>
>>> On 15. Nov 2018, at 03:56, Becket Qin <becket....@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> Thanks a lot for the detailed reply. All makes sense to me.
>>>
>>> WRT the confusion between advance() / getCurrent(), do you think it would
>>> help if we combine them and have something like:
>>>
>>> CompletableFuture<T> getNext();
>>> long getWatermark();
>>> long getCurrentTimestamp();
>>>
>>> Cheers,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks again for the detailed answer :) Sorry for responding with a delay.
>>>>
>>>>> Completely agree that in pattern 2, having a callback is necessary for that
>>>>> single thread outside of the connectors. And the connectors MUST have
>>>>> internal threads.
>>>>
>>>> Yes, this thread will have to exist somewhere. In pattern 2 it exists in
>>>> the connector (at least from the perspective of the Flink execution
>>>> engine). In pattern 1 it exists inside the Flink execution engine.
>>>> With completely blocking connectors, like simple reading from files, both of
>>>> those approaches are basically the same. The difference is when the user
>>>> implementing a Flink source is already working with non-blocking code with
>>>> some internal threads. In this case, pattern 1 would result in “double
>>>> thread wrapping”, while pattern 2 would allow to skip one layer of
>>>> indirection.
>>>>
>>>>> If we go that way, we should have something like "void
>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>> CompletableFuture work here, though. If 10 readers returns 10 completable
>>>>> futures, will there be 10 additional threads (so 20 threads in total)
>>>>> blocking waiting on them? Or will there be a single thread busy loop
>>>>> checking around?
>>>>
>>>> To be honest, I haven’t thought this completely through and I haven’t
>>>> tested/POC’ed it. Having said that, I can think of at least a couple of
>>>> solutions. First is something like this:
>>>>
>>>> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>>>>
>>>> Line:
>>>>
>>>> `blocked = split.process();`
>>>>
>>>> Is where the execution goes into the task/sources. This is where the
>>>> returned future is handled:
>>>>
>>>> blocked.addListener(() -> {
>>>>     blockedSplits.remove(split);
>>>>     // reset the level priority to prevent previously-blocked splits from starving existing splits
>>>>     split.resetLevelPriority();
>>>>     waitingSplits.offer(split);
>>>> }, executor);
>>>>
>>>> Fundamentally callbacks and Futures are more or less interchangeable. You
>>>> can always wrap one into another (creating a callback that completes a
>>>> future and attach a callback once future completes).
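A minimal sketch of the "wrap one into another" remark above, assuming two hypothetical single-method interfaces (`CallbackReader`, `FutureReader`) that are not part of any proposal in this thread. It only illustrates the mechanics: a callback that completes a future, and a callback attached to a future.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical callback-style reader: calls the callback when data is available. */
interface CallbackReader {
    void advance(Runnable callback);
}

/** Hypothetical future-style reader: the future completes when data is available. */
interface FutureReader {
    CompletableFuture<?> isBlocked();
}

final class Adapters {
    private Adapters() {}

    /** Callback -> future: hand the reader a callback that completes a fresh future. */
    static FutureReader asFutureReader(CallbackReader reader) {
        return () -> {
            CompletableFuture<Void> future = new CompletableFuture<>();
            // complete() is a no-op after the first call, so a callback that
            // fires several times still unblocks the future only once.
            reader.advance(() -> future.complete(null));
            return future;
        };
    }

    /** Future -> callback: attach the callback so it runs once the future completes. */
    static CallbackReader asCallbackReader(FutureReader reader) {
        return callback -> reader.isBlocked().thenRun(callback);
    }
}
```

Note the asymmetry the thread points out: the future-based side can signal "unblocked" only once per returned future, while a raw callback may be invoked any number of times.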
>>>> In this case the difference for me is mostly:
>>>> - API with passing callback allows the callback to be fired multiple times
>>>> and to fire it even if the connector is not blocked. This is what I meant
>>>> by saying that API `CompletableFuture<?> isBlocked()` is a bit simpler.
>>>> Connector can only return either “I’m not blocked” or “I’m blocked and I
>>>> will tell you only once when I’m not blocked anymore”.
>>>>
>>>> But this is not the most important thing for me here. For me the important
>>>> thing is to try our best to make Flink task’s control and execution single
>>>> threaded. For that both callback and future APIs should work the same.
>>>>
>>>>> WRT pattern 1, a single blocking take() API should just work. The good
>>>>> thing is that a blocking read API is usually simpler to implement.
>>>>
>>>> Yes, they are easier to implement (especially if you are not the one that
>>>> has to deal with the additional threading required around them ;) ). But
>>>> to answer this issue, if we choose pattern 2, we can always provide a
>>>> proxy/wrapper that would, using an internal thread, implement the
>>>> non-blocking API while exposing a blocking API to the user. It would
>>>> implement pattern 2 for the user, exposing pattern 1 to him. In other words,
>>>> implementing pattern 1 in the pattern 2 paradigm, while making it possible to
>>>> implement pure pattern 2 connectors.
>>>>
>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO in
>>>>> a method like "isBlocked()". If the method is expected to fetch records
>>>>> (even if not returning them), naming it something more explicit would help
>>>>> avoid confusion.
>>>>
>>>> If we choose so, we could rework it into something like:
>>>>
>>>> CompletableFuture<?> advance();
>>>> T getCurrent();
>>>> Watermark getCurrentWatermark();
>>>>
>>>> But as I wrote before, this is more confusing to me for the exact reasons
>>>> you mentioned :) I would be confused what should be done in `advance()` and
>>>> what in `getCurrent()`. However, again this naming issue is not that
>>>> important to me and probably is a matter of taste/personal preferences.
>>>>
>>>> Piotrek
>>>>
>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com> wrote:
>>>>>
>>>>> Hi Piotrek,
>>>>>
>>>>> Thanks for the explanation. We are probably talking about the same thing
>>>>> but in different ways. To clarify a little bit, I think there are two
>>>>> patterns to read from a connector.
>>>>>
>>>>> Pattern 1: Thread-less connector with a blocking read API. Outside of the
>>>>> connector, there is one IO thread per reader, doing blocking read. An
>>>>> additional thread will interact with all the IO threads.
>>>>> Pattern 2: Connector with internal thread(s) and non-blocking API. Outside
>>>>> of the connector, there is one thread for ALL readers, doing IO relying on
>>>>> notification callbacks in the reader.
>>>>>
>>>>> In both patterns, there must be at least one thread per connector, either
>>>>> inside (created by connector writers) or outside (created by Flink) of the
>>>>> connector. Ideally there are NUM_CONNECTORS + 1 threads in total, to make
>>>>> sure that 1 thread is fully non-blocking.
>>>>>
>>>>>> Btw, I don’t know if you understand my point. Having only `poll()` and
>>>>>> `take()` is not enough for single threaded task. If our source interface
>>>>>> doesn’t provide `notify()` callback nor `CompletableFuture<?>
>>>>>> isBlocked(),`, there is no way to implement single threaded task that both
>>>>>> reads the data from the source connector and can also react to system
>>>>>> events.
>>>>>> Ok, non blocking `poll()` would allow that, but with busy looping.
>>>>>
>>>>> Completely agree that in pattern 2, having a callback is necessary for that
>>>>> single thread outside of the connectors. And the connectors MUST have
>>>>> internal threads. If we go that way, we should have something like "void
>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>> CompletableFuture work here, though. If 10 readers returns 10 completable
>>>>> futures, will there be 10 additional threads (so 20 threads in total)
>>>>> blocking waiting on them? Or will there be a single thread busy loop
>>>>> checking around?
>>>>>
>>>>> WRT pattern 1, a single blocking take() API should just work. The good
>>>>> thing is that a blocking read API is usually simpler to implement. An
>>>>> additional non-blocking "T poll()" method here is indeed optional and could
>>>>> be used in cases like Flink does not want the thread to block forever. They
>>>>> can also be combined to have a "T poll(Timeout)", which is exactly what
>>>>> KafkaConsumer did.
>>>>>
>>>>> It sounds that you are proposing pattern 2 with something similar to NIO2
>>>>> AsynchronousByteChannel[1]. That API would work, except that the signature
>>>>> returning future seems not necessary. If that is the case, a minor change
>>>>> on the current FLIP proposal to have "void advance(callback)" should work.
>>>>> And this means the connectors MUST have their internal threads.
>>>>>
>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO in
>>>>> a method like "isBlocked()". If the method is expected to fetch records
>>>>> (even if not returning them), naming it something more explicit would help
>>>>> avoid confusion.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> [1]
>>>>> https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
>>>>>
>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <pi...@data-artisans.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> Good point with select/epoll, however I do not see how they couldn’t be
>>>>>> with Flink if we would like single task in Flink to be single-threaded (and
>>>>>> I believe we should pursue this goal). If your connector blocks on
>>>>>> `select`, then it can not process/handle control messages from Flink, like
>>>>>> checkpoints, releasing resources and potentially output flushes. This would
>>>>>> require tight integration between connector and Flink’s main event
>>>>>> loop/selects/etc.
>>>>>>
>>>>>> Looking at it from another perspective: let’s assume that we have a
>>>>>> connector implemented on top of `select`/`epoll`. In order to integrate it
>>>>>> with Flink’s checkpointing/flushes/resource releasing it will have to be
>>>>>> executed in a separate thread one way or another. At least if our API will
>>>>>> enforce/encourage non blocking implementations with some kind of
>>>>>> notifications (`isBlocked()` or `notify()` callback), some connectors might
>>>>>> skip one layer of wrapping threads.
>>>>>>
>>>>>> Btw, I don’t know if you understand my point. Having only `poll()` and
>>>>>> `take()` is not enough for single threaded task. If our source interface
>>>>>> doesn’t provide `notify()` callback nor `CompletableFuture<?>
>>>>>> isBlocked(),`, there is no way to implement single threaded task that both
>>>>>> reads the data from the source connector and can also react to system
>>>>>> events. Ok, non blocking `poll()` would allow that, but with busy looping.
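To make the point above concrete, here is a rough sketch of a single task thread that waits on both the source and a control channel without busy looping. Everything in it (`Mailbox`, `SingleThreadedTask`, the string-typed events) is a hypothetical stand-in invented for illustration, not Flink API; it only assumes that both channels expose an `isBlocked()`-style future.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical hand-over point: other threads call put(), the task thread calls isBlocked()/poll(). */
class Mailbox<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void put(T item) {
        items.add(item);
        available.complete(null); // wakes the task thread if it is waiting
    }

    /** Completed while items are queued, pending otherwise. */
    synchronized CompletableFuture<?> isBlocked() {
        return available;
    }

    /** Non-blocking: returns null when nothing is queued. */
    synchronized T poll() {
        T item = items.poll();
        if (items.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // drained: block future callers again
        }
        return item;
    }
}

class SingleThreadedTask {
    /** Handles `expected` events on the calling thread and returns them in handling order. */
    static List<String> run(Mailbox<String> records, Mailbox<String> control, int expected) {
        List<String> handled = new ArrayList<>();
        while (handled.size() < expected) {
            // Park until either channel has something; no spinning on poll().
            CompletableFuture.anyOf(records.isBlocked(), control.isBlocked()).join();
            String event = control.poll(); // control messages (e.g. checkpoints) go first
            if (event != null) {
                handled.add("control:" + event);
                continue;
            }
            String record = records.poll();
            if (record != null) {
                handled.add("record:" + record);
            }
        }
        return handled;
    }
}
```

With only a non-blocking `poll()` and no future or callback, the `join()` line would have to become a spin or a sleep-and-retry loop, which is the busy looping mentioned above.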
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Piotrek,
>>>>>>>
>>>>>>>> But I don’t see a reason why we should expose both blocking `take()` and
>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
>>>>>>>> connector) would have to do the same busy
>>>>>>>> looping anyway and I think it would be better to have a simpler connector
>>>>>>>> API (that would solve our problems) and force connectors to comply one way
>>>>>>>> or another.
>>>>>>>
>>>>>>> If we let the block happen inside the connector, the blocking does not have
>>>>>>> to be a busy loop. For example, to do the block waiting efficiently, the
>>>>>>> connector can use java NIO selector().select which relies on OS syscall
>>>>>>> like epoll[1] instead of busy looping. But if Flink engine blocks outside
>>>>>>> the connector, it pretty much has to do the busy loop. So if there is only
>>>>>>> one API to get the element, a blocking getNextElement() makes more sense.
>>>>>>> In any case, we should avoid ambiguity. It has to be crystal clear about
>>>>>>> whether a method is expected to be blocking or non-blocking. Otherwise it
>>>>>>> would be very difficult for Flink engine to do the right thing with the
>>>>>>> connectors. At the first glance at getCurrent(), the expected behavior is
>>>>>>> not quite clear.
>>>>>>>
>>>>>>> That said, I do agree that functionality wise, poll() and take() kind of
>>>>>>> overlap. But they are actually not quite different from
>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the only
>>>>>>> difference is that poll() also returns the next record if it is available.
>>>>>>> But I agree that the isBlocked() + getNextElement() is more flexible as
>>>>>>> users can just check the record availability, but not fetch the next
>>>>>>> element.
>>>>>>>
>>>>>>>> In case of thread-less readers with only non-blocking `queue.poll()` (is
>>>>>>>> that really a thing? I can not think about a real implementation that
>>>>>>>> enforces such constraints)
>>>>>>> Right, it is pretty much a syntax sugar to allow user combine the
>>>>>>> check-and-take into one method. It could be achieved with isBlocked() +
>>>>>>> getNextElement().
>>>>>>>
>>>>>>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <pi...@data-artisans.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Becket,
>>>>>>>>
>>>>>>>> With my proposal, both of your examples would have to be solved by the
>>>>>>>> connector and solution to both problems would be the same:
>>>>>>>>
>>>>>>>> Pretend that connector is never blocked (`isBlocked() { return
>>>>>>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion (or
>>>>>>>> semi blocking with return of control from time to time to allow for
>>>>>>>> checkpointing, network flushing and other resource management things to
>>>>>>>> happen in the same main thread). In other words, exactly how you would
>>>>>>>> implement `take()` method or how the same source connector would be
>>>>>>>> implemented NOW with current source interface. The difference with current
>>>>>>>> interface would be only that main loop would be outside of the connector,
>>>>>>>> and instead of periodically releasing checkpointing lock, periodically
>>>>>>>> `return null;` or `return Optional.empty();` from `getNextElement()`.
>>>>>>>>
>>>>>>>> In case of thread-less readers with only non-blocking `queue.poll()` (is
>>>>>>>> that really a thing? I can not think about a real implementation that
>>>>>>>> enforces such constraints), we could provide a wrapper that hides the busy
>>>>>>>> looping.
>>>>>>>> The same applies to how to solve forever blocking readers - we could
>>>>>>>> provide another wrapper running the connector in a separate thread.
>>>>>>>>
>>>>>>>> But I don’t see a reason why we should expose both blocking `take()` and
>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
>>>>>>>> connector) would have to do the same busy looping anyway and I think it
>>>>>>>> would be better to have a simpler connector API (that would solve our
>>>>>>>> problems) and force connectors to comply one way or another.
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Piotr,
>>>>>>>>>
>>>>>>>>> I might have misunderstood your proposal. But let me try to explain my
>>>>>>>>> concern. I am thinking about the following case:
>>>>>>>>> 1. a reader has the following two interfaces,
>>>>>>>>> boolean isBlocked()
>>>>>>>>> T getNextElement()
>>>>>>>>> 2. the implementation of getNextElement() is non-blocking.
>>>>>>>>> 3. The reader is thread-less, i.e. it does not have any internal thread.
>>>>>>>>> For example, it might just delegate the getNextElement() to a queue.poll(),
>>>>>>>>> and isBlocked() is just queue.isEmpty().
>>>>>>>>>
>>>>>>>>> How can Flink efficiently implement a blocking reading behavior with this
>>>>>>>>> reader? Either a tight loop or a backoff interval is needed. Neither of
>>>>>>>>> them is ideal.
>>>>>>>>>
>>>>>>>>> Now let's say the reader mentioned above implements a blocking
>>>>>>>>> getNextElement() method. Because there is no internal thread in the reader,
>>>>>>>>> after isBlocked() returns false, Flink will still have to loop on
>>>>>>>>> isBlocked() to check whether the next record is available. If the next
>>>>>>>>> record reaches after 10 min, it is a tight loop for 10 min.
>>>>>>>>> You have probably noticed that in this case, even if isBlocked() returns a
>>>>>>>>> future, that future will not be completed if Flink does not call some
>>>>>>>>> method from the reader, because the reader has no internal thread to
>>>>>>>>> complete that future by itself.
>>>>>>>>>
>>>>>>>>> Due to the above reasons, a blocking take() API would allow Flink to have
>>>>>>>>> an efficient way to read from a reader. There are many ways to wake up the
>>>>>>>>> blocking thread when checkpointing is needed depending on the
>>>>>>>>> implementation. But I think the poll()/take() API would also work in that
>>>>>>>>> case.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <pi...@data-artisans.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> a)
>>>>>>>>>>
>>>>>>>>>>> BTW, regarding the isBlocked() method, I have a few more questions. 21. Is
>>>>>>>>>>> a method isReady() with boolean as a return value
>>>>>>>>>>> equivalent? Personally I found it is a little bit confusing in what is
>>>>>>>>>>> supposed to be returned when the future is completed. 22. if
>>>>>>>>>>> the implementation of isBlocked() is optional, how do the callers know
>>>>>>>>>>> whether the method is properly implemented or not?
>>>>>>>>>>> Does not implemented mean it always return a completed future?
>>>>>>>>>>
>>>>>>>>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
>>>>>>>>>> `boolean hasNext()` which in case of “false” provides some kind of a
>>>>>>>>>> listener/callback that notifies about presence of next element. There are
>>>>>>>>>> some minor details, like `CompletableFuture<?>` has a minimal two state
>>>>>>>>>> logic:
>>>>>>>>>>
>>>>>>>>>> 1. Future is completed - we have more data
>>>>>>>>>> 2. Future not yet completed - we don’t have data now, but we might/we will
>>>>>>>>>> have in the future
>>>>>>>>>>
>>>>>>>>>> While `boolean hasNext()` and `notify()` callback are a bit more
>>>>>>>>>> complicated/dispersed and can lead/encourage `notify()` spam.
>>>>>>>>>>
>>>>>>>>>> b)
>>>>>>>>>>
>>>>>>>>>>> 3. If merge the `advance` and `getCurrent` to one method like `getNext`
>>>>>>>>>>> the `getNext` would need return a
>>>>>>>>>>> `ElementWithTimestamp` because some sources want to add timestamp to
>>>>>>>>>>> every element. IMO, this is not so memory friendly
>>>>>>>>>>> so I prefer this design.
>>>>>>>>>>
>>>>>>>>>> Guowei, I don’t quite understand this. Could you elaborate why having a
>>>>>>>>>> separate `advance()` helps?
>>>>>>>>>>
>>>>>>>>>> c)
>>>>>>>>>>
>>>>>>>>>> Regarding advance/poll/take. What’s the value of having two separate
>>>>>>>>>> methods: poll and take? Which one of them should be called and which
>>>>>>>>>> implemented? What’s the benefit of having those methods compared to having
>>>>>>>>>> one single method `getNextElement()` (or `pollElement()` or whatever we
>>>>>>>>>> name it) with following contract:
>>>>>>>>>>
>>>>>>>>>> CompletableFuture<?> isBlocked();
>>>>>>>>>>
>>>>>>>>>> /**
>>>>>>>>>> Return next element - will be called only if `isBlocked()` is completed.
>>>>>>>>>> Try to implement it in non blocking fashion, but if that’s impossible or
>>>>>>>>>> you just don’t need the effort, you can block in this method.
>>>>>>>>>> */
>>>>>>>>>> T getNextElement();
>>>>>>>>>>
>>>>>>>>>> I mean, if the connector is implemented non-blockingly, Flink should use
>>>>>>>>>> it that way. If it’s not, then `poll()` will `throw new
>>>>>>>>>> NotImplementedException()`.
>>>>>>>>>> Implementing both of them and providing both of
>>>>>>>>>> them to Flink wouldn’t make sense, thus why not merge them into a single
>>>>>>>>>> method call that should preferably (but not necessarily need to) be
>>>>>>>>>> non-blocking? It’s not like we are implementing general purpose `Queue`,
>>>>>>>>>> which users might want to call either of `poll` or `take`. We would always
>>>>>>>>>> prefer to call `poll`, but if it’s blocking, then still we have no choice,
>>>>>>>>>> but to call it and block on it.
>>>>>>>>>>
>>>>>>>>>> d)
>>>>>>>>>>
>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>>>>>>>> important. But in addition to `Future/poll`, there may be another way to
>>>>>>>>>>> achieve this. I think it may be not very memory friendly if every advance
>>>>>>>>>>> call return a Future.
>>>>>>>>>>
>>>>>>>>>> I didn’t want to mention this, to not clog my initial proposal, but there
>>>>>>>>>> is a simple solution for the problem:
>>>>>>>>>>
>>>>>>>>>> public interface SplitReader {
>>>>>>>>>>
>>>>>>>>>>     (…)
>>>>>>>>>>
>>>>>>>>>>     CompletableFuture<?> NOT_BLOCKED =
>>>>>>>>>>         CompletableFuture.completedFuture(null);
>>>>>>>>>>
>>>>>>>>>>     /**
>>>>>>>>>>      * Returns a future that will be completed when the page source becomes
>>>>>>>>>>      * unblocked. If the page source is not blocked, this method should
>>>>>>>>>>      * return {@code NOT_BLOCKED}.
>>>>>>>>>>      */
>>>>>>>>>>     default CompletableFuture<?> isBlocked()
>>>>>>>>>>     {
>>>>>>>>>>         return NOT_BLOCKED;
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> If we are blocked and we are waiting for the IO, then creating a new
>>>>>>>>>> Future is a non-issue. Under full throttle/throughput and not blocked sources
>>>>>>>>>> returning a static `NOT_BLOCKED` constant should also solve the problem.
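As an illustration of the `NOT_BLOCKED` idea above, the following sketch shows a queue-backed reader that allocates a future only while it is actually empty. The class name `QueueBackedReader` and the `offer()` method are assumptions made for this example; only `isBlocked()`, `getNextElement()` and `NOT_BLOCKED` come from the thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reader: a connector-internal thread calls offer(), the task thread calls the rest. */
class QueueBackedReader<T> {
    /** Shared, already-completed future: returning it allocates nothing. */
    static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final Queue<T> queue = new ArrayDeque<>();
    private CompletableFuture<Void> pending; // non-null only while a caller is waiting

    /** Called by the connector's internal thread when a record arrives. */
    void offer(T element) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            queue.add(element);
            toComplete = pending;
            pending = null;
        }
        if (toComplete != null) {
            toComplete.complete(null); // complete outside the lock
        }
    }

    /** Hot path (records queued): the constant. Cold path (empty): one future per blocked period. */
    synchronized CompletableFuture<?> isBlocked() {
        if (!queue.isEmpty()) {
            return NOT_BLOCKED;
        }
        if (pending == null) {
            pending = new CompletableFuture<>();
        }
        return pending;
    }

    /** Non-blocking: returns null if nothing is queued. */
    synchronized T getNextElement() {
        return queue.poll();
    }
}
```

Under full throughput the queue is rarely empty, so `isBlocked()` keeps returning the same constant and no per-record future is created.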
>>>>>>>>>>
>>>>>>>>>> One more remark, non-blocking sources might be a necessity in a single
>>>>>>>>>> threaded model without a checkpointing lock. (Currently when sources are
>>>>>>>>>> blocked, they can release checkpointing lock and re-acquire it again
>>>>>>>>>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
>>>>>>>>>> happen when source is idling. In that case either `notify()` or my proposed
>>>>>>>>>> `isBlocked()` would allow to avoid busy-looping.
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> The iterator-like API was also the first thing that came to me. But it
>>>>>>>>>>> seems a little confusing that hasNext() does not mean "the stream has not
>>>>>>>>>>> ended", but means "the next record is ready", which is repurposing the well
>>>>>>>>>>> known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
>>>>>>>>>>> additional isNextReady() method to indicate whether the next record is
>>>>>>>>>>> ready seems more intuitive to me.
>>>>>>>>>>>
>>>>>>>>>>> Similarly, in poll()/take() pattern, another method of isDone() is needed
>>>>>>>>>>> to indicate whether the stream has ended or not.
>>>>>>>>>>>
>>>>>>>>>>> Compared with hasNext()/next()/isNextReady() pattern,
>>>>>>>>>>> isDone()/poll()/take() seems more flexible for the reader implementation.
>>>>>>>>>>> When I am implementing a reader, I could have a couple of choices:
>>>>>>>>>>>
>>>>>>>>>>> - A thread-less reader that does not have any internal thread.
>>>>>>>>>>>   - When poll() is called, the same calling thread will perform a bunch of
>>>>>>>>>>>     IO asynchronously.
>>>>>>>>>>>   - When take() is called, the same calling thread will perform a bunch
>>>>>>>>>>>     of IO and wait until the record is ready.
>>>>>>>>>>> - A reader with internal threads performing network IO and put records
>>>>>>>>>>>   into a buffer.
>>>>>>>>>>>   - When poll() is called, the calling thread simply reads from the
>>>>>>>>>>>     buffer and return empty result immediately if there is no record.
>>>>>>>>>>>   - When take() is called, the calling thread reads from the buffer and
>>>>>>>>>>>     block waiting if the buffer is empty.
>>>>>>>>>>>
>>>>>>>>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
>>>>>>>>>>> intuitive for the reader developers to write the thread-less pattern.
>>>>>>>>>>> Technically speaking, one can still do the asynchronous IO to
>>>>>>>>>>> prepare the record in isNextReady(), but it is not explicit and seems
>>>>>>>>>>> somewhat hacky.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Couple more points regarding discovery:
>>>>>>>>>>>>
>>>>>>>>>>>> The proposal mentions that discovery could be outside the execution graph.
>>>>>>>>>>>> Today, discovered partitions/shards are checkpointed. I believe that will
>>>>>>>>>>>> also need to be the case in the future, even when discovery and reading are
>>>>>>>>>>>> split between different tasks.
>>>>>>>>>>>>
>>>>>>>>>>>> For cases such as resharding of a Kinesis stream, the relationship between
>>>>>>>>>>>> splits needs to be considered. Splits cannot be randomly distributed over
>>>>>>>>>>>> readers in certain situations.
>>>>>>>>>>>> An example was mentioned here:
>>>>>>>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for getting the ball rolling on this!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can the number of splits decrease? Yes, splits can be closed and go away.
>>>>>>>>>>>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>>>>>>>>>>>> closed and replaced with a new shard).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding advance/poll/take: IMO the least restrictive approach would be
>>>>>>>>>>>>> the thread-less IO model (pull based, non-blocking, caller retrieves new
>>>>>>>>>>>>> records when available). The current Kinesis API requires the use of
>>>>>>>>>>>>> threads. But that can be internal to the split reader and does not need to
>>>>>>>>>>>>> be a source API concern. In fact, that's what we are working on right now
>>>>>>>>>>>>> as improvement to the existing consumer: Each shard consumer thread will
>>>>>>>>>>>>> push to a queue, the consumer main thread will poll the queue(s). It is
>>>>>>>>>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The proposed SplitReader interface would fit the thread-less IO model.
>>>>>>>>>>>>> Similar to an iterator, we find out if there is a new element (hasNext) and
>>>>>>>>>>>>> if so, move to it (next()). Separate calls deliver the meta information
>>>>>>>>>>>>> (timestamp, watermark). Perhaps advance call could offer a timeout option,
>>>>>>>>>>>>> so that the caller does not end up in a busy wait.
>>>>>>>>>>>>> On the other hand, a
>>>>>>>>>>>>> caller processing multiple splits may want to cycle through fast, to
>>>>>>>>>>>>> process elements of other splits as soon as they become available. The nice
>>>>>>>>>>>>> thing is that this "split merge" logic can now live in Flink and be
>>>>>>>>>>>>> optimized and shared between different sources.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may be another way to
>>>>>>>>>>>>>> achieve this. I think it may not be very memory friendly if every advance
>>>>>>>>>>>>>> call returns a Future.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public interface Listener {
>>>>>>>>>>>>>>   public void notify();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public interface SplitReader {
>>>>>>>>>>>>>>   /**
>>>>>>>>>>>>>>    * When there is temporarily no element, this will return false.
>>>>>>>>>>>>>>    * When elements are available again, the splitReader can call
>>>>>>>>>>>>>>    * listener.notify().
>>>>>>>>>>>>>>    * In addition, the framework would check `advance` periodically.
>>>>>>>>>>>>>>    * Of course advance can always return true and ignore the listener
>>>>>>>>>>>>>>    * argument for simplicity.
>>>>>>>>>>>>>>    */
>>>>>>>>>>>>>>   public boolean advance(Listener listener);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. The FLIP tells us very clearly how to create all Splits and how
>>>>>>>>>>>>>> to create a SplitReader from a Split. But there is no strategy for the user
>>>>>>>>>>>>>> to choose how to assign the splits to the tasks.
>>>>>>>>>>>>>> I think we could add an
>>>>>>>>>>>>>> enum to let the user choose.
>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>> public enum SplitsAssignmentPolicy {
>>>>>>>>>>>>>>   Location,
>>>>>>>>>>>>>>   Workload,
>>>>>>>>>>>>>>   Random,
>>>>>>>>>>>>>>   Average
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. If we merge `advance` and `getCurrent` into one method like `getNext`,
>>>>>>>>>>>>>> then `getNext` would need to return an `ElementWithTimestamp`, because some
>>>>>>>>>>>>>> sources want to add a timestamp to every element. IMO, this is not so memory
>>>>>>>>>>>>>> friendly, so I prefer the current design.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Piotr Nowojski <pi...@data-artisans.com> wrote on Thu, Nov 1, 2018 at 6:08 PM:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>>>>>>>>>>>> possible improvements. I have one proposal. Instead of having a method:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> boolean advance() throws IOException;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would replace it with
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> /*
>>>>>>>>>>>>>>>  * Return a future, which when completed means that the source has more
>>>>>>>>>>>>>>>  * data and getNext() will not block.
>>>>>>>>>>>>>>>  * If you wish to use the benefits of non blocking connectors, please
>>>>>>>>>>>>>>>  * implement this method appropriately.
>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>>>>>>>>>   return CompletableFuture.completedFuture(null);
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Couple of arguments:
>>>>>>>>>>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>>>>>>>>>>> `getCurrent()`.
>>>>>>>>>>>>>>> What should be done in which, especially for connectors
>>>>>>>>>>>>>>> that handle records in batches (like Kafka), and when should you call
>>>>>>>>>>>>>>> `advance()` and when `getCurrent()`?
>>>>>>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>>>>>>>>>>>>>> future to have asynchronous/non blocking connectors and to handle a large
>>>>>>>>>>>>>>> number of blocked threads more efficiently, without busy waiting. At the
>>>>>>>>>>>>>>> same time, it doesn’t add much complexity, since naive connector
>>>>>>>>>>>>>>> implementations can always be blocking.
>>>>>>>>>>>>>>> 3. This would also allow us to use a fixed size thread pool of task
>>>>>>>>>>>>>>> executors, instead of one thread per task.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In order to finally get the ball rolling on the new source interface
>>>>>>>>>>>>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>>>>>>>>>>>>> adding per-partition watermark support to the Kinesis source and because
>>>>>>>>>>>>>>>> this would enable generic implementation of event-time alignment for all
>>>>>>>>>>>>>>>> sources.
>>>>>>>>>>>>>>>> Maybe we need another FLIP for the event-time alignment part,
>>>>>>>>>>>>>>>> especially the part about information sharing between operations (I'm not
>>>>>>>>>>>>>>>> calling it state sharing because state has a special meaning in Flink).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please discuss away!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Aljoscha
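For reference, the queue-backed pattern discussed in this thread (internal IO threads fill a buffer, the caller polls without blocking, and an `isBlocked()` future completes once the buffer becomes non-empty) could look roughly like the sketch below. This is only an illustration under assumptions: the class name `QueueBackedReader` and the `put`/`poll` methods are hypothetical and are not part of FLIP-27 or any Flink API, and `NOT_BLOCKED` simply follows the constant name suggested earlier in the thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a non-blocking, queue-backed reader; not the FLIP-27 interface. */
class QueueBackedReader<T> {
    /** Shared, already-completed future returned while data is available (no allocation per call). */
    static final CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private final Queue<T> buffer = new ArrayDeque<>();
    /** Non-null only while a caller is waiting on an empty buffer. */
    private CompletableFuture<Void> pending;

    /** Called by the connector's internal IO thread(s) to hand over a record. */
    void put(T record) {
        CompletableFuture<Void> toComplete;
        synchronized (lock) {
            buffer.add(record);
            toComplete = pending;
            pending = null;
        }
        // Complete outside the lock so that callbacks registered on the future do not run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }

    /** Returns a completed future if a record is buffered, otherwise a future completed by the next put(). */
    CompletableFuture<Void> isBlocked() {
        synchronized (lock) {
            if (!buffer.isEmpty()) {
                return NOT_BLOCKED;
            }
            if (pending == null) {
                pending = new CompletableFuture<>();
            }
            return pending;
        }
    }

    /** Non-blocking read: returns the next record, or null if the buffer is empty. */
    T poll() {
        synchronized (lock) {
            return buffer.poll();
        }
    }
}
```

With this shape, the task thread would call `poll()` while records are available and, on an empty result, register a callback on `isBlocked()` instead of busy waiting or parking a dedicated thread.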