Hi Becket,

I think the problem is not with split re-assignment, but with dynamic split discovery. We do not always know the number of splits beforehand (for example with Kafka partition/topic discovery, but this can also happen in batch), while the source parallelism is fixed/known up front.
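A toy sketch of the dynamic discovery case above (all names are hypothetical; a real enumerator would poll the Kafka metadata API on a timer rather than be handed a snapshot):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy sketch of dynamic split discovery: parallelism is fixed at startup,
// but new splits (e.g. Kafka partitions) keep appearing at runtime and must
// be spread over the existing readers. All names are hypothetical.
public class DiscoverySketch {

    private final Set<String> known = new HashSet<>();
    private final int parallelism;

    DiscoverySketch(int parallelism) {
        this.parallelism = parallelism;
    }

    // Called periodically with the current snapshot of splits; returns the
    // reader index each newly discovered split is assigned to.
    int[] discover(List<String> currentSplits) {
        return currentSplits.stream()
                .filter(known::add) // Set.add returns true only for unseen splits
                .mapToInt(s -> Math.abs(s.hashCode() % parallelism))
                .toArray();
    }

    public static void main(String[] args) {
        DiscoverySketch enumerator = new DiscoverySketch(2);
        // First pass: both splits are new.
        System.out.println(enumerator.discover(List.of("topic-0", "topic-1")).length); // 2
        // Second pass: only "topic-2" is new.
        System.out.println(enumerator.discover(List.of("topic-0", "topic-1", "topic-2")).length); // 1
    }
}
```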
> 1. What if the SplitReader implementation cannot easily add a split to read on the fly?

Always initiating one consumer per split will not be efficient in many cases. And if the connector needs to instantiate a new reader for each split, it can handle this internally (addSplit() would close the previous reader and create a new one).

> 2. Does Flink have to be involved in splits assignment?

I think that this might be good shared logic between different connectors.

> @Biao,
> If I understand correctly, the concern you raised was that a Source may return a lot of splits and thus Flink may have to create a lot of fetcher threads. This is a valid concern, but I cannot think of a solution to that. After all, the SplitReaders may be written by third parties. Poor implementations seem difficult to prevent.

I think we can solve this, and it is not as uncommon as you might think. In the batch world you usually/often have one split per HDFS chunk, each chunk being 64-256 MB. With petabyte tables you end up with anywhere from millions to billions of splits. This becomes a bottleneck when splits can be efficiently filtered out/eliminated based on some header (an ORC header, for example); in other words, when you have a huge number of splits that are very cheap/quick to process.

Piotrek

> On 22 Nov 2018, at 04:54, Becket Qin <becket....@gmail.com> wrote:
>
> Thanks Piotrek,
>
>> void SplitReader#addSplit(Split)
>> boolean SplitReader#doesWantMoreSplits()
>
> I have two questions about this API.
> 1. What if the SplitReader implementation cannot easily add a split to read on the fly?
> 2. Does Flink have to be involved in splits assignment?
>
> I am wondering if it would be simpler to let the enumerator indicate whether a split reassignment is needed. If the answer is yes, Flink can just start from the beginning, get all the splits, and create one reader per split.
> This might be a little more expensive than dynamically adding a split to a reader, but given that split changes should be rare, it is probably acceptable.
>
> In the Kafka case, the SplitT may just be a consumer. The enumerator will simply check if the topic has new partitions to be assigned to this reader.
>
> @Biao,
> If I understand correctly, the concern you raised was that a Source may return a lot of splits and thus Flink may have to create a lot of fetcher threads. This is a valid concern, but I cannot think of a solution to that. After all, the SplitReaders may be written by third parties. Poor implementations seem difficult to prevent.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Nov 21, 2018 at 10:13 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Hi again,
>>
>>> However I don't like the thread mode which starts a thread for each split.
>>> Starting extra thread in operator is not an ideal way IMO. Especially thread count is decided by split count. So I was wondering if there is a more elegant way. Do we really want these threads in Flink core?
>>
>> Biao, you have raised an important issue. Indeed, it seems like the current proposal is missing something. I would guess that we need a mechanism for adding new splits to an already existing SplitReader, and some logic to determine whether the current instance can accept more splits or not. For example:
>>
>> void SplitReader#addSplit(Split)
>> boolean SplitReader#doesWantMoreSplits()
>>
>> Flink could randomly/round-robin assign new splits to the SplitReaders that `doesWantMoreSplits()`. Batch file readers might implement some custom logic in `doesWantMoreSplits()`, like one SplitReader can have at most N enqueued splits?
>>
>> Also, what about Kafka? Isn't it the case that one KafkaConsumer can read from multiple splits? So Kafka's SplitReader should always return true from `doesWantMoreSplits()`?
>>
>> What do you think?
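The addSplit()/doesWantMoreSplits() pair quoted above could look roughly like this. The Split type, the bounded reader, and the assignment loop are illustrative only, not part of the proposal:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the SplitReader additions discussed in the thread. The names
// addSplit()/doesWantMoreSplits() come from the messages above; everything
// else (Split, BoundedReader, the cap of N enqueued splits) is illustrative.
public class SplitAssignmentSketch {

    interface Split {}

    interface SplitReader {
        void addSplit(Split split);
        boolean doesWantMoreSplits();
    }

    // A batch-file style reader that accepts at most maxQueued splits,
    // as in the "at most N enqueued splits" example above.
    static class BoundedReader implements SplitReader {
        private final int maxQueued;
        private final List<Split> queued = new ArrayList<>();

        BoundedReader(int maxQueued) {
            this.maxQueued = maxQueued;
        }

        @Override
        public void addSplit(Split split) {
            queued.add(split);
        }

        @Override
        public boolean doesWantMoreSplits() {
            return queued.size() < maxQueued;
        }
    }

    // Hand a newly discovered split to the first reader that wants it.
    static boolean assign(List<SplitReader> readers, Split split) {
        for (SplitReader r : readers) {
            if (r.doesWantMoreSplits()) {
                r.addSplit(split);
                return true;
            }
        }
        return false; // all readers saturated; the enumerator keeps the split
    }

    public static void main(String[] args) {
        List<SplitReader> readers = List.of(new BoundedReader(1), new BoundedReader(1));
        System.out.println(assign(readers, new Split() {})); // true
        System.out.println(assign(readers, new Split() {})); // true
        System.out.println(assign(readers, new Split() {})); // false, both full
    }
}
```

A Kafka-style reader would simply always return true from doesWantMoreSplits(), since one KafkaConsumer can serve many partitions.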
>>
>> Re: Becket
>>
>> I'm +1 for Sync and AsyncSplitReader.
>>
>> Piotrek
>>
>>> On 21 Nov 2018, at 14:49, Becket Qin <becket....@gmail.com> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> Good point on the potential optimization in the source. One thing to clarify: by "adding a minimumTimestamp()/maximumTimestamp() method pair to the split interface", did you mean "split reader interface"? If so, what should the readers do if they do not have such additional information? I am wondering if it is possible to leave such optimization to the source internal implementation.
>>>
>>> @all
>>> After reading all the feedback, Biao and I talked a little bit offline. We would like to share some new thoughts with you and see what you think.
>>>
>>> When looking at the Source API, we were trying to answer two questions. First, how would Flink use this API if someone else implemented it? Second, how would the connector contributors implement the interface, and how difficult is the implementation?
>>>
>>> KafkaConsumer is a typical example of a thread-less reader. The idea was to allow different threading models on top of it. It could be a global single thread handling record fetching and processing in an event-loop pattern; it could also be one dedicated fetcher thread per consumer and a separate thread pool for record processing. The API gives the user the freedom to pick a threading model. To answer the first question, I would love to have such a source reader API so Flink can choose whatever threading model it wants. However, implementing such an interface could be pretty challenging and error prone.
>>>
>>> On the other hand, having a source reader with a naive blocking socket is probably simple enough in most cases (actually sometimes this might even be the most efficient way).
>>> But it does not leave much option to Flink other than creating one thread per reader.
>>>
>>> Given the above thoughts, it might be reasonable to separate the SplitReader API into two: SyncReader and AsyncReader. The sync reader just has a simple blocking takeNext() API. And the AsyncReader just has a pollNext(Callback) or Future<?> pollNext(). All the other methods are shared by both readers and could be put into a package-private parent interface like BaseSplitReader.
>>>
>>> Having these two readers allows both complicated and simple implementations, depending on the SplitReader writers. From Flink's perspective, it will choose a more efficient threading model if the SplitReader is an AsyncReader. Otherwise, it may have to use the one-thread-per-reader model if the reader is a SyncReader. Users can also choose to implement both interfaces; in that case, it is up to Flink to choose which interface to use.
>>>
>>> Admittedly, this solution does have one more interface, but it still seems rewarding. Any thoughts?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>>
>>> On Sun, Nov 18, 2018 at 11:33 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi community,
>>>>
>>>> Thank you guys for sharing ideas.
>>>>
>>>> The thing I am really concerned about is the thread mode.
>>>> Actually at Alibaba, we implemented our "split reader" based source two years ago. That was based on "SourceFunction"; it was just an extension, not a refactoring. It is almost the same as the version Thomas and Jamie described in the Google Doc. It really helps in many scenarios.
>>>>
>>>> However I don't like the thread mode which starts a thread for each split. Starting an extra thread in the operator is not an ideal way IMO, especially since the thread count is decided by the split count. So I was wondering if there is a more elegant way. Do we really want these threads in Flink core?
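A minimal sketch of the SyncReader/AsyncReader split described above. The method names takeNext()/pollNext() and BaseSplitReader come from the message; the record type, the contents of BaseSplitReader, and the iterator-backed example are illustrative:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the two-interface proposal: a blocking SyncReader and a
// non-blocking AsyncReader sharing a common parent. Illustrative only.
public class ReaderInterfacesSketch {

    // Shared methods (checkpoint hooks, close(), ...) would live here.
    interface BaseSplitReader<T> extends AutoCloseable {}

    // Blocking flavor: easy to implement, forces one thread per reader.
    interface SyncReader<T> extends BaseSplitReader<T> {
        T takeNext() throws InterruptedException; // blocks until a record is ready
    }

    // Non-blocking flavor: lets Flink drive many readers from one thread.
    interface AsyncReader<T> extends BaseSplitReader<T> {
        CompletableFuture<T> pollNext();
    }

    // Trivial AsyncReader over an in-memory iterator, for illustration.
    static class IteratorReader implements AsyncReader<String> {
        private final Iterator<String> it;

        IteratorReader(Iterator<String> it) {
            this.it = it;
        }

        @Override
        public CompletableFuture<String> pollNext() {
            // Already-available data completes immediately; a real reader
            // would complete the future from its fetcher when data arrives.
            return it.hasNext()
                    ? CompletableFuture.completedFuture(it.next())
                    : new CompletableFuture<>(); // no more data here
        }

        @Override
        public void close() {}
    }

    public static void main(String[] args) {
        IteratorReader reader = new IteratorReader(List.of("a", "b").iterator());
        System.out.println(reader.pollNext().join()); // a
        System.out.println(reader.pollNext().join()); // b
    }
}
```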
>>>>
>>>> I agree that a blocking interface is easier to implement. Could we at least separate the split reader from the source function into different interfaces? Not all sources would like to read all splits concurrently. In the batch scenario, reading splits one by one is more general. And also, not all sources are partitioned, right?
>>>> I would prefer a new source interface with "pull mode" only and no splits, with a splittable source extending it, and one implementation that starts a thread per split, reading all splits concurrently.
>>>>
>>>>
>>>> Thomas Weise <t...@apache.org> wrote on Sun, Nov 18, 2018 at 3:18 AM:
>>>>
>>>>> @Aljoscha, to address your question first: in the case of the Kinesis consumer (with the current Kinesis consumer API), there would also be N+1 threads. I have implemented a prototype similar to what is shown in Jamie's document, where the thread ownership is similar to what you have done for Kafka.
>>>>>
>>>>> The equivalent of the split reader manages its own thread, and the "source main thread" is responsible for emitting the data. The interface between the N reader threads and the 1 emitter is a blocking queue per consumer thread. The emitter can then control which queue to consume from based on the event time progress.
>>>>>
>>>>> This is akin to a "non-blocking" interface *between emitter and split reader*. The emitter uses poll to retrieve records from the N queues (which requires non-blocking interaction). The emitter is independent of the split reader implementation; that part could live in Flink.
>>>>>
>>>>> Regarding whether or not to assume that split readers always need a thread, and in addition that these reader threads should be managed by Flink: it depends on the API of the respective external systems, and I would not bake that assumption into Flink.
>>>>> Some client libraries manage their own threads (see push-based APIs like JMS, and as I understand it this may also apply to the new fan-out Kinesis API: https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html). In such cases it would not make sense to layer another reader thread on top. It may instead be better if Flink provides the split reader with the queue/buffer to push records to.
>>>>>
>>>>> The discussion so far has largely ignored the discovery aspect. There are some important considerations, such as ordering dependencies between splits and work rebalancing, that may affect the split reader interface. Should we fork this into a separate thread?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Fri, Nov 16, 2018 at 8:09 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi Jamie,
>>>>>>
>>>>>> As was already covered in my discussion with Becket, there is an easy way to provide a blocking API on top of a non-blocking API. And yes, we both agreed that a blocking API is easier for users to implement.
>>>>>>
>>>>>> I also do not agree with respect to the usefulness of a non-blocking API. Actually, the Kafka connector is the one that could be more efficient thanks to the removal of one layer of threading.
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>> On 16 Nov 2018, at 02:21, Jamie Grier <jgr...@lyft.com.INVALID> wrote:
>>>>>>>
>>>>>>> Thanks Aljoscha for getting this effort going!
>>>>>>>
>>>>>>> There's been plenty of discussion here already and I'll add my big +1 to making this interface very simple to implement for a new Source/SplitReader.
>>>>>>> Writing a new production quality connector for Flink is very difficult today and requires a lot of detailed knowledge about Flink, event time progress, watermarking, idle shard detection, etc and it would be good to move almost all of this type of code into Flink itself and out of source implementations. I also think this is totally doable and I'm really excited to see this happening.
>>>>>>>
>>>>>>> I do have a couple of thoughts about the API and the implementation..
>>>>>>>
>>>>>>> In a perfect world there would be a single thread per Flink source sub-task and no additional threads for SplitReaders -- but this assumes a world where you have true async IO APIs for the upstream systems (like Kafka and Kinesis, S3, HDFS, etc). If that world did exist the single thread could just sit in an efficient select() call waiting for new data to arrive on any Split. That'd be awesome..
>>>>>>>
>>>>>>> But, that world doesn't exist and given that practical consideration I would think the next best implementation is going to be, in practice, probably a thread per SplitReader that does nothing but call the source API and drop whatever it reads into a (blocking) queue -- as Aljoscha mentioned (calling it N+1) and as we started to describe here:
>>>>>>>
>>>>>>> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa
>>>>>>>
>>>>>>> I guess my point is that I think we should strive to move as much of something like the diagram referenced in the above doc into Flink itself and out of sources and simplify the SplitReader API as much as possible as well.
>>>>>>>
>>>>>>> With the above in mind and with regard to the discussion about blocking, etc..
>>>>>>> I'm not sure I agree with some of the discussion so far with regard to this API design. The calls to the upstream systems (kafka/kinesis) are in fact going to be blocking calls. So a simple API without the constraint that the methods must be implemented in a non-blocking way seems better to me from the point of view of somebody writing a new source implementation. My concern is that if you force the implementer of the SplitReader interface to do so in a non-blocking way you're just going to make it harder to write those implementations. Those calls to read the next bit of data are going to be blocking calls with most known important sources -- at least Kafka/Kinesis/HDFS -- so I think maybe we should just deal with that head on and work around it at a higher level so the SplitReader interface stays super simple to implement. This means we manage all the threading in Flink core, the API stays pull-based, and the implementer is allowed to simply block until they have data to return.
>>>>>>>
>>>>>>> I maybe would change my mind about this if truly asynchronous APIs to the upstream source systems were likely to be available in the near future, or are now and I'm just ignorant of it. But even then the supporting code in Flink to drive async and sync sources would be different, and in fact they might just have different APIs altogether -- SplitReader vs AsyncSplitReader maybe.
>>>>>>>
>>>>>>> In the end I think playing with the implementation, across more than one source, and moving as much common code into Flink itself will reveal the best API, of course.
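The N+1 model described above (one fetcher thread per split that does nothing but call the blocking source API and drop records into a bounded handover queue, plus one emitter draining the queues) can be sketched like this. All names are illustrative, the record sources are fixed lists, and a real fetcher would loop over a blocking client call (Kafka/Kinesis/HDFS):

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the "N+1 threads" model: N fetcher threads feed bounded
// handover queues; the single "+1" emitter thread polls them.
public class NPlusOneSketch {

    static Thread fetcher(List<String> records, BlockingQueue<String> handover) {
        return new Thread(() -> {
            try {
                for (String r : records) {
                    handover.put(r); // back-pressure: blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Runs two fetchers and one emitter; returns the number of emitted records.
    static int runOnce() throws InterruptedException {
        BlockingQueue<String> q1 = new ArrayBlockingQueue<>(2);
        BlockingQueue<String> q2 = new ArrayBlockingQueue<>(2);
        Thread f1 = fetcher(List.of("a1", "a2"), q1);
        Thread f2 = fetcher(List.of("b1", "b2"), q2);
        f1.start();
        f2.start();

        // Emitter: chooses which queue to drain. Event-time alignment would
        // pick the queue with the lowest watermark instead of this fixed order.
        int emitted = 0;
        while (emitted < 4) {
            String r = q1.poll();
            if (r == null) {
                r = q2.poll();
            }
            if (r != null) {
                emitted++;
            }
        }
        f1.join();
        f2.join();
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce()); // 4
    }
}
```

The bounded queue is what limits the read rate of the fetcher threads, as Aljoscha notes later about the Kafka handover.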
>>>>>>>
>>>>>>> One other interesting note is that you need to preserve per-partition ordering, so you have to take care with the implementation if it were to be based on a thread pool and futures, so as not to reorder the reads.
>>>>>>>
>>>>>>> Anyway, I'm thrilled to see this starting to move forward and I'd very much like to help with the implementation wherever I can. We're doing a simplified internal version of some of this at Lyft for just Kinesis, because we need a solution for event time alignment in the very short term, but we'd like to immediately start helping to do this properly in Flink after that. One of the end goals for us is event time alignment across heterogeneous sources. Another is making it possible for non-expert users to have a high probability of being able to write their own, correct connectors.
>>>>>>>
>>>>>>> -Jamie
>>>>>>>
>>>>>>> On Thu, Nov 15, 2018 at 3:43 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I thought I had sent this mail a while ago but I must have forgotten to send it.
>>>>>>>>
>>>>>>>> There is another thing we should consider for splits: the range of timestamps that they can contain. For example, the splits of a file source would know, roughly, what the minimum and maximum timestamp in the splits is. For infinite splits, such as Kafka partitions, the minimum would be meaningful but the maximum would be +Inf. If the splits expose the interval of time that they contain, the readers, or the component that manages the readers, can make decisions about which splits to forward and read first.
>>>>>>>> And it can also influence the minimum watermark that a reader forwards: it should never emit a watermark if it knows there are splits to read that have a lower minimum timestamp. I think it should be as easy as adding a minimumTimestamp()/maximumTimestamp() method pair to the split interface.
>>>>>>>>
>>>>>>>> Another thing we need to resolve is the actual reader interface. I see there has been some good discussion but I don't know if we have consensus. We should try and see how specific sources could be implemented with the new interface. For example, for Kafka I think we need to have N+1 threads per task (where N is the number of splits that a task is reading from). One thread is responsible for reading from the splits. And each split has its own (internal) thread for reading from Kafka and putting messages in an internal queue to pull from. This is similar to how the current Kafka source is implemented, which has a separate fetcher thread. The reason for this split is that we always need to try reading from Kafka to keep the throughput up. In the current implementation the internal queue (or handover) limits the read rate of the reader threads.
>>>>>>>>
>>>>>>>> @Thomas, what do you think this would look like for Kinesis?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>> On 15. Nov 2018, at 03:56, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Piotrek,
>>>>>>>>>
>>>>>>>>> Thanks a lot for the detailed reply. All makes sense to me.
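The minimumTimestamp()/maximumTimestamp() pair on the split interface could be sketched as below. TimestampedSplit, BoundedSplit, and the selection logic are illustrative; Long.MAX_VALUE stands in for +Inf on unbounded splits like Kafka partitions:

```java
import java.util.Comparator;
import java.util.List;

// Sketch of splits exposing their timestamp range, so the component
// managing the readers can decide what to read first and how far the
// watermark may advance. All names are illustrative.
public class TimestampedSplitSketch {

    interface TimestampedSplit {
        long minimumTimestamp();
        long maximumTimestamp(); // Long.MAX_VALUE for unbounded splits
    }

    static class BoundedSplit implements TimestampedSplit {
        private final long min;
        private final long max;

        BoundedSplit(long min, long max) {
            this.min = min;
            this.max = max;
        }

        @Override public long minimumTimestamp() { return min; }
        @Override public long maximumTimestamp() { return max; }
    }

    // Never emit a watermark above the smallest minimum timestamp of the
    // splits that are still unread, per the reasoning above.
    static long watermarkUpperBound(List<TimestampedSplit> unread) {
        return unread.stream()
                .map(TimestampedSplit::minimumTimestamp)
                .min(Comparator.naturalOrder())
                .orElse(Long.MAX_VALUE); // no unread splits: nothing holds it back
    }

    public static void main(String[] args) {
        List<TimestampedSplit> unread =
                List.of(new BoundedSplit(100, 200), new BoundedSplit(50, 150));
        System.out.println(watermarkUpperBound(unread)); // 50
    }
}
```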
>>>>>>>>>
>>>>>>>>> WRT the confusion between advance() / getCurrent(), do you think it would help if we combine them and have something like:
>>>>>>>>>
>>>>>>>>> CompletableFuture<T> getNext();
>>>>>>>>> long getWatermark();
>>>>>>>>> long getCurrentTimestamp();
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Thanks again for the detailed answer :) Sorry for responding with a delay.
>>>>>>>>>>
>>>>>>>>>>> Completely agree that in pattern 2, having a callback is necessary for that single thread outside of the connectors. And the connectors MUST have internal threads.
>>>>>>>>>>
>>>>>>>>>> Yes, this thread will have to exist somewhere. In pattern 2 it exists in the connector (at least from the perspective of the Flink execution engine). In pattern 1 it exists inside the Flink execution engine. With completely blocking connectors, like simple reading from files, both of those approaches are basically the same. The difference is when a user implementing a Flink source is already working with non-blocking code with some internal threads. In this case, pattern 1 would result in "double thread wrapping", while pattern 2 would allow skipping one layer of indirection.
>>>>>>>>>>
>>>>>>>>>>> If we go that way, we should have something like "void poll(Callback) / void advance(callback)". I am curious how CompletableFuture would work here, though. If 10 readers return 10 completable futures, will there be 10 additional threads (so 20 threads in total) blocking waiting on them?
>>>>>>>>>>> Or will there be a single thread busy-loop checking around?
>>>>>>>>>>
>>>>>>>>>> To be honest, I haven't thought this completely through and I haven't tested/POC'ed it. Having said that, I can think of at least a couple of solutions. The first is something like this:
>>>>>>>>>>
>>>>>>>>>> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>>>>>>>>>>
>>>>>>>>>> The line
>>>>>>>>>>
>>>>>>>>>> `blocked = split.process();`
>>>>>>>>>>
>>>>>>>>>> is where the execution goes into the task/sources. This is where the returned future is handled:
>>>>>>>>>>
>>>>>>>>>> blocked.addListener(() -> {
>>>>>>>>>>     blockedSplits.remove(split);
>>>>>>>>>>     // reset the level priority to prevent previously-blocked splits from starving existing splits
>>>>>>>>>>     split.resetLevelPriority();
>>>>>>>>>>     waitingSplits.offer(split);
>>>>>>>>>> }, executor);
>>>>>>>>>>
>>>>>>>>>> Fundamentally, callbacks and futures are more or less interchangeable. You can always wrap one into the other (create a callback that completes a future, or attach a callback that runs once a future completes). In this case the difference for me is mostly:
>>>>>>>>>> - an API passing a callback allows the callback to be fired multiple times, and to be fired even if the connector is not blocked. This is what I meant by saying that the api `CompletableFuture<?> isBlocked()` is a bit simpler.
>>>>>>>>>> The connector can only return either "I'm not blocked" or "I'm blocked and I will tell you only once when I'm not blocked anymore".
>>>>>>>>>>
>>>>>>>>>> But this is not the most important thing for me here. For me the important thing is to try our best to make Flink task's control and execution single-threaded. For that, both callback and future APIs should work the same.
>>>>>>>>>>
>>>>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The good thing is that a blocking read API is usually simpler to implement.
>>>>>>>>>>
>>>>>>>>>> Yes, they are easier to implement (especially if you are not the one that has to deal with the additional threading required around them ;) ). But to answer this issue, if we choose pattern 2, we can always provide a proxy/wrapper that would, using the internal thread, implement the non-blocking API while exposing a blocking API to the user. It would implement pattern 2 for the user while exposing pattern 1 to him. In other words: implementing pattern 1 in the pattern 2 paradigm, while making it possible to implement pure pattern 2 connectors.
>>>>>>>>>>
>>>>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO in a method like "isBlocked()". If the method is expected to fetch records (even if not returning them), naming it something more explicit would help avoid confusion.
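The proxy/wrapper mentioned above (the user writes blocking, pattern-1 code; the wrapper runs it on one internal thread and exposes a non-blocking, pattern-2 surface to the engine) might look like the sketch below. The interface and class names are illustrative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of wrapping a blocking reader into a non-blocking one: a single
// internal thread performs the blocking takeNext() calls and pushes records
// into a bounded handover queue; poll() never blocks. Names are illustrative.
public class BlockingToAsyncSketch {

    interface BlockingReader<T> {
        T takeNext() throws InterruptedException; // blocks until data is ready
    }

    static class AsyncWrapper<T> implements AutoCloseable {
        private final BlockingQueue<T> handover = new LinkedBlockingQueue<>(10);
        private final Thread fetcher;

        AsyncWrapper(BlockingReader<T> reader) {
            fetcher = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        handover.put(reader.takeNext()); // bounded: back-pressure
                    }
                } catch (InterruptedException ignored) {
                    // shutting down
                }
            });
            fetcher.setDaemon(true);
            fetcher.start();
        }

        // Non-blocking: null when nothing is buffered yet.
        T poll() {
            return handover.poll();
        }

        @Override
        public void close() {
            fetcher.interrupt();
        }
    }

    public static void main(String[] args) {
        // A "blocking" reader that always has data available.
        try (AsyncWrapper<String> wrapper = new AsyncWrapper<>(() -> "record")) {
            String r = null;
            while (r == null) {
                r = wrapper.poll(); // in Flink, the task would do other work here
            }
            System.out.println(r); // record
        }
    }
}
```

A completion notification (`CompletableFuture<?> isBlocked()` or a callback) would additionally be needed to avoid the busy poll loop shown in main; the queue-plus-thread core stays the same.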
>>>>>>>>>>
>>>>>>>>>> If we choose so, we could rework it into something like:
>>>>>>>>>>
>>>>>>>>>> CompletableFuture<?> advance()
>>>>>>>>>> T getCurrent();
>>>>>>>>>> Watermark getCurrentWatermark()
>>>>>>>>>>
>>>>>>>>>> But as I wrote before, this is more confusing to me for the exact reasons you mentioned :) I would be confused about what should be done in `advance()` and what in `getCurrent()`. However, again, this naming issue is not that important to me and is probably a matter of taste/personal preferences.
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the explanation. We are probably talking about the same thing but in different ways. To clarify a little bit, I think there are two patterns to read from a connector.
>>>>>>>>>>>
>>>>>>>>>>> Pattern 1: Thread-less connector with a blocking read API. Outside of the connector, there is one IO thread per reader, doing blocking reads. An additional thread will interact with all the IO threads.
>>>>>>>>>>> Pattern 2: Connector with internal thread(s) and a non-blocking API. Outside of the connector, there is one thread for ALL readers, doing IO relying on notification callbacks in the reader.
>>>>>>>>>>>
>>>>>>>>>>> In both patterns, there must be at least one thread per connector, either inside (created by connector writers) or outside (created by Flink) of the connector. Ideally there are NUM_CONNECTORS + 1 threads in total, to make sure that 1 thread is fully non-blocking.
>>>>>>>>>>>
>>>>>>>>>>>> Btw, I don't know if you understand my point.
>>>>>>>>>>>> Having only `poll()` and `take()` is not enough for a single-threaded task. If our source interface doesn't provide a `notify()` callback nor `CompletableFuture<?> isBlocked()`, there is no way to implement a single-threaded task that both reads the data from the source connector and can also react to system events. Ok, non-blocking `poll()` would allow that, but with busy looping.
>>>>>>>>>>>
>>>>>>>>>>> Completely agree that in pattern 2, having a callback is necessary for that single thread outside of the connectors. And the connectors MUST have internal threads. If we go that way, we should have something like "void poll(Callback) / void advance(callback)". I am curious how CompletableFuture would work here, though. If 10 readers return 10 completable futures, will there be 10 additional threads (so 20 threads in total) blocking waiting on them? Or will there be a single thread busy-loop checking around?
>>>>>>>>>>>
>>>>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The good thing is that a blocking read API is usually simpler to implement. An additional non-blocking "T poll()" method here is indeed optional and could be used in cases where Flink does not want the thread to block forever. They can also be combined into a "T poll(Timeout)", which is exactly what KafkaConsumer did.
>>>>>>>>>>>
>>>>>>>>>>> It sounds like you are proposing pattern 2 with something similar to NIO2's AsynchronousByteChannel [1]. That API would work, except that the signature returning a future seems not necessary.
>>>>>>>>>>> If that is the case, a minor change on the current FLIP proposal to have "void advance(callback)" should work. And this means the connectors MUST have their internal threads.
>>>>>>>>>>>
>>>>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to perform IO in a method like "isBlocked()". If the method is expected to fetch records (even if not returning them), naming it something more explicit would help avoid confusion.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>
>>>>>>>>>>> [1] https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>
>>>>>>>>>>>> Good point with select/epoll. However, I do not see how they could be used with Flink if we would like a single task in Flink to be single-threaded (and I believe we should pursue this goal). If your connector blocks on `select`, then it can not process/handle control messages from Flink, like checkpoints, releasing resources and potentially output flushes. This would require tight integration between the connector and Flink's main event loop/selects/etc.
>>>>>>>>>>>>
>>>>>>>>>>>> Looking at it from another perspective: let's assume that we have a connector implemented on top of `select`/`epoll`. In order to integrate it with Flink's checkpointing/flushes/resource releasing, it will have to be executed in a separate thread one way or another.
>>>>>>>>>>>> At least, if our API will enforce/encourage non-blocking implementations with some kind of notifications (`isBlocked()` or a `notify()` callback), some connectors might skip one layer of wrapping threads.
>>>>>>>>>>>>
>>>>>>>>>>>> Btw, I don't know if you understand my point. Having only `poll()` and `take()` is not enough for a single-threaded task. If our source interface doesn't provide a `notify()` callback nor `CompletableFuture<?> isBlocked()`, there is no way to implement a single-threaded task that both reads the data from the source connector and can also react to system events. Ok, non-blocking `poll()` would allow that, but with busy looping.
>>>>>>>>>>>>
>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>
>>>>>>>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>>>>
>>>>>>>>>>>>>> But I don't see a reason why we should expose both blocking `take()` and non-blocking `poll()` methods to the Flink engine. Someone (the Flink engine or the connector) would have to do the same busy looping anyway, and I think it would be better to have a simpler connector API (that would solve our problems) and force connectors to comply one way or another.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we let the blocking happen inside the connector, the blocking does not have to be a busy loop. For example, to do the blocking wait efficiently, the connector can use Java NIO's selector().select(), which relies on an OS syscall like epoll [1], instead of busy looping.
But if the Flink engine blocks outside the connector, it pretty much has to do the busy loop. So if there is only one API to get the element, a blocking getNextElement() makes more sense. In any case, we should avoid ambiguity. It has to be crystal clear whether a method is expected to be blocking or non-blocking. Otherwise it would be very difficult for the Flink engine to do the right thing with the connectors. At first glance at getCurrent(), the expected behavior is not quite clear.

That said, I do agree that functionality-wise, poll() and take() kind of overlap. But they are actually not that different from isBlocked()/getNextElement(). Compared with isBlocked(), the only difference is that poll() also returns the next record if it is available. But I agree that isBlocked() + getNextElement() is more flexible, as users can just check the record availability without fetching the next element.

> In case of thread-less readers with only non-blocking `queue.poll()` (is that really a thing? I cannot think of a real implementation that enforces such constraints)

Right, it is pretty much syntactic sugar that lets users combine the check-and-take into one method. It could be achieved with isBlocked() + getNextElement().
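The "syntactic sugar" relationship between the two API shapes can be sketched as follows (hypothetical interface and class names, mirroring the discussion rather than the final FLIP-27 API):

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reader interface following the isBlocked()/getNextElement()
 *  proposal in this thread; names are illustrative, not FLIP-27 API. */
interface ElementReader<T> {

    /** Completed future: an element is available now.
     *  Incomplete future: the reader is currently blocked. */
    CompletableFuture<?> isBlocked();

    /** Per the discussed contract, called only once isBlocked() completes. */
    T getNextElement();

    /** poll() as mere syntactic sugar: the check and the take in one call. */
    default Optional<T> poll() {
        return isBlocked().isDone()
                ? Optional.ofNullable(getNextElement())
                : Optional.empty();
    }
}

/** A thread-less reader as in Becket's example: it delegates to a queue and
 *  has no internal thread, so an incomplete future is never completed
 *  "by itself" — somebody external has to add data and ask again. */
class QueueBackedReader<T> implements ElementReader<T> {
    private final Queue<T> queue = new ArrayDeque<>();

    void add(T element) {
        queue.add(element);
    }

    @Override
    public CompletableFuture<?> isBlocked() {
        return queue.isEmpty()
                ? new CompletableFuture<>()              // nobody will complete this
                : CompletableFuture.completedFuture(null);
    }

    @Override
    public T getNextElement() {
        return queue.poll();
    }
}
```

The queue-backed implementation also makes the later point in this thread concrete: with no internal thread, the future returned while the queue is empty can only "complete" by the caller polling again.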
[1] http://man7.org/linux/man-pages/man7/epoll.7.html

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

With my proposal, both of your examples would have to be solved by the connector, and the solution to both problems would be the same:

Pretend that the connector is never blocked (`isBlocked() { return NOT_BLOCKED; }`) and implement `getNextElement()` in a blocking fashion (or semi-blocking, with a return of control from time to time to allow checkpointing, network flushing and other resource management things to happen in the same main thread). In other words, exactly how you would implement the `take()` method, or how the same source connector would be implemented NOW with the current source interface. The only difference from the current interface would be that the main loop would be outside of the connector, and instead of periodically releasing the checkpointing lock, it would periodically `return null;` or `return Optional.empty();` from `getNextElement()`.

In case of thread-less readers with only non-blocking `queue.poll()` (is that really a thing? I cannot think of a real implementation that enforces such constraints), we could provide a wrapper that hides the busy looping.
The same applies to how to solve forever-blocking readers: we could provide another wrapper that runs the connector in a separate thread.

But I don't see a reason why we should expose both blocking `take()` and non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or connector) would have to do the same busy looping anyway, and I think it would be better to have a simpler connector API (that would solve our problems) and force connectors to comply one way or another.

Piotrek

On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:

Hi Piotr,

I might have misunderstood your proposal. But let me try to explain my concern. I am thinking about the following case:
1. A reader has the following two interfaces:
   boolean isBlocked()
   T getNextElement()
2. The implementation of getNextElement() is non-blocking.
3. The reader is thread-less, i.e. it does not have any internal thread. For example, it might just delegate getNextElement() to a queue.poll(), and isBlocked() is just queue.isEmpty().

How can Flink efficiently implement a blocking reading behavior with this reader? Either a tight loop or a backoff interval is needed. Neither of them is ideal.

Now let's say the reader mentioned above implements a blocking getNextElement() method.
Because there is no internal thread in the reader, after isBlocked() returns false, Flink will still have to loop on isBlocked() to check whether the next record is available. If the next record arrives after 10 min, it is a tight loop for 10 min. You have probably noticed that in this case, even if isBlocked() returns a future, that future will not be completed unless Flink calls some method on the reader, because the reader has no internal thread to complete the future by itself.

Due to the above reasons, a blocking take() API would give Flink an efficient way to read from a reader. There are many ways to wake up the blocking thread when checkpointing is needed, depending on the implementation. But I think the poll()/take() API would also work in that case.

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

a)

> BTW, regarding the isBlock() method, I have a few more questions. 21. Is a method isReady() with boolean as a return value equivalent? Personally I found it is a little bit confusing in what is supposed to be returned when the future is completed. 22.
> If the implementation of isBlocked() is optional, how do the callers know whether the method is properly implemented or not? Does "not implemented" mean it always returns a completed future?

`CompletableFuture<?> isBlocked()` is more or less equivalent to `boolean hasNext()` which, in the case of "false", provides some kind of listener/callback that notifies about the presence of the next element. There are some minor details; for example, `CompletableFuture<?>` has a minimal two-state logic:

1. Future is completed - we have more data
2. Future not yet completed - we don't have data now, but we might/will have it in the future

While `boolean hasNext()` plus a `notify()` callback is a bit more complicated/dispersed and can lead to/encourage `notify()` spam.

b)

> 3. If we merge `advance` and `getCurrent` into one method like `getNext`, `getNext` would need to return an `ElementWithTimestamp` because some sources want to add a timestamp to every element. IMO, this is not so memory-friendly, so I prefer this design.

Guowei, I don't quite understand this. Could you elaborate on why having a separate `advance()` helps?

c)

Regarding advance/poll/take: what's the value of having two separate methods, poll and take?
Which one of them should be called and which implemented? What's the benefit of having those methods compared to having one single method `getNextElement()` (or `pollElement()`, or whatever we name it) with the following contract:

CompletableFuture<?> isBlocked();

/**
 * Return the next element - will be called only if `isBlocked()` is completed.
 * Try to implement it in a non-blocking fashion, but if that's impossible or
 * you just don't need the effort, you can block in this method.
 */
T getNextElement();

I mean, if the connector is implemented non-blockingly, Flink should use it that way. If it's not, then `poll()` will `throw new NotImplementedException()`. Implementing both of them and providing both of them to Flink wouldn't make sense, so why not merge them into a single method call that should preferably (but not necessarily) be non-blocking? It's not like we are implementing a general-purpose `Queue`, whose users might want to call either `poll` or `take`. We would always prefer to call `poll`, but if it's blocking, then we still have no choice but to call it and block on it.

d)

> 1. I agree with Piotr and Becket that the non-blocking source is very important.
> But in addition to `Future/poll`, there may be another way to achieve this. I think it may not be very memory-friendly if every advance call returns a Future.

I didn't want to mention this, to not clog my initial proposal, but there is a simple solution to the problem:

public interface SplitReader {

    (…)

    CompletableFuture<?> NOT_BLOCKED =
        CompletableFuture.completedFuture(null);

    /**
     * Returns a future that will be completed when the page source becomes
     * unblocked. If the page source is not blocked, this method should
     * return {@code NOT_BLOCKED}.
     */
    default CompletableFuture<?> isBlocked()
    {
        return NOT_BLOCKED;
    }
}

If we are blocked and we are waiting for IO, then creating a new Future is a non-issue. Under full throttle/throughput, sources that are not blocked returning a static `NOT_BLOCKED` constant should also solve the problem.

One more remark: non-blocking sources might be a necessity in a single-threaded model without a checkpointing lock. (Currently, when sources are blocked, they can release the checkpointing lock and re-acquire it again later.) A non-blocking `poll`/`getNext()` would allow checkpoints to happen while the source is idling.
In that case either `notify()` or my proposed `isBlocked()` would allow us to avoid busy-looping.

Piotrek

On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:

Hi Thomas,

The iterator-like API was also the first thing that came to me. But it seems a little confusing that hasNext() does not mean "the stream has not ended", but means "the next record is ready", which is repurposing the well-known meaning of hasNext(). If we follow the hasNext()/next() pattern, an additional isNextReady() method to indicate whether the next record is ready seems more intuitive to me.

Similarly, in the poll()/take() pattern, another method, isDone(), is needed to indicate whether the stream has ended or not.

Compared with the hasNext()/next()/isNextReady() pattern, isDone()/poll()/take() seems more flexible for the reader implementation. When I am implementing a reader, I could have a couple of choices:

- A thread-less reader that does not have any internal thread.
  - When poll() is called, the same calling thread will perform a bunch of IO asynchronously.
  - When take() is called, the same calling thread will perform a bunch of IO and wait until the record is ready.
- A reader with internal threads performing network IO and putting records into a buffer.
  - When poll() is called, the calling thread simply reads from the buffer and returns an empty result immediately if there is no record.
  - When take() is called, the calling thread reads from the buffer and blocks waiting if the buffer is empty.

On the other hand, with the hasNext()/next()/isNextReady() API, it is less intuitive for reader developers to write the thread-less pattern. Although technically speaking one can still do the asynchronous IO to prepare the record in isNextReady(), it is inexplicit and seems somewhat hacky.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:

Couple more points regarding discovery:

The proposal mentions that discovery could be outside the execution graph. Today, discovered partitions/shards are checkpointed. I believe that will also need to be the case in the future, even when discovery and reading are split between different tasks.

For cases such as resharding of a Kinesis stream, the relationship between splits needs to be considered.
Splits cannot be randomly distributed over readers in certain situations. An example was mentioned here:
https://github.com/apache/flink/pull/6980#issuecomment-435202809

Thomas

On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:

Thanks for getting the ball rolling on this!

Can the number of splits decrease? Yes, splits can be closed and go away. An example would be a shard merge in Kinesis (2 existing shards will be closed and replaced with a new shard).

Regarding advance/poll/take: IMO the least restrictive approach would be the thread-less IO model (pull-based, non-blocking, caller retrieves new records when available). The current Kinesis API requires the use of threads. But that can be internal to the split reader and does not need to be a source API concern. In fact, that's what we are working on right now as an improvement to the existing consumer: each shard consumer thread will push to a queue, and the consumer main thread will poll the queue(s). It is essentially a mapping from threaded IO to non-blocking.
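The queue-based mapping from threaded IO to non-blocking polling described above can be sketched as follows (all names are illustrative, not the actual Kinesis consumer code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch of the pattern described in the thread: one internal thread per
 *  shard does the (possibly blocking) IO and pushes into a shared queue,
 *  while the consumer main thread polls the queue non-blockingly. */
public class ShardConsumerSketch {

    static List<String> consume(List<String> shards) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        // One internal thread per shard; invisible to the source API.
        // Each thread stands in for a blocking per-shard fetch loop.
        for (String shard : shards) {
            new Thread(() -> queue.add(shard + ":record")).start();
        }

        // Main thread: threaded IO has been mapped to non-blocking polling,
        // so it stays free to react to checkpoints etc. between polls.
        List<String> records = new ArrayList<>();
        while (records.size() < shards.size()) {
            String record = queue.poll(1, TimeUnit.SECONDS);
            if (record != null) {
                records.add(record);
            }
        }
        return records;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(consume(List.of("shard-0", "shard-1")));
    }
}
```

The record order across shards is nondeterministic, which is precisely why the main thread must multiplex over the queue rather than read shards one by one.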
The proposed SplitReader interface would fit the thread-less IO model. Similar to an iterator, we find out if there is a new element (hasNext) and if so, move to it (next()). Separate calls deliver the meta information (timestamp, watermark). Perhaps the advance call could offer a timeout option, so that the caller does not end up in a busy wait. On the other hand, a caller processing multiple splits may want to cycle through fast, to process elements of other splits as soon as they become available. The nice thing is that this "split merge" logic can now live in Flink and be optimized and shared between different sources.

Thanks,
Thomas

On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:

Hi,
Thanks Aljoscha for this FLIP.

1. I agree with Piotr and Becket that the non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may not be very memory-friendly if every advance call returns a Future.
public interface Listener {
    public void notify();
}

public interface SplitReader {
    /**
     * When there is temporarily no element, this will return false.
     * When elements are available again, the splitReader can call
     * listener.notify(). In addition, the framework would check `advance`
     * periodically. Of course, advance can always return true and ignore
     * the listener argument for simplicity.
     */
    public boolean advance(Listener listener);
}

2. The FLIP tells us very clearly how to create all Splits and how to create a SplitReader from a Split. But there is no strategy for the user to choose how to assign the splits to the tasks. I think we could add an Enum to let the user choose:
/**
public Enum SplitsAssignmentPolicy {
    Location,
    Workload,
    Random,
    Average
}
*/

3. If we merge `advance` and `getCurrent` into one method like `getNext`, `getNext` would need to return an `ElementWithTimestamp` because some sources want to add a timestamp to every element.
IMO, this is not so memory-friendly, so I prefer this design.

Thanks

On Thu, 1 Nov 2018 at 18:08, Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Thanks Aljoscha for starting this; it's blocking quite a lot of other possible improvements. I have one proposal. Instead of having a method:

boolean advance() throws IOException;

I would replace it with

/*
 * Return a future, which when completed means that the source has more
 * data and getNext() will not block.
 * If you wish to use the benefits of non-blocking connectors, please
 * implement this method appropriately.
 */
default CompletableFuture<?> isBlocked() {
    return CompletableFuture.completedFuture(null);
}

And rename `getCurrent()` to `getNext()`.

Couple of arguments:
1. I don't understand the division of work between `advance()` and `getCurrent()`: what should be done in which, especially for connectors that handle records in batches (like Kafka), and when should you call `advance` and when `getCurrent()`?
2.
Replacing `boolean` with `CompletableFuture<?>` will allow us in the future to have asynchronous/non-blocking connectors and to handle a large number of blocked threads more efficiently, without busy waiting. At the same time it doesn't add much complexity, since naive connector implementations can always be blocking.
3. This would also allow us to use a fixed-size thread pool of task executors, instead of one thread per task.

Piotrek

On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi All,

In order to finally get the ball rolling on the new source interface that we have discussed for so long, I finally created a FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

I cc'ed Thomas and Jamie because of the ongoing work/discussion about adding per-partition watermark support to the Kinesis source and because this would enable a generic implementation of event-time alignment for all sources.
Maybe we need another FLIP for the event-time alignment part, especially the part about information sharing between operations (I'm not calling it state sharing because state has a special meaning in Flink).

Please discuss away!

Aljoscha