Regarding the naming style: the advantage of the `poll()` style is that the name `poll` itself implies a non-blocking operation, just as it does for `Queue` in the Java API. It's easy to understand, and we don't need to write much in the docs to make clear that the implementation should not do anything heavy. However, `poll` also implies that it returns the thing we want. In our scenario there are currently three types: record, timestamp and watermark. So the return type of `poll` would have to be a tuple3 or something like that, which looks a little hacky IMO.

The `advance()` style is more like RecordReader <https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/RecordReader.html> in MapReduce, or ISpout <https://storm.apache.org/releases/1.1.2/javadocs/org/apache/storm/spout/ISpout.html> in Storm. It really does mean moving the offset forward, which makes sense to me. To be honest, I like the `advance()` style more.

There is also another small point I don't understand: why use `start()` and `close()` in `SplitReader`? `start()` makes me think of "starting a thread" or something like that, and we should not assume there is a thread at all. I prefer `open()`, which also matches `close()` better.
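To make the comparison concrete, here is a rough sketch of the two styles as I understand them. The names and signatures are only illustrative, not the actual FLIP-27 interfaces:

import java.io.IOException;

// Rough sketch only -- illustrative names, not the actual FLIP-27 interfaces.

// What a poll() style reader would have to return: record, timestamp and
// watermark bundled into one object (the "tuple3" that feels hacky to me).
class PolledElement<T> {
    final T record;
    final long timestamp;
    final long watermark;

    PolledElement(T record, long timestamp, long watermark) {
        this.record = record;
        this.timestamp = timestamp;
        this.watermark = watermark;
    }
}

// poll() style: a single non-blocking call, named after java.util.Queue#poll().
interface PollStyleSplitReader<T> {
    // Returns null if no element is currently available.
    PolledElement<T> poll() throws IOException;
}

// advance() style: advance() moves the offset forward (non-blocking) and
// separate getters expose the current record, timestamp and watermark.
interface AdvanceStyleSplitReader<T> {
    void open() throws IOException;        // open() reads better to me than start()
    boolean advance() throws IOException;  // true if a new element is available
    T getCurrent();
    long getCurrentTimestamp();
    long getCurrentWatermark();
    void close() throws IOException;
}

Another nice property of the `advance()` style is that the framework only asks for the timestamp or watermark when it actually needs them, instead of allocating a wrapper object for every polled element.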
Becket Qin <becket....@gmail.com> wrote on Tue, Nov 6, 2018 at 11:04 AM:

> Thanks for updating the wiki, Aljoscha.
>
> The isDone()/advance()/getCurrent() API looks more similar to hasNext()/isNextReady()/getNext(), but implying some different behaviors.
>
> If users call getCurrent() twice without calling advance() in between, will they get the same record back? From the API itself, users might think advance() is the API that moves the offset forward, and getCurrent() just return the record at the current offset.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Nov 5, 2018 at 10:41 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > I updated the FLIP [1] with some Javadoc for the SplitReader to outline what I had in mind with the interface. Sorry for not doing that earlier, it's not quite clear how the methods should work from the name alone.
> >
> > The gist of it is that advance() should be non-blocking, so isDone/advance()/getCurrent() are very similar to isDone()/poll()/take() that I have seen mentioned.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface>
> >
> > On 5. Nov 2018, at 11:05, Biao Liu <mmyy1...@gmail.com> wrote:
> >
> > > Thanks Aljoscha for bringing us this discussion!
> > >
> > > 1. I think one of the reason about separating `advance()` and `getCurrent()` is that we have several different types returned by source. Not just the `record`, but also the timestamp of record and the watermark. If we don't separate these into different methods, the source has to return a tuple3 which is not so user friendly. The prototype of Aljoscha is acceptable to me. Regarding the specific method name, I'm not sure which one is better. Both of them are reasonable for me.
> > >
> > > 2. As Thomas and Becket mentioned before, I think a non-blocking API is necessary. Moreover, IMO we should not offer a blocking API. It doesn't help but makes things more complicated.
> > >
> > > 3. About the thread model. I agree with Thomas about the thread-less IO model. A standard workflow should look like below.
> > > - If there is available data, Flink would read it.
> > > - If there is no data available temporary, Flink would check again a moment later. Maybe waiting on a semaphore until a timer wake it up.
> > > Furthermore, we can offer an optional optimization for source which has external thread. Like Guowei mentioned, there can be a listener which the reader can wake the framework up as soon as new data comes. This can solve Piotr's concern about efficiency.
> > >
> > > 4. One more thing. After taking a look at the prototype codes. Off the top of my head, the implementation is more fit for batch job not streaming job. There are two types of tasks in prototype. First is a source task that discovers the splits. The source passes the splits to the second task which process the splits one by one. And then the source keeps watch to discover more splits.
> > >
> > > However, I think the more common scenario of streaming job is: there are fixed splits, each of the subtasks takes several splits. The subtasks just keep processing the fixed splits. There would be continuous datum in each split. We don't need a source task to discover more splits. It can not be finished in streaming job since we don't want the processing task finished even there are no more splits.
> > >
> > > So IMO we should offer another source operator for the new interface. It would discover all splits when it is opening. Then picks the splits belong to this subtask. Keep processing these splits until all of them are finished.
> > >
> > > Becket Qin <becket....@gmail.com> wrote on Mon, Nov 5, 2018 at 11:00 AM:
> > >
> > >> Hi Thomas,
> > >>
> > >> The iterator-like API was also the first thing that came to me. But it seems a little confusing that hasNext() does not mean "the stream has not ended", but means "the next record is ready", which is repurposing the well known meaning of hasNext(). If we follow the hasNext()/next() pattern, an additional isNextReady() method to indicate whether the next record is ready seems more intuitive to me.
> > >>
> > >> Similarly, in poll()/take() pattern, another method of isDone() is needed to indicate whether the stream has ended or not.
> > >>
> > >> Compared with hasNext()/next()/isNextReady() pattern, isDone()/poll()/take() seems more flexible for the reader implementation. When I am implementing a reader, I could have a couple of choices:
> > >>
> > >> - A thread-less reader that does not have any internal thread.
> > >>   - When poll() is called, the same calling thread will perform a bunch of IO asynchronously.
> > >>   - When take() is called, the same calling thread will perform a bunch of IO and wait until the record is ready.
> > >> - A reader with internal threads performing network IO and put records into a buffer.
> > >>   - When poll() is called, the calling thread simply reads from the buffer and return empty result immediately if there is no record.
> > >>   - When take() is called, the calling thread reads from the buffer and block waiting if the buffer is empty.
> > >>
> > >> On the other hand, with the hasNext()/next()/isNextReady() API, it is less intuitive for the reader developers to write the thread-less pattern. Although technically speaking one can still do the asynchronous IO to prepare the record in isNextReady(). But it is inexplicit and seems somewhat hacky.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:
> > >>
> > >>> Couple more points regarding discovery:
> > >>>
> > >>> The proposal mentions that discovery could be outside the execution graph. Today, discovered partitions/shards are checkpointed. I believe that will also need to be the case in the future, even when discovery and reading are split between different tasks.
> > >>>
> > >>> For cases such as resharding of a Kinesis stream, the relationship between splits needs to be considered. Splits cannot be randomly distributed over readers in certain situations. An example was mentioned here: https://github.com/apache/flink/pull/6980#issuecomment-435202809
> > >>>
> > >>> Thomas
> > >>>
> > >>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:
> > >>>
> > >>>> Thanks for getting the ball rolling on this!
> > >>>>
> > >>>> Can the number of splits decrease? Yes, splits can be closed and go away. An example would be a shard merge in Kinesis (2 existing shards will be closed and replaced with a new shard).
> > >>>>
> > >>>> Regarding advance/poll/take: IMO the least restrictive approach would be the thread-less IO model (pull based, non-blocking, caller retrieves new records when available). The current Kinesis API requires the use of threads. But that can be internal to the split reader and does not need to be a source API concern. In fact, that's what we are working on right now as improvement to the existing consumer: Each shard consumer thread will push to a queue, the consumer main thread will poll the queue(s). It is essentially a mapping from threaded IO to non-blocking.
> > >>>>
> > >>>> The proposed SplitReader interface would fit the thread-less IO model. Similar to an iterator, we find out if there is a new element (hasNext) and if so, move to it (next()). Separate calls deliver the meta information (timestamp, watermark). Perhaps advance call could offer a timeout option, so that the caller does not end up in a busy wait. On the other hand, a caller processing multiple splits may want to cycle through fast, to process elements of other splits as soon as they become available. The nice thing is that this "split merge" logic can now live in Flink and be optimized and shared between different sources.
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
> > >>>>
> > >>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>> Thanks Aljoscha for this FLIP.
> > >>>>>
> > >>>>> 1. I agree with Piotr and Becket that the non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may be not very memory friendly if every advance call return a Future.
> > >>>>>
> > >>>>> public interface Listener {
> > >>>>>     public void notify();
> > >>>>> }
> > >>>>>
> > >>>>> public interface SplitReader() {
> > >>>>>     /**
> > >>>>>      * When there is no element temporarily, this will return false.
> > >>>>>      * When elements is available again splitReader can call listener.notify()
> > >>>>>      * In addition the frame would check `advance` periodically.
> > >>>>>      * Of course advance can always return true and ignore the listener argument for simplicity.
> > >>>>>      */
> > >>>>>     public boolean advance(Listener listener);
> > >>>>> }
> > >>>>>
> > >>>>> 2. The FLIP tells us very clearly that how to create all Splits and how to create a SplitReader from a Split. But there is no strategy for the user to choose how to assign the splits to the tasks. I think we could add a Enum to let user to choose.
> > >>>>> /**
> > >>>>> public Enum SplitsAssignmentPolicy {
> > >>>>>     Location,
> > >>>>>     Workload,
> > >>>>>     Random,
> > >>>>>     Average
> > >>>>> }
> > >>>>> */
> > >>>>>
> > >>>>> 3. If merge the `advance` and `getCurrent` to one method like `getNext` the `getNext` would need return a `ElementWithTimestamp` because some sources want to add timestamp to every element. IMO, this is not so memory friendly so I prefer this design.
> > >>>>>
> > >>>>> Thanks
> > >>>>>
> > >>>>> Piotr Nowojski <pi...@data-artisans.com> wrote on Thu, Nov 1, 2018 at 6:08 PM:
> > >>>>>
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> Thanks Aljoscha for starting this, it's blocking quite a lot of other possible improvements. I have one proposal. Instead of having a method:
> > >>>>>>
> > >>>>>> boolean advance() throws IOException;
> > >>>>>>
> > >>>>>> I would replace it with
> > >>>>>>
> > >>>>>> /*
> > >>>>>>  * Return a future, which when completed means that source has more data and getNext() will not block.
> > >>>>>>  * If you wish to use benefits of non blocking connectors, please implement this method appropriately.
> > >>>>>>  */
> > >>>>>> default CompletableFuture<?> isBlocked() {
> > >>>>>>     return CompletableFuture.completedFuture(null);
> > >>>>>> }
> > >>>>>>
> > >>>>>> And rename `getCurrent()` to `getNext()`.
> > >>>>>>
> > >>>>>> Couple of arguments:
> > >>>>>> 1. I don't understand the division of work between `advance()` and `getCurrent()`. What should be done in which, especially for connectors that handle records in batches (like Kafka) and when should you call `advance` and when `getCurrent()`.
> > >>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the future to have asynchronous/non blocking connectors and more efficiently handle large number of blocked threads, without busy waiting. While at the same time it doesn't add much complexity, since naive connector implementations can be always blocking.
> > >>>>>> 3. This also would allow us to use a fixed size thread pool of task executors, instead of one thread per task.
> > >>>>>>
> > >>>>>> Piotrek
> > >>>>>>
> > >>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> Hi All,
> > >>>>>>>
> > >>>>>>> In order to finally get the ball rolling on the new source interface that we have discussed for so long I finally created a FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >>>>>>>
> > >>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about adding per-partition watermark support to the Kinesis source and because this would enable generic implementation of event-time alignment for all sources. Maybe we need another FLIP for the event-time alignment part, especially the part about information sharing between operations (I'm not calling it state sharing because state has a special meaning in Flink).
> > >>>>>>>
> > >>>>>>> Please discuss away!
> > >>>>>>>
> > >>>>>>> Aljoscha