Thanks Piotrek, > void SplitReader#addSplit(Split) > boolean SplitReader#doesWantMoreSplits()
I have two questions about this API.

1. What if the SplitReader implementation cannot easily add a split to read on the fly?
2. Does Flink have to be involved in split assignment? I am wondering if it would be simpler to let the enumerator indicate whether a split reassignment is needed. If the answer is yes, Flink can just start from the beginning to get all the splits and create one reader per split. This might be a little more expensive than dynamically adding a split to a reader, but given that split changes should be rare, it is probably acceptable. In the Kafka case, the SplitT may just be a consumer. The enumerator will simply check if the topic has new partitions to be assigned to this reader.

@Biao, if I understand correctly, the concern you raised was that a Source may return a lot of splits and thus Flink may have to create a lot of fetcher threads. This is a valid concern, but I cannot think of a solution to that. After all, the SplitReaders may be written by third parties, and poor implementations seem difficult to prevent.

Thanks,
Jiangjie (Becket) Qin

On Wed, Nov 21, 2018 at 10:13 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi again, > > > However, I don't like the thread mode which starts a thread for each > split. > > Starting extra threads in an operator is not an ideal way IMO, especially when the > > thread count is decided by the split count. So I was wondering if there is a > > more elegant way. Do we really want these threads in Flink core? > > Biao, you have raised an important issue. Indeed, it seems like the current > proposal is missing something. I would guess that we need a mechanism for > adding new splits to an already existing SplitReader and some logic to > determine whether the current instance can accept more splits or not. For > example: > > void SplitReader#addSplit(Split) > boolean SplitReader#doesWantMoreSplits() > > Flink could randomly or round-robin assign new splits to the SplitReaders > for which `doesWantMoreSplits()` returns true.
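A minimal Java sketch of the `addSplit`/`doesWantMoreSplits()` idea combined with round-robin assignment, assuming a split can be represented as a `String`. The `BoundedSplitReader` models the "at most N enqueued splits" batch case, and `RoundRobinAssigner` is one possible assignment policy; all of these names are hypothetical illustrations, not Flink APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical interface following the proposal above; not an existing Flink API.
interface SplitReader<SplitT> {
    void addSplit(SplitT split);

    boolean doesWantMoreSplits();
}

// A batch-style reader that accepts at most maxEnqueued pending splits.
// A Kafka-style reader could simply return true from doesWantMoreSplits().
class BoundedSplitReader implements SplitReader<String> {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxEnqueued;

    BoundedSplitReader(int maxEnqueued) {
        this.maxEnqueued = maxEnqueued;
    }

    @Override
    public void addSplit(String split) {
        pending.add(split);
    }

    @Override
    public boolean doesWantMoreSplits() {
        return pending.size() < maxEnqueued;
    }

    List<String> pendingSplits() {
        return new ArrayList<>(pending);
    }
}

// Hands each new split to the next reader, in round-robin order, that still wants splits.
class RoundRobinAssigner<SplitT> {
    private final List<? extends SplitReader<SplitT>> readers;
    private int next = 0;

    RoundRobinAssigner(List<? extends SplitReader<SplitT>> readers) {
        this.readers = readers;
    }

    /** Returns false if no reader currently wants more splits (the split stays unassigned). */
    boolean assign(SplitT split) {
        for (int i = 0; i < readers.size(); i++) {
            int candidate = (next + i) % readers.size();
            SplitReader<SplitT> reader = readers.get(candidate);
            if (reader.doesWantMoreSplits()) {
                reader.addSplit(split);
                next = (candidate + 1) % readers.size();
                return true;
            }
        }
        return false;
    }
}
```

In this sketch a split that no reader wants stays with the caller (presumably the enumerator) until some reader frees capacity; how that retry would be triggered is left open.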
Batch file readers might implement some custom > logic in `doesWantMoreSplits()`, e.g. one SplitReader can have at most N > enqueued splits? > > Also, what about Kafka? Isn’t it the case that one KafkaConsumer can read > from multiple splits? So Kafka’s SplitReader should always return true from > `doesWantMoreSplits()`? > > What do you think? > > Re: Becket > > I’m +1 for Sync and AsyncSplitReader. > > Piotrek > > > On 21 Nov 2018, at 14:49, Becket Qin <becket....@gmail.com> wrote: > > > > Hi Aljoscha, > > > > Good point on the potential optimization in the source. One thing to > > clarify: by "adding a minimumTimestamp()/maximumTimestamp() method pair > to > > the split interface", did you mean "split reader interface"? If so, what > > should the readers do if they do not have such additional information? I > am > > wondering if it is possible to leave such optimization to the source's > > internal implementation. > > > > @all > > After reading all the feedback, Biao and I talked a little bit offline. > We > > would like to share some new thoughts with you and see what you think. > > > > When looking at the Source API, we were trying to answer two questions. > > First of all, how would Flink use this API if someone else implemented > it? > > Secondly, how would the connector contributors implement the interface? > How > > difficult would the implementation be? > > > > KafkaConsumer is a typical example of a thread-less reader. The idea was > to > > allow different threading models on top of it. It could be a single global > > thread that handles record fetching and processing in an event loop pattern; > it > > could also be one dedicated fetcher thread for each consumer and a > separate > > thread pool for record processing. The API leaves the choice of > > threading model to the users. To answer the first question, I would love > to > > have such a source reader API so Flink can choose whatever threading > model > > it wants.
However, implementing such an interface could be pretty > > challenging and error-prone. > > > > On the other hand, having a source reader with a naive blocking socket is > > probably simple enough in most cases (sometimes this might even > be > > the most efficient way). But it does not leave Flink many options other > > than creating one thread per reader. > > > > Given the above thoughts, it might be reasonable to separate the > > SplitReader API into two: SyncReader and AsyncReader. The sync reader > just > > has a simple blocking takeNext() API, and the AsyncReader just has a > > pollNext(Callback) or Future<?> pollNext(). All the other methods are > > shared by both readers and could be put into a package-private parent > > interface like BaseSplitReader. > > > > Having these two readers allows both complicated and simple > implementations, > > depending on the SplitReader writers. From Flink's perspective, it will > > choose a more efficient threading model if the SplitReader is an > > AsyncReader. Otherwise, it may have to use the one-thread-per-reader > model > > if the reader is a SyncReader. Users can also choose to implement both > > interfaces; in that case, it is up to Flink to choose which interface to > use. > > > > Admittedly, this solution does add one more interface, but it still seems > > rewarding. Any thoughts? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > On Sun, Nov 18, 2018 at 11:33 PM Biao Liu <mmyy1...@gmail.com> wrote: > > > >> Hi community, > >> > >> Thank you guys for sharing ideas. > >> > >> What I am really concerned about is the thread mode. > >> At Alibaba, we implemented our "split reader"-based source > >> two years ago. It is based on "SourceFunction"; it's just an extension, > not > >> a refactoring. It's almost the same as the version Thomas and Jamie > described > >> in the Google Doc. It really helps in many scenarios.
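A rough Java sketch of the SyncReader/AsyncReader split described above, assuming `takeNext()` returns null at end of input and that the async flavour uses the `Future`-returning option (here a `CompletableFuture`). `ThreadingModel`, `ThreadingModelChooser` and the two list-backed readers are hypothetical, only there to show how Flink might pick a threading model from the reader type, preferring the async interface when a reader implements both.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical shapes for the proposed split; not an existing Flink API.
// Methods shared by both flavours (split handling, close(), ...) would live in the parent.
interface BaseSplitReader<T> extends AutoCloseable {
    @Override
    void close();
}

interface SyncReader<T> extends BaseSplitReader<T> {
    /** Blocks until the next record is available; returns null at end of input. */
    T takeNext() throws InterruptedException;
}

interface AsyncReader<T> extends BaseSplitReader<T> {
    /** Returns a future that completes with the next record, or with null at end of input. */
    CompletableFuture<T> pollNext();
}

enum ThreadingModel { SHARED_EVENT_LOOP, THREAD_PER_READER }

final class ThreadingModelChooser {
    /** Prefers the async interface when a reader implements both. */
    static ThreadingModel choose(BaseSplitReader<?> reader) {
        if (reader instanceof AsyncReader) {
            return ThreadingModel.SHARED_EVENT_LOOP;
        }
        if (reader instanceof SyncReader) {
            return ThreadingModel.THREAD_PER_READER;
        }
        throw new IllegalArgumentException("Unknown reader type: " + reader.getClass());
    }
}

// Trivial in-memory implementations, only to exercise the two shapes.
final class ListSyncReader<T> implements SyncReader<T> {
    private final Deque<T> records = new ArrayDeque<>();

    ListSyncReader(Iterable<T> input) {
        input.forEach(records::add);
    }

    @Override
    public T takeNext() {
        return records.poll(); // in-memory data never needs to block
    }

    @Override
    public void close() {}
}

final class ListAsyncReader<T> implements AsyncReader<T> {
    private final Deque<T> records = new ArrayDeque<>();

    ListAsyncReader(Iterable<T> input) {
        input.forEach(records::add);
    }

    @Override
    public CompletableFuture<T> pollNext() {
        return CompletableFuture.completedFuture(records.poll());
    }

    @Override
    public void close() {}
}
```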
> >> > >> However, I don't like the thread mode which starts a thread for each > split. > >> Starting extra threads in an operator is not an ideal way IMO, especially when the > >> thread count is decided by the split count. So I was wondering if there is a > >> more elegant way. Do we really want these threads in Flink core? > >> > >> I agree that a blocking interface is easier to implement. Could we at > >> least separate the split reader from the source function into different > >> interfaces? Not all sources would like to read all splits concurrently. > In > >> batch scenarios, reading splits one by one is more common. And also, not > all > >> sources are partitioned, right? > >> I would prefer a new source interface with "pull mode" only, no > split, > >> with a splittable source extending it, and one > implementation > >> that starts threads for each split, reading all splits concurrently. > >> > >> > >> On Sun, Nov 18, 2018 at 3:18 AM Thomas Weise <t...@apache.org> wrote: > >> > >>> @Aljoscha to address your question first: In the case of the Kinesis > >>> consumer (with the current Kinesis consumer API), there would also be N+1 > >>> threads. I have implemented a prototype similar to what is shown in > >> Jamie's > >>> document, where the thread ownership is similar to what you have done > for > >>> Kafka. > >>> > >>> The equivalent of the split reader manages its own thread, and the "source > >> main > >>> thread" is responsible for emitting the data. The interface between > the N > >>> reader threads and the 1 emitter is a blocking queue per consumer > thread. > >>> The emitter can now control which queue to consume from based on the > >> event > >>> time progress. > >>> > >>> This is akin to a "non-blocking" interface *between emitter and split > >>> reader*. The emitter uses poll to retrieve records from the N queues (which > >>> requires non-blocking interaction). 
The emitter is independent of the > >> split > >>> reader implementation; that part could live in Flink.
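The emitter described above, draining N per-reader blocking queues with non-blocking calls and choosing a queue by event-time progress, could look roughly like this Java sketch. The selection rule (emit the queue head with the smallest timestamp) is an assumption about what "based on the event time progress" might mean; `TimestampedRecord` and `AligningEmitter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical record type: a payload plus its event timestamp.
final class TimestampedRecord {
    final String value;
    final long timestamp;

    TimestampedRecord(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/**
 * One emitter draining N per-reader queues. Reader threads call put() on their own
 * bounded queue (which blocks them when the emitter falls behind); the emitter only
 * ever uses non-blocking peek()/poll().
 */
final class AligningEmitter {
    private final List<BlockingQueue<TimestampedRecord>> queues = new ArrayList<>();

    /** Creates the queue that one reader thread will push its records into. */
    BlockingQueue<TimestampedRecord> registerReader(int capacity) {
        BlockingQueue<TimestampedRecord> queue = new ArrayBlockingQueue<>(capacity);
        queues.add(queue);
        return queue;
    }

    /**
     * Emits the head record with the smallest event timestamp across all non-empty
     * queues, or returns null if every queue is currently empty. Note: an empty queue
     * may still receive older data later; a real implementation would also need
     * per-reader watermarks or idleness detection before trusting this order.
     */
    TimestampedRecord emitNext() {
        BlockingQueue<TimestampedRecord> best = null;
        for (BlockingQueue<TimestampedRecord> queue : queues) {
            TimestampedRecord head = queue.peek();
            if (head != null && (best == null || head.timestamp < best.peek().timestamp)) {
                best = queue;
            }
        }
        return best == null ? null : best.poll(); // the emitter is the only consumer
    }
}
```

Because the emitter is the only consumer of each queue, the head it peeked is still the head when it polls; producers only ever append.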
> >>> > >>> Regarding whether or not to assume that split readers always need a > >> thread > >>> and, in addition, that these reader threads should be managed by Flink: > it > >>> depends on the API of the respective external system, and I would not bake > >> that > >>> assumption into Flink. Some client libraries manage their own threads > >> (see > >>> push-based APIs like JMS; as I understand it, this may also apply to the > new > >>> fan-out Kinesis API: > >>> > >>> > >> > https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html > >>> ). > >>> In such cases it would not make sense to layer another reader thread on > >>> top. It may instead be better if Flink provides the split reader with the > >>> queue/buffer to push records to. > >>> > >>> The discussion so far has largely ignored the discovery aspect. There > are > >>> some important considerations, such as ordering dependencies between splits and > >>> work rebalancing, that may affect the split reader interface. Should we > >> fork > >>> this into a separate thread? > >>> > >>> Thanks, > >>> Thomas > >>> > >>> > >>> On Fri, Nov 16, 2018 at 8:09 AM Piotr Nowojski < > pi...@data-artisans.com> > >>> wrote: > >>> > >>>> Hi Jamie, > >>>> > >>>> As was already covered in my discussion with Becket, there is an > >>> easy > >>>> way to provide a blocking API on top of a non-blocking API. And yes, we > both > >>>> agreed that a blocking API is easier for users to implement. > >>>> > >>>> I also do not agree with your point about the usefulness of a non-blocking API. > >>>> Actually, the Kafka connector is the one that could be more efficient > thanks > >>> to > >>>> the removal of one layer of threading. > >>>> > >>>> Piotrek > >>>> > >>>>> On 16 Nov 2018, at 02:21, Jamie Grier <jgr...@lyft.com.INVALID> > >> wrote: > >>>>> > >>>>> Thanks, Aljoscha, for getting this effort going!
> >>>>> > >>>>> There's been plenty of discussion here already and I'll add my big +1 > >>> to > >>>>> making this interface very simple to implement for a new > >>>>> Source/SplitReader. Writing a new production-quality connector for > >>> Flink > >>>>> is very difficult today and requires a lot of detailed knowledge > >> about > >>>>> Flink, event time progress, watermarking, idle shard detection, etc., > >> and > >>>> it > >>>>> would be good to move almost all of this type of code into Flink > >> itself > >>>> and > >>>>> out of source implementations. I also think this is totally doable > >> and > >>>> I'm > >>>>> really excited to see this happening. > >>>>> > >>>>> I do have a couple of thoughts about the API and the implementation. > >>>>> > >>>>> In a perfect world there would be a single thread per Flink source > >>>> sub-task > >>>>> and no additional threads for SplitReaders -- but this assumes a > >> world > >>>>> where you have true async IO APIs for the upstream systems (like > >> Kafka > >>>> and > >>>>> Kinesis, S3, HDFS, etc.). If that world did exist, the single thread > >>> could > >>>>> just sit in an efficient select() call waiting for new data to arrive > >>> on > >>>>> any Split. That'd be awesome.
> >>>>> > >>>>> But that world doesn't exist, and given that practical consideration > >> I > >>>>> would think the next best implementation is going to be, in practice, > >>>>> probably a thread per SplitReader that does nothing but call the > >> source > >>>> API > >>>>> and drop whatever it reads into a (blocking) queue -- as Aljoscha > >>>> mentioned > >>>>> (calling it N+1) and as we started to describe here: > >>>>> > >>>> > >>> > >> > https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa > >>>>> > >>>>> I guess my point is that I think we should strive to move as much of > >>>>> something like the diagram referenced in the above doc into Flink > >>> itself > >>>>> and out of sources, and simplify the SplitReader API as much as > >> possible > >>>> as > >>>>> well. > >>>>> > >>>>> With the above in mind and with regard to the discussion about > >>> blocking, > >>>>> etc., I'm not sure I agree with some of the discussion so far with > >>>> regard > >>>>> to this API design. The calls to the upstream systems > >> (Kafka/Kinesis) > >>>> are > >>>>> in fact going to be blocking calls. So a simple API without the > >>>> constraint > >>>>> that the methods must be implemented in a non-blocking way seems > >> better > >>>> to > >>>>> me from the point of view of somebody writing a new source > >>>> implementation. > >>>>> My concern is that if you force the implementer of the SplitReader > >>>>> interface to do so in a non-blocking way, you're just going to make it > >>>>> harder to write those implementations. Those calls to read the next > >>> bit > >>>> of > >>>>> data are going to be blocking calls with most known important sources > >>> -- > >>>> at > >>>>> least Kafka/Kinesis/HDFS -- so I think maybe we should just deal with > >>>> that > >>>>> head-on and work around it at a higher level so the SplitReader > >> interface > >>>>> stays super simple to implement.
This means we manage all the > >>> threading > >>>> in > >>>>> Flink core, the API stays pull-based, and the implementer is allowed > >> to > >>>>> simply block until they have data to return. > >>>>> > >>>>> I might change my mind about this if truly asynchronous APIs to > >>> the > >>>>> upstream source systems were likely to be available in the near > >> future > >>> or > >>>>> already existed and I'm just unaware of them. But even then, the supporting > >> code > >>>> in > >>>>> Flink to drive async and sync sources would be different, and in fact > >>> they > >>>>> might just have different APIs altogether -- SplitReader vs. > >>>>> AsyncSplitReader, maybe. > >>>>> > >>>>> In the end, I think playing with the implementation across more than > >>> one > >>>>> source, and moving as much common code as possible into Flink itself, will reveal > >>> the > >>>>> best API. > >>>>> > >>>>> One other interesting note is that you need to preserve per-partition > >>>>> ordering, so you have to take care with the implementation if it were > >> to > >>>> be > >>>>> based on a thread pool and futures so as not to reorder the reads. > >>>>> > >>>>> Anyway, I'm thrilled to see this starting to move forward and I'd > >> very > >>>> much > >>>>> like to help with the implementation wherever I can. We're doing a > >>>>> simplified internal version of some of this at Lyft for just Kinesis > >>>>> because we need a solution for event time alignment in the very short > >>>> term, > >>>>> but we'd like to immediately start helping to do this properly in > >> Flink > >>>>> after that. One of the end goals for us is event time alignment > >> across > >>>>> heterogeneous sources. Another is making it possible for non-expert > >>>> users > >>>>> to have a high probability of being able to write their own correct > >>>>> connectors.
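The per-partition ordering caveat above can be handled by chaining futures per partition. This Java sketch assumes blocking reads are submitted from a single coordinating thread and run on a shared pool; `OrderedFetcher` is a hypothetical name, and this is just one way to avoid reordering, not something the thread settled on.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
 * Runs blocking reads on a shared thread pool while preserving per-partition order:
 * each read for a partition is chained after the previous read of the same partition,
 * so reads of one partition never overlap or reorder, while different partitions still
 * proceed in parallel.
 */
final class OrderedFetcher {
    private final Executor pool;
    // Tail of the read chain per partition; accessed only from the coordinating thread.
    private final Map<String, CompletableFuture<Void>> tails = new HashMap<>();

    OrderedFetcher(Executor pool) {
        this.pool = pool;
    }

    <T> CompletableFuture<T> fetch(String partition, Supplier<T> blockingRead) {
        CompletableFuture<Void> tail =
                tails.getOrDefault(partition, CompletableFuture.completedFuture(null));
        // Runs on the pool only after the previous read of this partition has completed.
        // If that read failed, this one fails too instead of silently skipping data.
        CompletableFuture<T> result = tail.thenApplyAsync(ignored -> blockingRead.get(), pool);
        tails.put(partition, result.thenAccept(ignored -> { }));
        return result;
    }
}
```

Submitting many fetches for one partition to a multi-threaded pool still yields them strictly in submission order, because each starts only after its predecessor's future completes.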
> >>>>> > >>>>> -Jamie > >>>>> > >>>>> On Thu, Nov 15, 2018 at 3:43 AM Aljoscha Krettek < > >> aljos...@apache.org> > >>>>> wrote: > >>>>> > >>>>>> Hi, > >>>>>> > >>>>>> I thought I had sent this mail a while ago but I must have forgotten > >>> to > >>>>>> send it. > >>>>>> > >>>>>> There is another thing we should consider for splits: the range of > >>>>>> timestamps that each split can contain. For example, the splits of a file > >>> source > >>>>>> would know, roughly, what the minimum and maximum timestamps in the splits are. For infinite splits, such as Kafka partitions, the minimum > >>>> would > >>>>>> be meaningful but the maximum would be +Inf. If the splits expose > >> the > >>>>>> interval of time that they contain, the readers (or the component > >> that > >>>>>> manages the readers) can make decisions about which splits to forward > >>> and > >>>>>> read first. It can also influence the minimum watermark that a > >>>> reader > >>>>>> forwards: it should never emit a watermark that is higher than the minimum timestamp of > >>> splits > >>>> it > >>>>>> knows are still to be read. I think it should be as > >> easy > >>>> as > >>>>>> adding a minimumTimestamp()/maximumTimestamp() method pair to the > >>> split > >>>>>> interface. > >>>>>> > >>>>>> Another thing we need to resolve is the actual reader interface. I > >> see > >>>>>> there has been some good discussion, but I don't know if we have > >>>> consensus. > >>>>>> We should try and see how specific sources could be implemented with > >>> the > >>>>>> new interface. For example, for Kafka I think we need to have N+1 > >>>> threads > >>>>>> per task (where N is the number of splits that a task is reading > >>> from). > >>>> One > >>>>>> thread is responsible for reading from the splits, and each split > >> has > >>>> its > >>>>>> own (internal) thread for reading from Kafka and putting messages in > >>> an > >>>>>> internal queue to pull from.
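The `minimumTimestamp()`/`maximumTimestamp()` idea can be illustrated with a small Java sketch: pick the pending split with the earliest data first, and cap any candidate watermark below the minimum timestamp of every split not yet being read. It assumes Flink's watermark meaning (a watermark t promises no more records with timestamp at or below t) and uses `Long.MAX_VALUE` for +Inf; `TimedSplit` and `WatermarkGate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical split metadata as suggested above; Long.MAX_VALUE stands in for +Inf.
final class TimedSplit {
    final String id;
    final long minimumTimestamp;
    final long maximumTimestamp;

    TimedSplit(String id, long minimumTimestamp, long maximumTimestamp) {
        this.id = id;
        this.minimumTimestamp = minimumTimestamp;
        this.maximumTimestamp = maximumTimestamp;
    }
}

/** Tracks splits that are assigned but not yet being read. */
final class WatermarkGate {
    private final List<TimedSplit> unread = new ArrayList<>();

    void addPendingSplit(TimedSplit split) {
        unread.add(split);
    }

    /** Once a split is being read, its own records drive the reader's watermark. */
    void onSplitStarted(String splitId) {
        unread.removeIf(s -> s.id.equals(splitId));
    }

    /** Reads the split with the earliest data first; null if nothing is pending. */
    TimedSplit nextToRead() {
        return unread.stream().min(Comparator.comparingLong(s -> s.minimumTimestamp)).orElse(null);
    }

    /**
     * Caps a candidate watermark so it never passes an unread split: a watermark t promises
     * no more records with timestamp <= t, so it must stay below each pending minimum.
     */
    long cap(long candidateWatermark) {
        long capped = candidateWatermark;
        for (TimedSplit s : unread) {
            capped = Math.min(capped, s.minimumTimestamp - 1);
        }
        return capped;
    }
}
```

Only the minimum matters for capping; the maximum could additionally help decide which splits to read first, which this sketch does not attempt.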
This is similar to how the current > >> Kafka > >>>>>> source is implemented, which has a separate fetcher thread. The > >> reason > >>>> for > >>>>>> this split is that we always need to try reading from Kafka to keep > >>> the > >>>>>> throughput up. In the current implementation, the internal queue (or > >>>>>> handover) limits the read rate of the reader threads. > >>>>>> > >>>>>> @Thomas, what do you think this would look like for Kinesis? > >>>>>> > >>>>>> Best, > >>>>>> Aljoscha > >>>>>> > >>>>>>> On 15. Nov 2018, at 03:56, Becket Qin <becket....@gmail.com> > >> wrote: > >>>>>>> > >>>>>>> Hi Piotrek, > >>>>>>> > >>>>>>> Thanks a lot for the detailed reply. It all makes sense to me. > >>>>>>> > >>>>>>> WRT the confusion between advance() / getCurrent(), do you think it > >>>> would > >>>>>>> help if we combined them and had something like: > >>>>>>> > >>>>>>> CompletableFuture<T> getNext(); > >>>>>>> long getWatermark(); > >>>>>>> long getCurrentTimestamp(); > >>>>>>> > >>>>>>> Cheers, > >>>>>>> > >>>>>>> Jiangjie (Becket) Qin > >>>>>>> > >>>>>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski < > >>>> pi...@data-artisans.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi, > >>>>>>>> > >>>>>>>> Thanks again for the detailed answer :) Sorry for the delayed response. > >>>>>>>> > >>>>>>>>> Completely agree that in pattern 2, having a callback is > >> necessary > >>>> for > >>>>>>>> that > >>>>>>>>> single thread outside of the connectors. And the connectors MUST > >>> have > >>>>>>>>> internal threads. > >>>>>>>> > >>>>>>>> Yes, this thread will have to exist somewhere. In pattern 2, it > >>> exists > >>>>>> in > >>>>>>>> the connector (at least from the perspective of the Flink > >> execution > >>>>>>>> engine). In pattern 1, it exists inside the Flink execution engine. 
> >>>> With > >>>>>>>> completely blocking connectors, like simply reading from files, > >> both > >>>> of > >>>>>>>> those approaches are basically the same.
The difference arises when the > >> user > >>>>>>>> implementing a Flink source is already working with non-blocking > >>> code > >>>>>> with > >>>>>>>> some internal threads. In this case, pattern 1 would result in > >>> “double > >>>>>>>> thread wrapping”, while pattern 2 would allow skipping one layer of > >>>>>>>> indirection. > >>>>>>>> > >>>>>>>>> If we go that way, we should have something like "void > >>>>>>>>> poll(Callback) / void advance(callback)". I am curious how > >>>>>>>>> CompletableFuture would work here, though. If 10 readers return 10 > >>>>>> completable > >>>>>>>>> futures, will there be 10 additional threads (so 20 threads in > >>> total) > >>>>>>>>> blocked waiting on them? Or will there be a single thread busy > >>> looping > >>>>>>>>> and checking them? > >>>>>>>> > >>>>>>>> To be honest, I haven’t thought this completely through and I > >>> haven’t > >>>>>>>> tested/POC’ed it. Having said that, I can think of at least a couple > >>> of > >>>>>>>> solutions. The first is something like this: > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >>> > >> > https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507 > >>>>>>>> > >>>>>>>> The line > >>>>>>>> > >>>>>>>> `blocked = split.process();` > >>>>>>>> > >>>>>>>> is where the execution goes into the task/sources.
This is > >> where > >>>> the > >>>>>>>> returned future is handled:
> >>>>>>>>
> >>>>>>>> blocked.addListener(() -> {
> >>>>>>>>     blockedSplits.remove(split);
> >>>>>>>>     // reset the level priority to prevent previously-blocked splits from starving existing splits
> >>>>>>>>     split.resetLevelPriority();
> >>>>>>>>     waitingSplits.offer(split);
> >>>>>>>> }, executor);
> >>>>>>>>
> >>>>>>>> Fundamentally, callbacks and Futures are more or less > >> interchangeable. > >>>> You > >>>>>>>> can always wrap one into the other (create a callback that > >>> completes a > >>>>>>>> future, or attach a callback once a future completes). In this case, > >>> the > >>>>>>>> difference for me is mostly: > >>>>>>>> - an API that takes a callback allows the callback to be fired > >> multiple > >>>>>> times, > >>>>>>>> and to fire it even if the connector is not blocked. This is what > >> I > >>>>>> meant > >>>>>>>> by saying that the API `CompletableFuture<?> isBlocked()` is a bit > >>>> simpler. > >>>>>>>> The connector can only return either “I’m not blocked” or “I’m blocked > >>>> and I > >>>>>>>> will tell you only once when I’m not blocked anymore”. > >>>>>>>> > >>>>>>>> But this is not the most important thing for me here. The important > >>> thing for me > >>>>>>>> is to try our best to make the Flink task’s control and > >> execution > >>>>>> single-threaded. > >>>>>>>> For that, both callback and future APIs should work the > >>> same. > >>>>>>>> > >>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The > >>>> good > >>>>>>>>> thing is that a blocking read API is usually simpler to > >> implement. > >>>>>>>> > >>>>>>>> Yes, they are easier to implement (especially if you are not the > >> one > >>>>>> that > >>>>>>>> has to deal with the additional threading required around them ;) > >>> ).
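Becket's question (do 10 futures need 10 waiting threads?) can be answered with a sketch in the spirit of the Presto listener snippet, using the JDK's `CompletableFuture.thenRun` in place of the listener API shown there. One thread drives all readers; a blocked reader costs only a registered callback, and whichever thread completes its future re-queues it. `NonBlockingReader`, `SingleThreadDriver` and `QueueReader` are hypothetical names for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical non-blocking reader contract: poll() never blocks, and isAvailable()
// returns a future that completes once poll() may return data again.
interface NonBlockingReader {
    String poll();

    CompletableFuture<?> isAvailable();
}

/** Drives any number of readers from ONE thread; blocked readers cost a listener, not a thread. */
final class SingleThreadDriver {
    private final BlockingQueue<NonBlockingReader> ready = new LinkedBlockingQueue<>();

    void add(NonBlockingReader reader) {
        ready.add(reader);
    }

    List<String> run(int expectedRecords) throws InterruptedException {
        List<String> output = new ArrayList<>();
        while (output.size() < expectedRecords) {
            // Sleeps (no busy loop) while every reader is blocked.
            NonBlockingReader reader = ready.poll(5, TimeUnit.SECONDS);
            if (reader == null) {
                throw new IllegalStateException("no reader became available");
            }
            String record = reader.poll();
            if (record != null) {
                output.add(record);
                ready.add(reader);
            } else {
                // Whichever thread completes the future re-queues the reader.
                reader.isAvailable().thenRun(() -> ready.add(reader));
            }
        }
        return output;
    }
}

/** Example reader fed by some other (e.g. network) thread through push(). */
final class QueueReader implements NonBlockingReader {
    private final ArrayDeque<String> data = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void push(String record) {
        data.add(record);
        available.complete(null);
    }

    @Override
    public synchronized String poll() {
        String record = data.poll();
        if (data.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // drained: arm a fresh future
        }
        return record;
    }

    @Override
    public synchronized CompletableFuture<?> isAvailable() {
        return data.isEmpty() ? available : CompletableFuture.completedFuture(null);
    }
}
```

With this shape, 10 readers mean 10 registered callbacks and a single driver thread, rather than 10 extra threads parked on `get()`.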
> >>>>>> But > >>>>>>>> to address this issue, if we choose pattern 2, we can always > >> provide > >>> a > >>>>>>>> proxy/wrapper that would use an internal thread to implement the > >>>>>>>> non-blocking API while exposing a blocking API to the user. It would > >>>>>>>> implement pattern 2 on the user's behalf while exposing pattern 1 to them. In > >> other > >>>>>> words, > >>>>>>>> it implements pattern 1 in the pattern 2 paradigm, while still making it > >>> possible > >>>>>> to > >>>>>>>> implement pure pattern 2 connectors. > >>>>>>>> > >>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to > >>> perform > >>>> IO > >>>>>>>> in > >>>>>>>>> a method like "isBlocked()". If the method is expected to fetch > >>>> records > >>>>>>>>> (even if not returning them), naming it something more explicit > >>> would > >>>>>>>> help > >>>>>>>>> avoid confusion. > >>>>>>>> > >>>>>>>> If we choose to, we could rework it into something like: > >>>>>>>> > >>>>>>>> CompletableFuture<?> advance(); > >>>>>>>> T getCurrent(); > >>>>>>>> Watermark getCurrentWatermark(); > >>>>>>>> > >>>>>>>> But as I wrote before, this is more confusing to me for the exact > >>>>>> reasons > >>>>>>>> you mentioned :) I would be confused about what should be done in > >>>> `advance()` > >>>>>> and > >>>>>>>> what in `getCurrent()`. However, again, this naming issue is not > >> that > >>>>>>>> important to me and is probably a matter of taste/personal > >>> preference. > >>>>>>>> > >>>>>>>> Piotrek > >>>>>>>> > >>>>>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com> > >> wrote: > >>>>>>>>> > >>>>>>>>> Hi Piotrek, > >>>>>>>>> > >>>>>>>>> Thanks for the explanation. We are probably talking about the > >> same > >>>>>> thing > >>>>>>>>> but in different ways. To clarify a little bit, I think there are > >>> two > >>>>>>>>> patterns for reading from a connector. > >>>>>>>>> > >>>>>>>>> Pattern 1: Thread-less connector with a blocking read API.
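The proxy/wrapper mentioned above (pattern 1 exposed to the connector author, pattern 2 exposed to Flink) might look like this Java sketch. It assumes the blocking reader signals end of input by returning null, and that `isBlocked()` returns a future that completes when data or end of input is available; `BlockingReader` and `BlockingToNonBlockingAdapter` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical "pattern 1" contract: a thread-less reader with a blocking read.
interface BlockingReader<T> {
    /** Blocks until the next record is available; returns null at end of input. */
    T take() throws InterruptedException;
}

/** Wraps a blocking reader in an internal fetcher thread and exposes a non-blocking API. */
final class BlockingToNonBlockingAdapter<T> implements AutoCloseable {
    private final BlockingQueue<T> handover = new ArrayBlockingQueue<>(16);
    private final Thread fetcher;
    private volatile boolean endOfInput;
    private CompletableFuture<Void> available = new CompletableFuture<>(); // guarded by this

    BlockingToNonBlockingAdapter(BlockingReader<T> reader) {
        fetcher = new Thread(() -> {
            try {
                T record;
                while ((record = reader.take()) != null) {
                    handover.put(record); // back-pressure: blocks the fetcher, never the task
                    signal();
                }
            } catch (InterruptedException e) {
                // close() was called
            } finally {
                endOfInput = true;
                signal();
            }
        }, "split-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }

    private synchronized void signal() {
        available.complete(null);
    }

    /** Completes when poll() may return a record or the input has ended. */
    synchronized CompletableFuture<Void> isBlocked() {
        if (!handover.isEmpty() || endOfInput) {
            return CompletableFuture.completedFuture(null);
        }
        if (available.isDone()) {
            available = new CompletableFuture<>();
        }
        return available;
    }

    /** Never blocks; returns null if no record is buffered right now. */
    T poll() {
        return handover.poll();
    }

    boolean isFinished() {
        return endOfInput && handover.isEmpty();
    }

    @Override
    public void close() {
        fetcher.interrupt();
    }
}
```

The connector author only writes the blocking `take()`, while the task thread only sees `isBlocked()` and `poll()`, so it never blocks inside connector code.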
> >> Outside > >>> of > >>>>>> the > >>>>>>>>> connector, there is one IO thread per reader, doing blocking > >> reads. > >>> An > >>>>>>>>> additional thread will interact with all the IO threads. > >>>>>>>>> Pattern 2: Connector with internal thread(s) and non-blocking > >> API. > >>>>>>>> Outside > >>>>>>>>> of the connector, there is one thread for ALL readers, doing IO > >>>> relying > >>>>>>>> on > >>>>>>>>> notification callbacks in the reader. > >>>>>>>>> > >>>>>>>>> In both patterns, there must be at least one thread per > >> connector, > >>>>>> either > >>>>>>>>> inside (created by connector writers) or outside (created by > >> Flink) > >>>> of > >>>>>>>> the > >>>>>>>>> connector. Ideally, there are NUM_CONNECTORS + 1 threads in total, > >>> to > >>>>>> make > >>>>>>>>> sure that 1 thread is fully non-blocking. > >>>>>>>>> > >>>>>>>>>> Btw, I don’t know if you understood my point. Having only > >> `poll()` > >>>> and > >>>>>>>>> `take()` is not enough for a single-threaded task. If our source > >>>>>> interface > >>>>>>>>> provides neither a `notify()` callback nor `CompletableFuture<?> > >>>>>>>>> isBlocked()`, there is no way to implement a single-threaded task > >>> that > >>>>>>>> both > >>>>>>>>> reads the data from the source connector and can also react to > >>> system > >>>>>>>>> events. OK, a non-blocking `poll()` would allow that, but with > >> busy > >>>>>>>> looping. > >>>>>>>>> > >>>>>>>>> Completely agree that in pattern 2, having a callback is > >> necessary > >>>> for > >>>>>>>> that > >>>>>>>>> single thread outside of the connectors. And the connectors MUST > >>> have > >>>>>>>>> internal threads. If we go that way, we should have something > >> like > >>>>>> "void > >>>>>>>>> poll(Callback) / void advance(callback)". I am curious how > >>>>>>>>> CompletableFuture would work here, though.
If 10 readers return 10 > >>>>>> completable > >>>>>>>>> futures, will there be 10 additional threads (so 20 threads in > >>> total) > >>>>>>>>> blocked waiting on them? Or will there be a single thread busy > >>> looping > >>>>>>>>> and checking them? > >>>>>>>>> > >>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The > >>>> good > >>>>>>>>> thing is that a blocking read API is usually simpler to > >> implement. > >>> An > >>>>>>>>> additional non-blocking "T poll()" method here is indeed optional > >>> and > >>>>>>>> could > >>>>>>>>> be used in cases where Flink does not want the thread to block > >>>> forever. > >>>>>>>> The two > >>>>>>>>> can also be combined into a "T poll(Timeout)", which is > >> exactly > >>>> what > >>>>>>>>> KafkaConsumer does. > >>>>>>>>> > >>>>>>>>> It sounds like you are proposing pattern 2 with something similar > >>> to > >>>>>> NIO2 > >>>>>>>>> AsynchronousByteChannel[1]. That API would work, except that a > >>>>>>>> signature > >>>>>>>>> returning a future does not seem necessary. If that is the case, a > >> minor > >>>>>> change > >>>>>>>>> to the current FLIP proposal to have "void advance(callback)" > >>> should > >>>>>>>> work. > >>>>>>>>> And this means the connectors MUST have their own internal threads. > >>>>>>>>> > >>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to > >>> perform > >>>> IO > >>>>>>>> in > >>>>>>>>> a method like "isBlocked()". If the method is expected to fetch > >>>> records > >>>>>>>>> (even if not returning them), naming it something more explicit > >>> would > >>>>>>>> help > >>>>>>>>> avoid confusion.
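Whether the API takes a callback ("void advance(callback)") or returns a future, one form can be mechanically wrapped into the other, as Piotrek notes elsewhere in the thread. This Java sketch shows both directions; `CallbackReader`, `FutureReader` and `ReaderAdapters` are hypothetical names. The one asymmetry is visible in `toFuture`: a future can complete only once, so repeated callback invocations are ignored.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical callback-style contract, in the spirit of "void advance(callback)".
interface CallbackReader<T> {
    void advance(Consumer<T> callback);
}

// Hypothetical future-style contract, in the spirit of "CompletableFuture<T> getNext()".
interface FutureReader<T> {
    CompletableFuture<T> getNext();
}

final class ReaderAdapters {
    private ReaderAdapters() {}

    /** Callback -> future: the callback just completes a fresh future. */
    static <T> FutureReader<T> toFuture(CallbackReader<T> reader) {
        return () -> {
            CompletableFuture<T> next = new CompletableFuture<>();
            // Only the first invocation counts; later ones are ignored by complete().
            reader.advance(next::complete);
            return next;
        };
    }

    /** Future -> callback: attach the callback to the future's completion. */
    static <T> CallbackReader<T> toCallback(FutureReader<T> reader) {
        return callback -> reader.getNext().thenAccept(callback);
    }
}
```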
> >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> > >>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>> > >>>>>>>>> [1] > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >>> > >> > https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html > >>>>>>>>> > >>>>>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski < > >>>>>> pi...@data-artisans.com> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> > >>>>>>>>>> Good point about select/epoll; however, I do not see how they > >>> could > >>>>>> be used > >>>>>>>>>> with Flink if we would like a single task in Flink to be > >>>> single-threaded > >>>>>>>> (and > >>>>>>>>>> I believe we should pursue this goal). If your connector blocks > >> on > >>>>>>>>>> `select`, then it cannot process/handle control messages from > >>>> Flink, > >>>>>>>> like > >>>>>>>>>> checkpoints, releasing resources and potentially output flushes. > >>>> This > >>>>>>>> would > >>>>>>>>>> require tight integration between the connector and Flink’s main > >> event > >>>>>>>>>> loop/selects/etc. > >>>>>>>>>> > >>>>>>>>>> Looking at it from another perspective: let’s assume that we have > >> a > >>>>>>>>>> connector implemented on top of `select`/`epoll`. In order to > >>>>>> integrate > >>>>>>>> it > >>>>>>>>>> with Flink’s checkpointing/flushes/resource releasing, it will > >> have > >>>> to > >>>>>> be > >>>>>>>>>> executed in a separate thread one way or another. At least if our > >>> API > >>>>>>>>>> enforces/encourages non-blocking implementations with some kind of > >>>>>>>>>> notifications (`isBlocked()` or `notify()` callback), some > >>>> connectors > >>>>>>>> might > >>>>>>>>>> skip one layer of wrapping threads. > >>>>>>>>>> > >>>>>>>>>> Btw, I don’t know if you understood my point. Having only > >> `poll()` > >>>> and > >>>>>>>>>> `take()` is not enough for a single-threaded task.
If our source > >>>>>> interface > >>>>>>>>>> provides neither a `notify()` callback nor `CompletableFuture<?> > >>>>>>>>>> isBlocked()`, there is no way to implement a single-threaded task > >>>> that > >>>>>>>> both > >>>>>>>>>> reads the data from the source connector and can also react to > >>>> system > >>>>>>>>>> events. OK, a non-blocking `poll()` would allow that, but with > >> busy > >>>>>>>> looping. > >>>>>>>>>> > >>>>>>>>>> Piotrek > >>>>>>>>>> > >>>>>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> > >>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Hi Piotrek, > >>>>>>>>>>> > >>>>>>>>>>>> But I don’t see a reason why we should expose both blocking > >>>> `take()` > >>>>>>>> and > >>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone > >> (Flink > >>>>>>>> engine > >>>>>>>>>> or > >>>>>>>>>>> connector) would have to do the same busy > >>>>>>>>>>>> looping anyway, and I think it would be better to have a > >> simpler > >>>>>>>>>> connector > >>>>>>>>>>> API (that would solve our problems) and force connectors to > >>> comply > >>>>>> one > >>>>>>>>>> way > >>>>>>>>>>> or another. > >>>>>>>>>>> > >>>>>>>>>>> If we let the blocking happen inside the connector, the blocking > >>> does > >>>>>> not > >>>>>>>>>> have > >>>>>>>>>>> to be a busy loop. For example, to do the blocking wait > >>>> efficiently, > >>>>>>>> the > >>>>>>>>>>> connector can use Java NIO's Selector.select(), which relies on OS > >>>>>> syscalls > >>>>>>>>>>> like epoll[1] instead of busy looping. But if the Flink engine > >> blocks > >>>>>>>> outside > >>>>>>>>>>> the connector, it pretty much has to do the busy loop. So if > >>> there > >>>> is > >>>>>>>>>> only > >>>>>>>>>>> one API to get the element, a blocking getNextElement() makes > >>> more > >>>>>>>> sense. > >>>>>>>>>>> In any case, we should avoid ambiguity. 
It has to be crystal > >>> clear > >>>>>>>> about > >>>>>>>>>>> whether a method is expected to be blocking or non-blocking.
> >>>>>> Otherwise > >>>>>>>> it > >>>>>>>>>>> would be very difficult for the Flink engine to do the right thing > >>> with > >>>>>> the > >>>>>>>>>>> connectors. At first glance at getCurrent(), the expected > >>>>>> behavior > >>>>>>>> is > >>>>>>>>>>> not quite clear. > >>>>>>>>>>> > >>>>>>>>>>> That said, I do agree that functionality-wise, poll() and > >> take() > >>>> kind > >>>>>>>> of > >>>>>>>>>>> overlap. But they are actually not that different from > >>>>>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the > >> only > >>>>>>>>>>> difference is that poll() also returns the next record if it is > >>>>>>>>>> available. > >>>>>>>>>>> But I agree that isBlocked() + getNextElement() is more > >>>> flexible, > >>>>>> as > >>>>>>>>>>> users can just check the record availability without fetching the > >>>> next > >>>>>>>>>>> element. > >>>>>>>>>>> > >>>>>>>>>>>> In the case of thread-less readers with only non-blocking > >>>> `queue.poll()` > >>>>>>>> (is > >>>>>>>>>>> that really a thing? I cannot think of a real > >> implementation > >>>> that > >>>>>>>>>>> enforces such constraints) > >>>>>>>>>>> Right, it is pretty much syntactic sugar to allow the user to combine > >> the > >>>>>>>>>>> check-and-take into one method. It could be achieved with > >>>>>> isBlocked() + > >>>>>>>>>>> getNextElement().
[1] http://man7.org/linux/man-pages/man7/epoll.7.html

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

With my proposal, both of your examples would have to be solved by the connector, and the solution to both problems would be the same:

Pretend that the connector is never blocked (`isBlocked() { return NOT_BLOCKED; }`) and implement `getNextElement()` in a blocking fashion (or semi-blocking, returning control from time to time to allow checkpointing, network flushing and other resource management to happen in the same main thread). In other words, exactly how you would implement the `take()` method, or how the same source connector would be implemented NOW with the current source interface. The only difference from the current interface would be that the main loop would be outside of the connector, and instead of periodically releasing the checkpointing lock, the connector would periodically `return null;` or `return Optional.empty();` from `getNextElement()`.

In case of thread-less readers with only non-blocking `queue.poll()` (is that really a thing? I cannot think of a real implementation that enforces such constraints), we could provide a wrapper that hides the busy looping.
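A minimal sketch of such a wrapper, assuming the thread-less reader is modelled as a `Supplier<Optional<T>>` (a hypothetical stand-in for its non-blocking `queue.poll()`). It follows the proposal above: `isBlocked()` always returns a completed `NOT_BLOCKED` future, and `getNextElement()` is semi-blocking, backing off between polls and returning `Optional.empty()` once a maximum wait elapses, so the main loop outside the connector can checkpoint or flush. The class name and timing parameters are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical wrapper (illustration only): turns a thread-less,
 * non-blocking poll into a semi-blocking getNextElement() that hands control
 * back to the main loop at least every maxWait.
 */
class SemiBlockingReader<T> {
    static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final Supplier<Optional<T>> nonBlockingPoll;
    private final long maxWaitNanos;
    private final long backoffNanos;

    SemiBlockingReader(Supplier<Optional<T>> nonBlockingPoll, long maxWaitMillis, long backoffMillis) {
        this.nonBlockingPoll = nonBlockingPoll;
        this.maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        this.backoffNanos = TimeUnit.MILLISECONDS.toNanos(backoffMillis);
    }

    /** Pretend the connector is never blocked. */
    CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    /**
     * Waits at most maxWait for a record. Optional.empty() tells the main loop
     * "nothing yet, go checkpoint/flush and call me again".
     */
    Optional<T> getNextElement() {
        long deadline = System.nanoTime() + maxWaitNanos;
        while (true) {
            Optional<T> next = nonBlockingPoll.get();
            if (next.isPresent() || System.nanoTime() - deadline >= 0) {
                return next;
            }
            try {
                TimeUnit.NANOSECONDS.sleep(backoffNanos); // back off instead of spinning hot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return Optional.empty();
            }
        }
    }
}

class SemiBlockingReaderDemo {
    public static void main(String[] args) {
        Queue<String> source = new ArrayDeque<>();
        source.add("r1");
        SemiBlockingReader<String> reader =
                new SemiBlockingReader<>(() -> Optional.ofNullable(source.poll()), 50, 5);
        // The main loop lives outside the connector.
        for (int i = 0; i < 3; i++) {
            Optional<String> record = reader.getNextElement();
            System.out.println(record.isPresent() ? "emit " + record.get() : "idle: checkpoint/flush here");
        }
    }
}
```

The backoff keeps the CPU mostly idle, but it trades latency for it: a record arriving just after a poll waits up to one backoff interval, which is exactly the tight-loop versus backoff trade-off of the thread-less case.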
The same applies to forever-blocking readers: we could provide another wrapper running the connector in a separate thread.

But I don’t see a reason why we should expose both blocking `take()` and non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or connector) would have to do the same busy looping anyway and I think it would be better to have a simpler connector API (that would solve our problems) and force connectors to comply one way or another.

Piotrek

On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:

Hi Piotr,

I might have misunderstood your proposal, but let me try to explain my concern. I am thinking about the following case:

1. A reader has the following two methods:
       boolean isBlocked()
       T getNextElement()
2. The implementation of getNextElement() is non-blocking.
3. The reader is thread-less, i.e. it does not have any internal thread. For example, it might just delegate getNextElement() to queue.poll(), and isBlocked() is just queue.isEmpty().

How can Flink efficiently implement a blocking reading behavior with this reader? Either a tight loop or a backoff interval is needed. Neither of them is ideal.

Now let's say the reader mentioned above implements a blocking getNextElement() method. Because there is no internal thread in the reader, after isBlocked() returns false, Flink will still have to loop on isBlocked() to check whether the next record is available. If the next record arrives after 10 minutes, it is a tight loop for 10 minutes. You have probably noticed that in this case, even if isBlocked() returns a future, that future will not be completed unless Flink calls some method of the reader, because the reader has no internal thread to complete that future by itself.

For the above reasons, a blocking take() API would give Flink an efficient way to read from a reader. There are many ways to wake up the blocking thread when checkpointing is needed, depending on the implementation. But I think the poll()/take() API would also work in that case.

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

a)

> BTW, regarding the isBlocked() method, I have a few more questions.
> 21. Is a method isReady() with boolean as a return value equivalent? Personally, I find it a little confusing what is supposed to be returned when the future is completed.
> 22. If the implementation of isBlocked() is optional, how do the callers know whether the method is properly implemented or not? Does "not implemented" mean it always returns a completed future?

`CompletableFuture<?> isBlocked()` is more or less equivalent to a `boolean hasNext()` that, in case of “false”, also provides some kind of listener/callback notifying about the presence of the next element. There are some minor details, like `CompletableFuture<?>` having a minimal two-state logic:

1. Future is completed - we have more data.
2. Future is not yet completed - we don’t have data now, but we might/will have it in the future.

By contrast, `boolean hasNext()` plus a `notify()` callback is a bit more complicated/dispersed and can lead to (or encourage) `notify()` spam.

b)

> 3. If we merge `advance` and `getCurrent` into one method like `getNext`, `getNext` would need to return an `ElementWithTimestamp` because some sources want to add a timestamp to every element. IMO, this is not so memory friendly, so I prefer this design.

Guowei, I don’t quite understand this. Could you elaborate on why having a separate `advance()` helps?

c)

Regarding advance/poll/take: what’s the value of having two separate methods, poll and take? Which one of them should be called and which implemented? What’s the benefit of having those methods compared to having one single method `getNextElement()` (or `pollElement()`, or whatever we name it) with the following contract:

    CompletableFuture<?> isBlocked();

    /**
     * Return next element - will be called only if `isBlocked()` is completed.
     * Try to implement it in non blocking fashion, but if that’s impossible or
     * you just don’t need the effort, you can block in this method.
     */
    T getNextElement();

I mean, if the connector is implemented non-blockingly, Flink should use it that way. If it’s not, then `poll()` will `throw new NotImplementedException()`. Implementing both of them and providing both of them to Flink wouldn’t make sense, so why not merge them into a single method call that should preferably (but not necessarily) be non-blocking? It’s not like we are implementing a general-purpose `Queue`, on which users might want to call either `poll` or `take`. We would always prefer to call `poll`, but if it’s blocking, we still have no choice but to call it and block on it.

d)

> 1. I agree with Piotr and Becket that the non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may not be very memory friendly if every advance call returns a Future.

I didn’t want to mention this, to not clog my initial proposal, but there is a simple solution to the problem:

    public interface SplitReader {

        (…)

        CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

        /**
         * Returns a future that will be completed when the page source becomes
         * unblocked. If the page source is not blocked, this method should return
         * {@code NOT_BLOCKED}.
         */
        default CompletableFuture<?> isBlocked() {
            return NOT_BLOCKED;
        }
    }

If we are blocked and waiting for IO, then creating a new Future is a non-issue. Under full throttle/throughput, non-blocked sources returning a static `NOT_BLOCKED` constant should also solve the problem.

One more remark: non-blocking sources might be a necessity in a single-threaded model without a checkpointing lock. (Currently, when sources are blocked, they can release the checkpointing lock and re-acquire it later.) A non-blocking `poll`/`getNext()` would allow checkpoints to happen while the source is idling.
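To make the memory argument concrete, here is a minimal sketch of a reader implementing that interface, assuming records arrive through a hypothetical IO callback `deliver()`. While data is buffered, `isBlocked()` returns the shared `NOT_BLOCKED` constant, so nothing is allocated on the hot path; only when the reader is idle does it allocate a single pending future, which it reuses until data arrives. Apart from `isBlocked()`, `getNextElement()` and `NOT_BLOCKED`, the names and the generic parameter `T` are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Shape of the interface sketched above; the element type T is an assumption. */
interface SplitReader<T> {
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /** Naive readers inherit this and simply block in getNextElement(). */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    /** Called only after isBlocked() has completed. */
    T getNextElement();
}

/** Hypothetical non-blocking reader; some IO callback hands records to deliver(). */
class CallbackSplitReader<T> implements SplitReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> pending; // only allocated while idle

    void deliver(T record) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            buffer.add(record);
            toComplete = pending;
            pending = null;
        }
        if (toComplete != null) {
            toComplete.complete(null); // wake up the waiter outside the lock
        }
    }

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return NOT_BLOCKED; // full throughput: shared constant, no allocation
        }
        if (pending == null) {
            pending = new CompletableFuture<>(); // waiting for IO: allocation is a non-issue
        }
        return pending;
    }

    @Override
    public synchronized T getNextElement() {
        return buffer.remove();
    }
}

class CallbackSplitReaderDemo {
    public static void main(String[] args) {
        CallbackSplitReader<String> reader = new CallbackSplitReader<>();
        // Idle: register a continuation instead of busy-looping.
        reader.isBlocked().thenRun(() -> System.out.println("got " + reader.getNextElement()));
        reader.deliver("record-1"); // prints "got record-1"
    }
}
```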
In that case, either `notify()` or my proposed `isBlocked()` would allow us to avoid busy-looping.

Piotrek

On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:

Hi Thomas,

The iterator-like API was also the first thing that came to my mind. But it seems a little confusing that hasNext() does not mean "the stream has not ended" but "the next record is ready", which repurposes the well-known meaning of hasNext(). If we follow the hasNext()/next() pattern, an additional isNextReady() method to indicate whether the next record is ready seems more intuitive to me.

Similarly, in the poll()/take() pattern, another method, isDone(), is needed to indicate whether the stream has ended or not.

Compared with the hasNext()/next()/isNextReady() pattern, isDone()/poll()/take() seems more flexible for the reader implementation. When I am implementing a reader, I could have a couple of choices:

- A thread-less reader that does not have any internal thread.
  - When poll() is called, the same calling thread will perform a bunch of IO asynchronously.
  - When take() is called, the same calling thread will perform a bunch of IO and wait until the record is ready.
- A reader with internal threads performing network IO and putting records into a buffer.
  - When poll() is called, the calling thread simply reads from the buffer and returns an empty result immediately if there is no record.
  - When take() is called, the calling thread reads from the buffer and blocks waiting if the buffer is empty.

On the other hand, with the hasNext()/next()/isNextReady() API, it is less intuitive for reader developers to write the thread-less pattern. Technically speaking, one can still do the asynchronous IO to prepare the record in isNextReady(), but that is implicit and seems somewhat hacky.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:

A couple more points regarding discovery:

The proposal mentions that discovery could be outside the execution graph. Today, discovered partitions/shards are checkpointed. I believe that will also need to be the case in the future, even when discovery and reading are split between different tasks.

For cases such as resharding of a Kinesis stream, the relationship between splits needs to be considered. Splits cannot be randomly distributed over readers in certain situations. An example was mentioned here:
https://github.com/apache/flink/pull/6980#issuecomment-435202809

Thomas

On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:

Thanks for getting the ball rolling on this!

Can the number of splits decrease? Yes, splits can be closed and go away. An example would be a shard merge in Kinesis (2 existing shards will be closed and replaced with a new shard).

Regarding advance/poll/take: IMO the least restrictive approach would be the thread-less IO model (pull based, non-blocking, caller retrieves new records when available). The current Kinesis API requires the use of threads. But that can be internal to the split reader and does not need to be a source API concern. In fact, that's what we are working on right now as an improvement to the existing consumer: each shard consumer thread will push to a queue, and the consumer main thread will poll the queue(s). It is essentially a mapping from threaded IO to non-blocking.
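A minimal sketch of that mapping, assuming each shard's blocking fetch is simulated by iterating an in-memory list (the real Kinesis calls are not shown). One thread per shard pushes into a shared `LinkedBlockingQueue`, and the consumer's main thread only ever calls a non-blocking `poll()`. `ShardFanIn` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical fan-in (illustration only, not the Kinesis consumer code):
 * one thread per shard does the blocking IO, here simulated by iterating
 * over an in-memory list, and pushes into a shared queue.
 */
class ShardFanIn<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final ExecutorService shardThreads;

    ShardFanIn(List<List<T>> shards) {
        // newFixedThreadPool rejects 0 threads, so at least one shard is assumed.
        shardThreads = Executors.newFixedThreadPool(shards.size());
        for (List<T> shard : shards) {
            shardThreads.submit(() -> {
                for (T record : shard) {
                    queue.add(record); // a real consumer would block on a fetch call here
                }
            });
        }
        shardThreads.shutdown(); // no new shards; running ones finish normally
    }

    /** Main thread: non-blocking, returns empty if no shard has produced anything yet. */
    Optional<T> poll() {
        return Optional.ofNullable(queue.poll());
    }

    /** Waits for all shard threads to finish; returns false on timeout or interrupt. */
    boolean awaitShards(long timeoutMillis) {
        try {
            return shardThreads.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class ShardFanInDemo {
    public static void main(String[] args) {
        ShardFanIn<String> fanIn = new ShardFanIn<>(Arrays.asList(
                Arrays.asList("shard0-a", "shard0-b"),
                Arrays.asList("shard1-a")));
        fanIn.awaitShards(10_000);
        int received = 0;
        for (Optional<String> r = fanIn.poll(); r.isPresent(); r = fanIn.poll()) {
            received++;
        }
        System.out.println("received " + received + " records");
    }
}
```

The design choice is that threads stay an implementation detail of the reader: the caller only sees the non-blocking side, and ordering is preserved per shard but not across shards.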
The proposed SplitReader interface would fit the thread-less IO model. Similar to an iterator, we find out if there is a new element (hasNext) and, if so, move to it (next()). Separate calls deliver the meta information (timestamp, watermark). Perhaps the advance call could offer a timeout option, so that the caller does not end up in a busy wait. On the other hand, a caller processing multiple splits may want to cycle through fast, to process elements of other splits as soon as they become available. The nice thing is that this "split merge" logic can now live in Flink and be optimized and shared between different sources.

Thanks,
Thomas

On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:

Hi,
Thanks Aljoscha for this FLIP.

1. I agree with Piotr and Becket that the non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may not be very memory friendly if every advance call returns a Future.

    public interface Listener {
        // Object.notify() is final, so a compilable version of this
        // interface would need a different method name.
        void notify();
    }

    public interface SplitReader {
        /**
         * When there is temporarily no element, this will return false.
         * When elements are available again, the SplitReader can call listener.notify().
         * In addition, the framework would check `advance` periodically.
         * Of course, advance can always return true and ignore the listener
         * argument for simplicity.
         */
        boolean advance(Listener listener);
    }

2. The FLIP describes very clearly how to create all Splits and how to create a SplitReader from a Split. But there is no strategy for the user to choose how to assign the splits to the tasks. I think we could add an enum to let the user choose:

    public enum SplitsAssignmentPolicy {
        Location,
        Workload,
        Random,
        Average
    }

3. If we merge `advance` and `getCurrent` into one method like `getNext`, `getNext` would need to return an `ElementWithTimestamp` because some sources want to add a timestamp to every element. IMO, this is not so memory friendly, so I prefer this design.

Thanks

On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Thanks Aljoscha for starting this; it’s blocking quite a lot of other possible improvements. I have one proposal. Instead of having a method:

    boolean advance() throws IOException;

I would replace it with

    /*
     * Return a future, which when completed means that source has more data
     * and getNext() will not block.
     * If you wish to use benefits of non blocking connectors, please
     * implement this method appropriately.
     */
    default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
    }

And rename `getCurrent()` to `getNext()`.

A couple of arguments:
1. I don’t understand the division of work between `advance()` and `getCurrent()`. What should be done in which, especially for connectors that handle records in batches (like Kafka), and when should you call `advance` and when `getCurrent()`?
2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the future to have asynchronous/non-blocking connectors and to handle a large number of blocked threads more efficiently, without busy waiting. At the same time, it doesn’t add much complexity, since naive connector implementations can always be blocking.
3. This would also allow us to use a fixed-size thread pool of task executors, instead of one thread per task.

Piotrek

On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi All,

To finally get the ball rolling on the new source interface that we have discussed for so long, I created a FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

I cc'ed Thomas and Jamie because of the ongoing work/discussion about adding per-partition watermark support to the Kinesis source and because this would enable a generic implementation of event-time alignment for all sources. Maybe we need another FLIP for the event-time alignment part, especially the part about information sharing between operations (I'm not calling it state sharing because state has a special meaning in Flink).

Please discuss away!

Aljoscha