Hi Becket,

A quick clarification from my side, because I think you misunderstood my question: I did not suggest letting the SourceReader return only a single record at a time when calling getNextRecords. As the return type indicates, the method can return an arbitrary number of records.
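To make that concrete, here is a minimal sketch of the contract I have in mind (names and generics are simplified and nothing here is a final signature; 'running', 'reader' and 'output' stand for the surrounding task context):

import java.util.Collection;

interface SourceReader<E> {
    // Hands back whatever records have been fetched since the last
    // call; an empty collection simply means "nothing available yet".
    Collection<E> getNextRecords() throws Exception;
}

// The caller, not the reader, owns the emission loop, so control
// returns to the runtime after every call:
while (running) {
    for (E record : reader.getNextRecords()) {
        output.collect(record);  // emission happens outside the reader
    }
}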
Cheers,
Till

On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Becket,
>
> Issue #1 - Design of the Source interface
>
> I mentioned the lack of a method like Source#createEnumerator(Boundedness
> boundedness, SplitEnumeratorContext context) because without it the current
> proposal is not complete / does not work.
>
> If we say that boundedness is an intrinsic property of a source, imo we
> don't need the Source#createEnumerator(Boundedness boundedness,
> SplitEnumeratorContext context) method at all.
>
> Assuming a source from my previous example:
>
> Source source = KafkaSource.builder()
>     ...
>     .untilTimestamp(...)
>     .build()
>
> Would the enumerator differ if created via
> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs
> source.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this
> is the part where my opinion differs the most from the current proposal. I
> really think it should always be the source that tells whether it is bounded
> or not. In the current proposal, the methods continuousSource/boundedSource
> somewhat reconfigure the source, which I think is misleading.
>
> I think a call like:
>
> Source source = KafkaSource.builder()
>     ...
>     .readContinuously() / readUntilLatestOffset() / readUntilTimestamp /
>     readUntilOffsets / ...
>     .build()
>
> is way cleaner (and more expressive) than
>
> Source source = KafkaSource.builder()
>     ...
>     .build()
>
> env.continuousSource(source) // which underneath would actually call
> createEnumerator(CONTINUOUS, ctx), equivalent to
> source.readContinuously().createEnumerator(ctx)
> // or
> env.boundedSource(source) // which underneath would actually call
> createEnumerator(BOUNDED, ctx), equivalent to
> source.readUntilLatestOffset().createEnumerator(ctx)
>
> Sorry for the comparison, but to me it seems there is too much magic
> happening underneath those two calls.
>
> I really believe the Source interface should have a getBoundedness() method
> instead of supportsBoundedness(...) + createEnumerator(Boundedness, ...) -
> see the sketch below.
>
> Issue #2 - Design of
> ExecutionEnvironment#source()/continuousSource()/boundedSource()
>
> As you might have guessed, I am slightly in favor of option #2 modified.
> Yes, I am aware that every step of the DAG would have to be able to say
> whether it is bounded or not. I have a feeling it would be easier to express
> cross bounded/unbounded operations, though I must admit I have not thought
> it through thoroughly. In the spirit of "batch is just a special case of
> streaming", I thought BoundedStream would extend from DataStream. Correct me
> if I am wrong. In such a setup, a cross bounded/unbounded operation could be
> expressed quite easily, I think:
>
> DataStream {
>     // We cannot really tell if the result is bounded or not, but because a
>     // bounded stream is a special case of an unbounded one, the API object
>     // is correct, irrespective of whether the left or right side of the
>     // join is bounded.
>     DataStream join(DataStream, ...);
> }
>
> BoundedStream extends DataStream {
>     // Only if both sides are bounded can the result be bounded as well.
>     // However, we do have access to DataStream#join here, so you can still
>     // join with a DataStream.
>     BoundedStream join(BoundedStream, ...);
> }
>
> On the other hand, I also see benefits of two completely disjoint APIs, as
> we could prohibit some streaming calls in the bounded API. That said, I
> can't think of any unbounded operators that could not be implemented for a
> bounded stream.
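> To recap issue #1 in code, the interface shape I am arguing for would be
> roughly the following (a sketch with simplified generics, not the exact
> FLIP signatures):
>
> enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
>
> interface Source<T, SplitT, EnumChkT> {
>     // The source itself knows whether it is bounded, based on how it
>     // was built (e.g. untilTimestamp(...) above).
>     Boundedness getBoundedness();
>
>     // No Boundedness parameter needed here any more.
>     SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> context);
> }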
> Besides, I think we both agree we don't like the method:
>
> DataStream boundedStream(Source)
>
> suggested in the current state of the FLIP. Do we? :)
>
> Best,
>
> Dawid
>
> On 10/12/2019 18:57, Becket Qin wrote:
>
> Hi folks,
>
> Thanks for the discussion, great feedback. Also thanks Dawid for the
> explanation, it is much clearer now.
>
> One thing that is indeed missing from the FLIP is how the boundedness is
> passed to the Source implementation. So the API should be
> Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
> context), and we can probably remove the
> Source#supportsBoundedness(Boundedness boundedness) method.
>
> Assuming we have that, we are essentially choosing between the following
> two options:
>
> Option 1:
>
> // The source is a continuous source, and only unbounded operations can be
> // performed.
> DataStream<Type> datastream = env.continuousSource(someSource);
>
> // The source is a bounded source; both bounded and unbounded operations
> // can be performed.
> BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);
>
> - Pros:
>     a) Explicit boundary between bounded / unbounded streams; it is quite
>        simple and clear to the users.
> - Cons:
>     a) For applications that do not involve bounded operations, they still
>        have to call a different API to distinguish bounded / unbounded
>        streams.
>     b) No support for a bounded stream to run in a streaming runtime
>        setting, i.e. scheduling and operator behaviors.
>
> Option 2:
>
> // The source is either bounded or unbounded, but only unbounded operations
> // could be performed on the returned DataStream.
> DataStream<Type> dataStream = env.source(someSource);
>
> // The source must be a bounded source, otherwise an exception is thrown.
> BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);
>
> The pros and cons are exactly the opposite of option 1.
> - Pros:
>     a) For applications that do not involve bounded operations, they do not
>        have to call a different API to distinguish bounded / unbounded
>        streams.
>     b) Support for a bounded stream to run in a streaming runtime setting,
>        i.e. scheduling and operator behaviors.
> - Cons:
>     a) Bounded / unbounded streams are kind of mixed, i.e. given a
>        DataStream, it is not clear whether it is bounded or not, unless you
>        have access to its source.
>
> If we only think from the Source API perspective, option 2 seems a better
> choice, because functionality-wise it is a superset of option 1, at the
> cost of some seemingly acceptable ambiguity in the DataStream API. But if
> we look at the DataStream API as a whole, option 1 seems a clearer choice.
> For example, sometimes a library may have to know whether a certain task
> will finish or not, and that would be difficult to tell if the input is a
> DataStream, unless additional information is provided all the way from the
> Source. One possible solution is a *modified option 2* which adds a method
> to the DataStream API to indicate boundedness, such as getBoundedness(). It
> would solve the problem, albeit with a potential confusion about what the
> difference is between a DataStream with getBoundedness()=true and a
> BoundedDataStream. But that seems not super difficult to explain.
>
> So from the API's perspective, I don't have a strong opinion between
> *option 1* and *modified option 2*. I like the cleanness of option 1, but
> modified option 2 would be more attractive if we have a concrete use case
> for the "bounded stream with unbounded streaming runtime settings".
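> For illustration, the modified option 2 would look roughly like this (a
> sketch; whether getBoundedness() returns a boolean or a Boundedness enum is
> an open detail):
>
> DataStream<Type> dataStream = env.source(someSource);
>
> // A library can now check whether its input will terminate, without
> // having access to the original Source:
> if (dataStream.getBoundedness() == Boundedness.BOUNDED) {
>     // safe to apply bounded-only logic
> }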
> Re: Till
>
> > Maybe this has already been asked before, but I was wondering why the
> > SourceReader interface has the method pollNext, which hands the
> > responsibility of outputting elements to the SourceReader implementation?
> > Has this been done for backwards compatibility reasons with the old
> > source interface? If not, then one could define a Collection<E>
> > getNextRecords() method which returns the currently retrieved records,
> > and then the caller emits them outside of the SourceReader. That way the
> > interface would not allow implementing an emitting loop that never hands
> > control back to the caller. At the moment, this contract can easily be
> > broken and is only mentioned loosely in the JavaDocs.
>
> The primary reason we hand over the SourceOutput to the SourceReader is
> that sometimes it is difficult for a SourceReader to emit one record at a
> time. One example is batched messaging systems which only have an offset
> for the entire batch instead of for the individual messages in the batch.
> In that case, returning one record at a time would leave the SourceReader
> in an uncheckpointable state, because it can only checkpoint at the batch
> boundaries.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi everyone,
>
> thanks for drafting this FLIP. It reads very well.
>
> Concerning Dawid's proposal, I tend to agree. The boundedness could come
> from the source and tell the system how to treat the operator
> (scheduling-wise). From a user's perspective it should be fine to get back
> a DataStream when calling env.source(boundedSource) if he does not need the
> special operations defined on a BoundedDataStream. If he needs those, then
> one could use the method BoundedDataStream env.boundedSource(boundedSource).
>
> If possible, we could enforce the proper usage of env.boundedSource() by
> introducing a BoundedSource type, so that one cannot pass an unbounded
> source to it. That way users would not be able to shoot themselves in the
> foot.
>
> Maybe this has already been asked before, but I was wondering why the
> SourceReader interface has the method pollNext, which hands the
> responsibility of outputting elements to the SourceReader implementation?
> Has this been done for backwards compatibility reasons with the old source
> interface? If not, then one could define a Collection<E> getNextRecords()
> method which returns the currently retrieved records, and then the caller
> emits them outside of the SourceReader. That way the interface would not
> allow implementing an emitting loop that never hands control back to the
> caller. At the moment, this contract can easily be broken and is only
> mentioned loosely in the JavaDocs.
>
> Cheers,
> Till
>
> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi all,
>
> I think the current design is good.
>
> My understanding is:
>
> For execution modes, i.e. bounded mode and continuous mode: they are
> totally different. I don't think we have the ability to integrate the two
> models at present. It's about scheduling, memory, algorithms, state, etc.;
> we shouldn't confuse them.
>
> For source capabilities: only bounded, only continuous, or both bounded and
> continuous. I think Kafka is a source that can be run in both bounded and
> continuous execution mode. And Kafka with an end offset should be runnable
> in both bounded and continuous execution mode as well.
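> Something like the following, for example (a sketch; endOffsets is a
> hypothetical builder method):
>
> Source source = KafkaSource.builder()
>     ...
>     .endOffsets(...)   // hypothetical; makes the source finite
>     .build();
>
> // The same finite source could then be run under either execution mode:
> env.boundedSource(source);     // batch-style scheduling
> env.continuousSource(source);  // streaming-style scheduling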
> Using Apache Beam with the Flink runner, I used to run a "bounded" Kafka
> source in streaming mode. For our previous DataStream API, it was not
> necessarily required that the source cannot be bounded.
>
> So here is my take on Dawid's question:
> 1. Passing a bounded source to continuousSource(): +1.
> 2. Passing a continuous source to boundedSource(): -1; it should throw an
> exception.
>
> In StreamExecutionEnvironment, continuousSource and boundedSource define
> the execution mode. They draw a clear boundary between the execution modes.
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:
>
> I agree with Dawid's point that the boundedness information should come
> from the source itself (e.g. the end timestamp), not through
> env.boundedSource()/continuousSource(). I think if we want to support
> something like `env.source()` that derives the execution mode from the
> source, the `supportsBoundedness(Boundedness)` method is not enough,
> because we still don't know whether the source is bounded or not.
>
> Best,
> Jark
>
> On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> One more thing. In the current proposal, with the
> supportsBoundedness(Boundedness) method and the boundedness coming from
> either continuousSource or boundedSource, I could not find how this
> information is fed back to the SplitEnumerator.
>
> Best,
>
> Dawid
>
> On 09/12/2019 13:52, Becket Qin wrote:
>
> Hi Dawid,
>
> Thanks for the comments. This actually brings up another relevant question:
> what does a "bounded source" imply? I actually had the same impression when
> I looked at the Source API. Here is what I understand after some discussion
> with Stephan. A bounded source has the following impacts:
>
> 1. API validity.
> - A bounded source generates a bounded stream, so some operations that only
>   work on bounded input can be performed, e.g. sort.
> - To expose these bounded-stream-only APIs, there are two options:
>     a. Add them to the DataStream API and throw an exception if a method is
>        called on an unbounded stream.
>     b. Create a BoundedDataStream class which is returned from
>        env.boundedSource(), while a DataStream is returned from
>        env.continuousSource().
> Note that this cannot be done by having a single env.source(theSource),
> even if the Source has a getBoundedness() method.
>
> 2. Scheduling
> - A bounded source could be computed stage by stage, without bringing up
>   all the tasks at the same time.
>
> 3. Operator behaviors
> - A bounded source indicates the records are finite, so some operators can
>   wait until they have received all the records before starting the
>   processing.
>
> Of the above impacts, only 1 is relevant to the API design, and the current
> proposal in FLIP-27 is following 1.b.
>
> > // boundedness depends on a source property; imo this should always be
> > preferred
> > DataStream<MyType> stream = env.source(theSource);
>
> In your proposal, does DataStream have bounded-stream-only methods? It
> looks like it should, otherwise passing a bounded Source to env.source()
> would be confusing. In that case, we would essentially be doing 1.a when an
> unbounded Source is created from env.source(unboundedSource).
>
> If we have the methods only supported for bounded streams in DataStream, it
> seems a little weird to also have a separate BoundedDataStream interface.
>
> Am I understanding it correctly?
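> In code, the difference between 1.a and 1.b would look roughly like this (a
> sketch; sort is just an example of a bounded-only operation):
>
> // 1.a: a single DataStream type, checked at runtime
> DataStream<MyType> stream = env.source(theSource);
> stream.sort(...); // would have to throw an exception on an unbounded stream
>
> // 1.b: two stream types, checked at compile time
> BoundedDataStream<MyType> bounded = env.boundedSource(theSource);
> bounded.sort(...); // only defined on BoundedDataStream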
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi all,
>
> Really well-written proposal, and a very important one. I must admit I have
> not understood all the intricacies of it yet.
>
> One question I have, though, is about where the information about
> boundedness comes from. I think in most cases it is a property of the
> source. As you described, it might be e.g. an end offset, a flag for
> whether to monitor new splits, etc. I think it would be a really nice use
> case to be able to say:
>
> new KafkaSource().readUntil(long timestamp),
>
> which could work as an "end offset". Moreover, I think all bounded sources
> support continuous mode, but no intrinsically continuous source supports
> the bounded mode. If I understood the proposal correctly, it suggests that
> the boundedness sort of "comes" from the outside of the source, from the
> invocation of either boundedStream or continuousSource.
>
> I am wondering if it would make sense to actually change the method
>
> boolean Source#supportsBoundedness(Boundedness)
>
> to
>
> Boundedness Source#getBoundedness().
>
> As for the methods #boundedSource and #continuousSource: assuming
> boundedness is a property of the source, they do not affect how the
> enumerator works, but mostly how the DAG is scheduled, right? I am not
> against those methods, but I think it is a very specific use case to
> actually override the property of the source. In general I would expect
> users to only call env.source(theSource), where the source tells whether it
> is bounded or not. I would suggest considering the following set of methods:
>
> // boundedness depends on a source property; imo this should always be
> // preferred
> DataStream<MyType> stream = env.source(theSource);
>
> // always continuous execution, whether a bounded or unbounded source
> DataStream<MyType> boundedStream = env.continuousSource(theSource);
>
> // imo this would make sense if the BoundedDataStream provides additional
> // features unavailable in continuous mode
> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
>
> Best,
>
> Dawid
>
> On 04/12/2019 11:25, Stephan Ewen wrote:
>
> Thanks, Becket, for updating this.
>
> I agree with moving the aspects you mentioned into separate FLIPs - this
> one was becoming unwieldy in size.
>
> +1 to the FLIP in its current state. It's a very detailed write-up, nicely
> done!
>
> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:
>
> Hi all,
>
> Sorry for the long-belated update. I have updated the FLIP-27 wiki page
> with the latest proposals. Some noticeable changes include:
>
> 1. A new generic communication mechanism between the SplitEnumerator and
> SourceReader.
> 2. Some detailed API method signature changes.
>
> We left a few things out of this FLIP and will address them in separate
> FLIPs, including:
>
> 1. Per-split event time.
> 2. Event time alignment.
> 3. Fine-grained failover for SplitEnumerator failures.
>
> Please let us know if you have any questions.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi Łukasz!
>
> Becket and I are working hard on figuring out the last details and
> implementing the first PoC.
> We would hopefully update the FLIP next week.
>
> There is a fair chance that a first version of this will be in 1.10, but I
> think it will take another release to battle-test it and migrate the
> connectors.
>
> Best,
> Stephan
>
> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:
>
> Hi,
>
> This proposal looks very promising for us. Do you have any plans for which
> Flink release it is going to be included in? We are thinking of using the
> DataSet API for our future use cases, but on the other hand the DataSet API
> is going to be deprecated, so using the proposed bounded data streams
> solution could be more viable in the long term.
>
> Thanks,
> Łukasz
>
> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
>
> Thanks for putting together this proposal!
>
> I see that the "Per Split Event Time" and "Event Time Alignment" sections
> are still TBD.
>
> It would probably be good to flesh those out a bit before proceeding too
> far, as the event time alignment will probably influence the interaction
> with the split reader, specifically ReaderStatus emitNext(SourceOutput<E>
> output).
>
> We currently have only one implementation of event time alignment, in the
> Kinesis consumer. The synchronization in that case takes place as the last
> step before records are emitted downstream (RecordEmitter). With the
> currently proposed interfaces, the equivalent can be implemented in the
> reader loop, although note that in the Kinesis consumer the per-shard
> threads push records.
>
> Synchronization has not been implemented for the Kafka consumer yet:
>
> https://issues.apache.org/jira/browse/FLINK-12675
>
> When I looked at it, I realized that the implementation will look quite
> different from Kinesis, because it needs to take place in the pull part,
> where records are taken from the Kafka client. Due to the multiplexing, it
> cannot be done by blocking the split thread like it currently works for
> Kinesis. Reading from individual Kafka partitions needs to be controlled
> via pause/resume on the Kafka client.
>
> To take on that responsibility, the split thread would need to be aware of
> the watermarks, or at least of whether it should or should not continue to
> consume a given split, and this may require a different SourceReader or
> SourceOutput interface.
>
> Thanks,
> Thomas
>
> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi Stephan,
>
> Thank you for the feedback! Will take a look at your branch before
> discussing further publicly.
>
> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi Biao!
>
> Thanks for reviving this. I would like to join this discussion, but am
> quite occupied with the 1.9 release, so can we maybe pause this discussion
> for a week or so?
>
> In the meantime I can share some suggestions based on prior experiments:
>
> How to do watermarks / timestamp extractors in a simpler and more flexible
> way. I think that part is quite promising and should be part of the new
> source interface.
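> The rough shape of the idea is something like the following (a simplified
> sketch, with WatermarkOutput as a simple collector-style interface; the
> actual experiments are in the branches below):
>
> interface WatermarkGenerator<T> {
>     // Called for each record, so per-record (non-periodic)
>     // watermarks come essentially for free.
>     void onEvent(T event, long eventTimestamp, WatermarkOutput output);
>
>     // Called on a fixed schedule to emit periodic watermarks.
>     void onPeriodicEmit(WatermarkOutput output);
> }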
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>
> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>
> Some experiments on how to build the source reader and its library for
> common threading/split patterns:
>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>
> Best,
> Stephan
>
> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi devs,
>
> Since 1.9 is nearly released, I think we could get back to FLIP-27. I
> believe it should be included in 1.10.
>
> There are so many things mentioned in the document of FLIP-27 [1] that I
> think we'd better discuss them separately. However, the wiki is not a good
> place to discuss, so I wrote a Google doc about the SplitReader API which
> covers some details missing from the document. [2]
>
> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> 2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>
> CC Stephan, Aljoscha, Piotrek, Becket
>
> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi Steven,
>
> Thank you for the feedback. Please take a look at the FLIP-27 document
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
> which was updated recently. A lot of details about the enumerator were
> added in this document. I think it would help.
>
> On Thu, Mar 28, 2019 at 12:52 PM, Steven Wu <stevenz...@gmail.com> wrote:
>
> This proposal mentioned that the SplitEnumerator might run on the
> JobManager or in a single task on a TaskManager.
>
> If the enumerator is a single task on a TaskManager, then the job DAG can
> never be embarrassingly parallel anymore. That will nullify the leverage of
> fine-grained recovery for embarrassingly parallel jobs.
>
> It's not clear to me what the implications of running the enumerator on the
> JobManager are, so I will leave that out for now.
>
> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi Stephan & Piotrek,
>
> Thank you for the feedback.
>
> It seems that there is a lot going on in the community. I am just afraid
> that this discussion may be forgotten, since there have been so many
> proposals recently. Anyway, I wish to see the split topics soon :)
>
> On Thu, Jan 24, 2019 at 8:21 PM, Piotr Nowojski <pi...@da-platform.com> wrote:
>
> Hi Biao!
>
> This discussion stalled because of preparations for open-sourcing & merging
> Blink. I think before creating the tickets we should split this discussion
> into the topics/areas outlined by Stephan and create FLIPs for those.
>
> I think there is no chance for this to be completed in the couple of
> remaining weeks / one month before the 1.8 feature freeze; however, it
> would be good to aim to land those changes in 1.9.
> Piotrek
>
> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi community,
>
> The summary from Stephan makes a lot of sense to me. It is indeed much
> clearer after splitting the complex topic into smaller ones. I was
> wondering whether there is any detailed plan for the next steps. If not, I
> would like to push this forward by creating some JIRA issues.
>
> Another question is: should version 1.8 include these features?
>
> On Sat, Dec 1, 2018 at 4:20 AM, Stephan Ewen <se...@apache.org> wrote:
>
> Thanks everyone for the lively discussion. Let me try to summarize where I
> see convergence in the discussion and where the open issues are. I'll try
> to group this by design aspect of the source. Please let me know if I got
> things wrong or missed something crucial here.
>
> For issues 1-3, if the below reflects the state of the discussion, I would
> try and update the FLIP in the next days. For the remaining ones we need
> more discussion.
>
> I would suggest forking each of these aspects into a separate mail thread,
> or we will lose sight of the individual aspects.
>
> *(1) Separation of Split Enumerator and Split Reader*
>
> - All seem to agree this is a good thing.
> - The Split Enumerator could in the end live on the JobManager (and assign
>   splits via RPC) or in a task (and assign splits via data streams) - this
>   discussion is orthogonal and should come later, once the interface is
>   agreed upon.
>
> *(2) Split Readers for one or more splits*
>
> - The discussion seems to agree that we need to support one reader that
>   possibly handles multiple splits concurrently.
> - The requirement comes from sources where one poll()-style call fetches
>   data from different splits / partitions.
>   --> Example sources that require that would be Kafka, Pravega, Pulsar.
> - We could have one split reader per source, or multiple split readers that
>   share the "poll()" function.
> - To not make it too complicated, we can start by thinking about one split
>   reader for all splits initially, and see if that covers all requirements.
>
> *(3) Threading model of the Split Reader*
>
> - The most active part of the discussion ;-)
> - A non-blocking way for Flink's task code to interact with the source is
>   needed in order to base the task runtime code on a
>   single-threaded/actor-style task design.
>   --> I personally am a big proponent of that; it will help with
>   well-behaved checkpoints, efficiency, and simpler yet more robust runtime
>   code.
> - Users care about a simple abstraction, so as a subclass of SplitReader
>   (non-blocking / async) we need to have a BlockingSplitReader which will
>   form the basis of most source implementations. The BlockingSplitReader
>   lets users do simple blocking poll() calls.
> - The BlockingSplitReader would spawn a thread (or more), and the thread(s)
>   can make blocking calls and hand over data buffers via a blocking queue.
> - This should allow us to cover both a fully async runtime and a simple
>   blocking interface for users.
> - This is actually very similar to how the Kafka connectors work:
>   Kafka 9+ with one thread, Kafka 8 with multiple threads.
> - On the base SplitReader (the async one), the non-blocking method that
>   gets the next chunk of data would signal data availability via a
>   CompletableFuture, because that gives the best flexibility (one can await
>   completion or register notification handlers).
> - The source task would register a "thenHandle()" (or similar) on the
>   future to put a "take next data" task into the actor-style mailbox.
>
> *(4) Split Enumeration and Assignment*
>
> - Splits may be generated lazily, both in cases where there is a limited
>   number of splits (but very many), and where splits are discovered over
>   time.
> - Assignment should also be lazy, to get better load balancing.
> - Assignment needs to support locality preferences.
> - Possible design based on the discussion so far:
>   --> SplitReader has a method "addSplits(SplitT...)" to add one or more
>   splits. Some split readers might assume they only ever have one split,
>   concurrently; others assume multiple splits. (Note: the idea behind being
>   able to add multiple splits at the same time is to ease startup, where
>   multiple splits may be assigned instantly.)
>   --> SplitReader has a context object on which it can indicate when splits
>   are completed. The enumerator gets that notification and can use it to
>   decide when to assign new splits. This should help both in the case of
>   sources that take splits lazily (file readers) and in the case where the
>   source needs to preserve a partial order between splits (Kinesis,
>   Pravega, and Pulsar may need that).
>   --> The SplitEnumerator gets a notification when SplitReaders start and
>   when they finish splits. It can decide at that moment to push more splits
>   to that reader.
>   --> The SplitEnumerator should probably be aware of the source
>   parallelism, to build its initial distribution.
> - Open question: Should the source expose something like "host
>   preferences", so that yarn/mesos/k8s can take this into account when
>   selecting a node to start a TM on?
>
> *(5) Watermarks and event time alignment*
>
> - Watermark generation, as well as idleness, needs to be per split (like
>   currently in the Kafka source, per partition).
> - It is desirable to support optional event-time alignment, meaning that
>   splits that are ahead are back-pressured or temporarily unsubscribed.
> - I think it would be desirable to encapsulate the watermark generation
>   logic in watermark generators, for a separation of concerns. The
>   watermark generators should run per split.
> - Using watermark generators would also help with another problem of the
>   suggested interface, namely supporting non-periodic watermarks
>   efficiently.
> - Need a way to "dispatch" the next record to different watermark
>   generators.
> - Need a way to tell the SplitReader to "suspend" a split until a certain
>   watermark is reached (event time backpressure).
> - This would in fact not be needed (and would thus be simpler) if we had a
>   SplitReader per split, and may be a reason to re-open that discussion.
>
> *(6) Watermarks across splits and in the Split Enumerator*
>
> - The split enumerator may need some watermark awareness, which should be
>   based purely on split metadata (like the create timestamp of file splits).
> - If there are still more splits with an overlapping event time range for a
>   split reader, then that split reader should not advance the watermark
>   within the split beyond the overlap boundary. Otherwise future splits
>   will produce late data.
> - One way to approach this could be that the split enumerator may send
>   watermarks to the readers, and the readers cannot emit watermarks beyond
>   that received watermark.
> - Many split enumerators would simply send Long.MAX out immediately and
>   leave the progress purely to the split readers.
> - For event-time alignment / split back pressure, this begs the question of
>   how we can avoid deadlocks that may arise when splits are suspended for
>   event time back pressure.
>
> *(7) Batch and streaming unification*
>
> - Functionality-wise, the above design should support both.
> - Batch often (mostly) does not care about reading "in order" and
>   generating watermarks.
>   --> It might use different enumerator logic that is more locality-aware
>   and ignores event time order.
>   --> It does not generate watermarks.
> - It would be great if bounded sources could be identified at compile time,
>   so that "env.addBoundedSource(...)" is type-safe and can return a
>   "BoundedDataStream".
> - It is possible to defer this discussion until later.
>
> *Miscellaneous Comments*
>
> - Should the source have a TypeInformation for the produced type, instead
>   of a serializer? We need a type information in the stream anyway, and can
>   derive the serializer from that. Plus, creating the serializer should
>   respect the ExecutionConfig.
> - The TypeSerializer interface is very powerful but also not easy to
>   implement. Its purpose is to handle data super-efficiently, support
>   flexible ways of evolution, etc. For metadata I would suggest looking at
>   the SimpleVersionedSerializer instead, which is used, for example, for
>   checkpoint master hooks or for the streaming file sink. I think it is a
>   good match for cases where we do not need more than ser/deser (no copy,
>   etc.) and don't need to push versioning out of the serialization paths
>   for best performance (as in the TypeSerializer).
>
> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
> <k.klou...@data-artisans.com> wrote:
>
> Hi Biao,
>
> Thanks for the answer!
>
> So, given the multi-threaded readers, we now have the following open
> questions:
>
> 1) How do we let the checkpoints pass through our multi-threaded reader
> operator?
>
> 2) Do we have separate reader and source operators or not?
> In the strategy that has a separate source, the source operator has a
> parallelism of 1 and is responsible for split recovery only.
>
> For the first one, given also the constraints (blocking, finite queues,
> etc.), I do not have an answer yet.
>
> For the second, I think we should go with separate operators for the source
> and the readers, for the following reasons:
>
> 1) It is more aligned with a potential future improvement where split
> discovery becomes a responsibility of the JobManager and readers pull more
> work from the JM.
>
> 2) The source is going to be the "single point of truth". It will know what
> has been processed and what has not. If the source and the readers are a
> single operator with parallelism > 1, or, in general, if the split
> discovery is done by each task individually, then:
> i) we have to have a deterministic scheme for each reader to assign splits
> to itself (e.g. mod subtaskId). This is not necessarily trivial for all
> sources.
> ii) each reader would have to keep a copy of all its processed splits.
> iii) the state has to be a union state with non-trivial merging logic in
> order to support rescaling.
>
> Two additional points that you raised above:
>
> i) The point you raised, that we need to keep all splits (processed and
> not-processed), I think is a bit of a strong requirement. It would imply
> that for infinite sources the state will grow indefinitely. This problem is
> even more pronounced if we do not have a single source that assigns splits
> to readers, as each reader will have its own copy of the state.
>
> ii) It is true that for finite sources we need to somehow not close the
> readers when the source/split discoverer finishes. The
> ContinuousFileReaderOperator has a work-around for that. It is not elegant,
> and checkpoints are not emitted after closing the source, but this, I
> believe, is a bigger problem which requires more changes than just
> refactoring the source interface.
>
> Cheers,
> Kostas
>
> --
> Best, Jingsong Lee