Hi,

a)

> BTW, regarding the isBlock() method, I have a few more questions. 21, Is a
> method isReady() with boolean as a return value
> equivalent? Personally I found it is a little bit confusing in what is
> supposed to be returned when the future is completed. 22. if
> the implementation of isBlocked() is optional, how do the callers know
> whether the method is properly implemented or not?
> Does not implemented mean it always return a completed future?

`CompletableFuture<?> isBlocked()` is more or less equivalent to a
`boolean hasNext()` that, when it returns “false”, also provides some kind of
listener/callback that notifies the caller once the next element is present.
There are some minor differences. `CompletableFuture<?>` has a minimal
two-state logic:

1. Future is completed - we have more data.
2. Future is not yet completed - we don’t have data now, but we might/will
   have it in the future.

In contrast, `boolean hasNext()` plus a `notify()` callback is a bit more
complicated/dispersed, and it can lead to/encourage `notify()` spam.

b)

> 3. If merge the `advance` and `getCurrent` to one method like `getNext` the
> `getNext` would need return a
> `ElementWithTimestamp` because some sources want to add timestamp to every
> element. IMO, this is not so memory friendly
> so I prefer this design.

Guowei, I don’t quite understand this. Could you elaborate on why having a
separate `advance()` helps?

c)

Regarding advance/poll/take: what’s the value of having two separate methods,
poll and take? Which one of them should be called and which one implemented?
What’s the benefit of having those methods compared to having one single
method `getNextElement()` (or `pollElement()`, or whatever we name it) with
the following contract:

    CompletableFuture<?> isBlocked();

    /**
     * Return next element - will be called only if `isBlocked()` is completed.
     * Try to implement it in a non-blocking fashion, but if that’s impossible
     * or just not worth the effort, you can block in this method.
     */
    T getNextElement();

I mean, if the connector is implemented non-blockingly, Flink should use it
that way. If it’s not, then `poll()` will `throw new NotImplementedException()`.
Implementing both of them and providing both of them to Flink wouldn’t make
sense, so why not merge them into a single method call that should preferably
(but does not necessarily need to) be non-blocking? It’s not like we are
implementing a general-purpose `Queue`, where users might want to call either
`poll` or `take`. We would always prefer to call `poll`, but if it’s blocking,
we still have no choice but to call it and block on it.

d)

> 1. I agree with Piotr and Becket that the non-blocking source is very
> important. But in addition to `Future/poll`, there may be another way to
> achieve this. I think it may be not very memory friendly if every advance
> call return a Future.

I didn’t want to mention this, so as not to clog my initial proposal, but
there is a simple solution to the problem:

    public interface SplitReader {

        (…)

        CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

        /**
         * Returns a future that will be completed when the page source becomes
         * unblocked. If the page source is not blocked, this method should return
         * {@code NOT_BLOCKED}.
         */
        default CompletableFuture<?> isBlocked() {
            return NOT_BLOCKED;
        }
    }

If we are blocked and waiting for IO, then creating a new Future is a
non-issue. Under full throttle/throughput, with sources that are not blocked,
returning a static `NOT_BLOCKED` constant should also solve the problem.

One more remark: non-blocking sources might be a necessity in a
single-threaded model without a checkpointing lock. (Currently, when sources
are blocked, they can release the checkpointing lock and re-acquire it later.)
Non-blocking `poll`/`getNext()` would allow checkpoints to happen while the
source is idling. In that case either `notify()` or my proposed `isBlocked()`
would make it possible to avoid busy-looping.
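To make the contract from (c) and (d) a bit more concrete, here is a minimal
sketch of how a caller might drive it. It is an illustration under
assumptions, not part of the FLIP: the generic `SplitReader<T>` shape, the
`QueueBackedReader` class, its `offer()` method, and the `drainAvailable()`
helper with its `budget` parameter are all hypothetical names made up for
this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class IsBlockedSketch {

    /** Simplified, hypothetical stand-in for the reader interface discussed above. */
    public interface SplitReader<T> {
        CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

        /** A completed future means data is available. Default: never blocked. */
        default CompletableFuture<?> isBlocked() {
            return NOT_BLOCKED;
        }

        /** Called only after the future returned by isBlocked() has completed. */
        T getNextElement();
    }

    /** Hypothetical reader whose buffer is filled by some IO thread via offer(). */
    public static class QueueBackedReader<T> implements SplitReader<T> {
        private final Queue<T> buffer = new ArrayDeque<>();
        private CompletableFuture<Void> pending; // non-null only while a caller waits

        public void offer(T element) {
            CompletableFuture<Void> toComplete;
            synchronized (this) {
                buffer.add(element);
                toComplete = pending;
                pending = null;
            }
            // Complete outside the lock so callbacks do not run while holding it.
            if (toComplete != null) {
                toComplete.complete(null);
            }
        }

        @Override
        public synchronized CompletableFuture<?> isBlocked() {
            if (!buffer.isEmpty()) {
                return NOT_BLOCKED; // hot path: shared constant, no allocation
            }
            if (pending == null) {
                pending = new CompletableFuture<>(); // allocate only when blocked
            }
            return pending;
        }

        @Override
        public synchronized T getNextElement() {
            return buffer.remove();
        }
    }

    /**
     * Reads at most `budget` elements while the reader is not blocked and
     * returns the last future seen, so the caller can wait on it (or do other
     * work, such as a checkpoint) instead of busy-looping.
     */
    public static <T> CompletableFuture<?> drainAvailable(
            SplitReader<T> reader, List<T> out, int budget) {
        CompletableFuture<?> blocked = reader.isBlocked();
        int processed = 0;
        while (blocked.isDone() && processed < budget) {
            out.add(reader.getNextElement());
            processed++;
            blocked = reader.isBlocked();
        }
        return blocked;
    }

    public static void main(String[] args) {
        QueueBackedReader<String> reader = new QueueBackedReader<>();
        List<String> out = new ArrayList<>();

        reader.offer("a");
        reader.offer("b");
        CompletableFuture<?> blocked = drainAvailable(reader, out, 10);
        System.out.println(out + " blocked=" + !blocked.isDone());

        reader.offer("c"); // completes the future handed out above
        System.out.println("unblocked=" + blocked.isDone());
        drainAvailable(reader, out, 10);
        System.out.println(out);
    }
}
```

In this sketch a new future is allocated only when the buffer runs empty;
while data keeps flowing, every `isBlocked()` call returns the shared
`NOT_BLOCKED` constant. A reader that keeps the default `isBlocked()` simply
blocks inside `getNextElement()`, which is the fallback described in (c).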
Piotrek

> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:
>
> Hi Thomas,
>
> The iterator-like API was also the first thing that came to me. But it
> seems a little confusing that hasNext() does not mean "the stream has not
> ended", but means "the next record is ready", which is repurposing the well
> known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
> additional isNextReady() method to indicate whether the next record is
> ready seems more intuitive to me.
>
> Similarly, in poll()/take() pattern, another method of isDone() is needed
> to indicate whether the stream has ended or not.
>
> Compared with hasNext()/next()/isNextReady() pattern,
> isDone()/poll()/take() seems more flexible for the reader implementation.
> When I am implementing a reader, I could have a couple of choices:
>
>   - A thread-less reader that does not have any internal thread.
>     - When poll() is called, the same calling thread will perform a bunch of
>       IO asynchronously.
>     - When take() is called, the same calling thread will perform a bunch
>       of IO and wait until the record is ready.
>   - A reader with internal threads performing network IO and put records
>     into a buffer.
>     - When poll() is called, the calling thread simply reads from the
>       buffer and return empty result immediately if there is no record.
>     - When take() is called, the calling thread reads from the buffer and
>       block waiting if the buffer is empty.
>
> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
> intuitive for the reader developers to write the thread-less pattern.
> Although technically speaking one can still do the asynchronous IO to
> prepare the record in isNextReady(). But it is inexplicit and seems
> somewhat hacky.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:
>
>> Couple more points regarding discovery:
>>
>> The proposal mentions that discovery could be outside the execution graph.
>> Today, discovered partitions/shards are checkpointed. I believe that will
>> also need to be the case in the future, even when discovery and reading are
>> split between different tasks.
>>
>> For cases such as resharding of a Kinesis stream, the relationship between
>> splits needs to be considered. Splits cannot be randomly distributed over
>> readers in certain situations. An example was mentioned here:
>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>
>> Thomas
>>
>>
>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
>>
>>> Thanks for getting the ball rolling on this!
>>>
>>> Can the number of splits decrease? Yes, splits can be closed and go away.
>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>> closed and replaced with a new shard).
>>>
>>> Regarding advance/poll/take: IMO the least restrictive approach would be
>>> the thread-less IO model (pull based, non-blocking, caller retrieves new
>>> records when available). The current Kinesis API requires the use of
>>> threads. But that can be internal to the split reader and does not need to
>>> be a source API concern. In fact, that's what we are working on right now
>>> as improvement to the existing consumer: Each shard consumer thread will
>>> push to a queue, the consumer main thread will poll the queue(s). It is
>>> essentially a mapping from threaded IO to non-blocking.
>>>
>>> The proposed SplitReader interface would fit the thread-less IO model.
>>> Similar to an iterator, we find out if there is a new element (hasNext) and
>>> if so, move to it (next()). Separate calls deliver the meta information
>>> (timestamp, watermark).
>>> Perhaps advance call could offer a timeout option,
>>> so that the caller does not end up in a busy wait. On the other hand, a
>>> caller processing multiple splits may want to cycle through fast, to
>>> process elements of other splits as soon as they become available. The nice
>>> thing is that this "split merge" logic can now live in Flink and be
>>> optimized and shared between different sources.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi,
>>>> Thanks Aljoscha for this FLIP.
>>>>
>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>> important. But in addition to `Future/poll`, there may be another way to
>>>> achieve this. I think it may be not very memory friendly if every advance
>>>> call return a Future.
>>>>
>>>> public interface Listener {
>>>>     public void notify();
>>>> }
>>>>
>>>> public interface SplitReader() {
>>>>     /**
>>>>      * When there is no element temporarily, this will return false.
>>>>      * When elements is available again splitReader can call
>>>>      * listener.notify()
>>>>      * In addition the frame would check `advance` periodically .
>>>>      * Of course advance can always return true and ignore the listener
>>>>      * argument for simplicity.
>>>>      */
>>>>     public boolean advance(Listener listener);
>>>> }
>>>>
>>>> 2. The FLIP tells us very clearly that how to create all Splits and how
>>>> to create a SplitReader from a Split. But there is no strategy for the user
>>>> to choose how to assign the splits to the tasks. I think we could add a
>>>> Enum to let user to choose.
>>>> /**
>>>> public Enum SplitsAssignmentPolicy {
>>>>     Location,
>>>>     Workload,
>>>>     Random,
>>>>     Average
>>>> }
>>>> */
>>>>
>>>> 3. If merge the `advance` and `getCurrent` to one method like `getNext`
>>>> the `getNext` would need return a `ElementWithTimestamp` because some
>>>> sources want to add timestamp to every element.
>>>> IMO, this is not so memory
>>>> friendly so I prefer this design.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Piotr Nowojski <pi...@data-artisans.com> wrote on Thu, Nov 1, 2018 at 6:08 PM:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>> possible improvements. I have one proposal. Instead of having a method:
>>>>>
>>>>> boolean advance() throws IOException;
>>>>>
>>>>> I would replace it with
>>>>>
>>>>> /*
>>>>>  * Return a future, which when completed means that source has more data
>>>>>  * and getNext() will not block.
>>>>>  * If you wish to use benefits of non blocking connectors, please
>>>>>  * implement this method appropriately.
>>>>>  */
>>>>> default CompletableFuture<?> isBlocked() {
>>>>>     return CompletableFuture.completedFuture(null);
>>>>> }
>>>>>
>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>
>>>>> Couple of arguments:
>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>> `getCurrent()`. What should be done in which, especially for connectors
>>>>> that handle records in batches (like Kafka) and when should you call
>>>>> `advance` and when `getCurrent()`.
>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>>>> future to have asynchronous/non blocking connectors and more efficiently
>>>>> handle large number of blocked threads, without busy waiting. While at the
>>>>> same time it doesn’t add much complexity, since naive connector
>>>>> implementations can be always blocking.
>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>> executors, instead of one thread per task.
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> In order to finally get the ball rolling on the new source interface
>>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>
>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>>> adding per-partition watermark support to the Kinesis source and because
>>>>>> this would enable generic implementation of event-time alignment for all
>>>>>> sources. Maybe we need another FLIP for the event-time alignment part,
>>>>>> especially the part about information sharing between operations (I'm not
>>>>>> calling it state sharing because state has a special meaning in Flink).
>>>>>>
>>>>>> Please discuss away!
>>>>>>
>>>>>> Aljoscha