Hi Becket,

I think Dawid explained things clearly and his argument makes a lot of sense. I'm also in favor of #2, because #1 doesn't work for our future unified environment.

You can see the vision in this document [1]. In the future, we would like to drop the global streaming/batch mode in SQL (i.e. EnvironmentSettings#inStreamingMode/inBatchMode). A source is bounded or unbounded once it is defined, so the framework can infer from the sources whether a query should run in streaming, batch, or hybrid mode. With #1, however, we lose this ability because the framework doesn't know whether the source is bounded or unbounded.

Best,
Jark

[1]: https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p
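To make the inference Jark describes concrete, here is a minimal sketch, assuming a Source#getBoundedness() method as argued for below; all names are illustrative, not the FLIP's final API:

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    interface Source<T> {
        Boundedness getBoundedness();
    }

    final class ExecutionModeInference {
        // If every source of a query is bounded, the whole query can run in
        // batch mode; otherwise it must run in streaming (or hybrid) mode.
        static boolean canRunInBatchMode(java.util.List<Source<?>> sources) {
            return sources.stream()
                    .allMatch(s -> s.getBoundedness() == Boundedness.BOUNDED);
        }
    }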
On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

Regarding

    Collection<E> getNextRecords()

I'm pretty sure such a design would unfortunately impact performance (accessing and potentially creating the collection on the hot path).

Also,

    InputStatus emitNext(DataOutput<T> output) throws Exception;
    // or
    Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

gives us some opportunities in the future to let the Source hot-loop inside until it receives a signal "please exit for some reason" (the output collector could return such a hint upon collecting a result). But that's another topic, outside of this FLIP's scope.

Piotrek

On 11 Dec 2019, at 10:41, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Becket,

a quick clarification from my side, because I think you misunderstood my question. I did not suggest letting the SourceReader return only a single record at a time when calling getNextRecords. As the return type indicates, the method can return an arbitrary number of records.

Cheers,
Till

On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Becket,

Issue #1 - Design of the Source interface

I mentioned the lack of a method like Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context) because without it the current proposal is not complete / does not work.

If we say that boundedness is an intrinsic property of a source, imo we don't need the Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context) method at all.

Assume a source from my previous example:

    Source source = KafkaSource.builder()
        ...
        .untilTimestamp(...)
        .build()

Would the enumerator differ if created via source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this is the part where my opinion differs the most from the current proposal. I really think it should always be the source that tells whether it is bounded or not. In the current proposal, the methods continuousSource/boundedSource somewhat reconfigure the source, which I think is misleading.

I think a call like:

    Source source = KafkaSource.builder()
        ...
        .readContinuously() / readUntilLatestOffset() / readUntilTimestamp / readUntilOffsets / ...
        .build()

is way cleaner (and more expressive) than

    Source source = KafkaSource.builder()
        ...
        .build()

    env.continuousSource(source) // which underneath would actually call createEnumerator(CONTINUOUS, ctx), equivalent to source.readContinuously().createEnumerator(ctx)
    // or
    env.boundedSource(source) // which underneath would actually call createEnumerator(BOUNDED, ctx), equivalent to source.readUntilLatestOffset().createEnumerator(ctx)

Sorry for the comparison, but to me it seems there is too much magic happening underneath those two calls.

I really believe the Source interface should have a getBoundedness method instead of supportsBoundedness(...) plus createEnumerator(Boundedness, ...).
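For illustration, a minimal sketch of the two interface shapes being compared; both are hypothetical simplifications of the FLIP's types (Boundedness as in the earlier sketch, SplitEnumerator/SplitEnumeratorContext reduced to markers):

    interface SplitEnumerator<SplitT> {}
    interface SplitEnumeratorContext<SplitT> {}

    // Shape of the current proposal (simplified): boundedness is injected from outside.
    interface SourceAsProposed<T, SplitT> {
        boolean supportsBoundedness(Boundedness boundedness);
        SplitEnumerator<SplitT> createEnumerator(
                Boundedness boundedness, SplitEnumeratorContext<SplitT> context);
    }

    // Shape of Dawid's suggestion (simplified): boundedness is intrinsic to the source.
    interface SourceAsSuggested<T, SplitT> {
        Boundedness getBoundedness();
        SplitEnumerator<SplitT> createEnumerator(SplitEnumeratorContext<SplitT> context);
    }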
Issue #2 - Design of ExecutionEnvironment#source()/continuousSource()/boundedSource()

As you might have guessed, I am slightly in favor of option #2 modified. Yes, I am aware that every step of the dag would then have to be able to say whether it is bounded or not. I have a feeling it would be easier to express cross bounded/unbounded operations, though I must admit I have not thought it through thoroughly. In the spirit of "batch is just a special case of streaming" I thought BoundedStream would extend from DataStream. Correct me if I am wrong. In such a setup, a cross bounded/unbounded operation could be expressed quite easily, I think:

    DataStream {
        DataStream join(DataStream, ...); // we cannot really tell if the result is bounded or not, but because a bounded stream is a special case of an unbounded one, the API object is correct irrespective of whether the left or right side of the join is bounded
    }

    BoundedStream extends DataStream {
        BoundedStream join(BoundedStream, ...); // only if both sides are bounded can the result be bounded as well. However, we do have access to DataStream#join here, so you can still join with a DataStream
    }

On the other hand, I also see benefits in two completely disjoint APIs, as we could prohibit some streaming calls in the bounded API. I can't think of any unbounded operators that could not be implemented for a bounded stream.

Besides, I think we both agree we don't like the method

    DataStream boundedStream(Source)

suggested in the current state of the FLIP. Do we? :)

Best,

Dawid
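A slightly more concrete rendering of Dawid's sketch above, with hypothetical generics, to show that the bounded-to-bounded join is an overload sitting next to the inherited method:

    interface DataStream<T> {
        <O> DataStream<O> join(DataStream<?> other);
    }

    interface BoundedStream<T> extends DataStream<T> {
        // Overload, not override: only when both sides are statically known to be
        // bounded does the result stay bounded. Joining with a plain DataStream
        // still resolves to the inherited method and yields a DataStream.
        <O> BoundedStream<O> join(BoundedStream<?> other);
    }

With two BoundedStream arguments, overload resolution picks the more specific method and the result stays a BoundedStream; mixing in an unbounded side falls back to the DataStream variant.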
On 10/12/2019 18:57, Becket Qin wrote:

Hi folks,

Thanks for the discussion, great feedback. Also thanks Dawid for the explanation, it is much clearer now.

One thing that is indeed missing from the FLIP is how the boundedness is passed to the Source implementation. So the API should be

    Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context)

and we can probably remove the Source#supportsBoundedness(Boundedness boundedness) method.

Assuming we have that, we are essentially choosing between the following two options:

Option 1:

    // The source is a continuous source, and only unbounded operations can be performed.
    DataStream<Type> datastream = env.continuousSource(someSource);

    // The source is a bounded source; both bounded and unbounded operations can be performed.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);

- Pros:
  a) Explicit boundary between bounded / unbounded streams; it is quite simple and clear to the users.
- Cons:
  a) Applications that do not involve bounded operations still have to call different APIs to distinguish bounded / unbounded streams.
  b) No support for bounded streams to run in a streaming runtime setting, i.e. scheduling and operator behaviors.

Option 2:

    // The source is either bounded or unbounded, but only unbounded operations can be performed on the returned DataStream.
    DataStream<Type> dataStream = env.source(someSource);

    // The source must be a bounded source, otherwise an exception is thrown.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);

The pros and cons are exactly the opposite of option 1.
- Pros:
  a) Applications that do not involve bounded operations do not have to call different APIs to distinguish bounded / unbounded streams.
  b) Support for bounded streams to run in a streaming runtime setting, i.e. scheduling and operator behaviors.
- Cons:
  a) Bounded and unbounded streams are somewhat mixed, i.e. given a DataStream it is not clear whether it is bounded or not, unless you have access to its source.

If we only think from the Source API perspective, option 2 seems the better choice, because functionality-wise it is a superset of option 1, at the cost of some seemingly acceptable ambiguity in the DataStream API. But if we look at the DataStream API as a whole, option 1 seems the clearer choice. For example, sometimes a library may have to know whether a certain task will finish or not, and that is difficult to tell when the input is just a DataStream, unless additional information is provided all the way from the Source. One possible solution is a *modified option 2* which adds a method to the DataStream API to indicate boundedness, such as getBoundedness(). It would solve the problem, with a potential confusion about the difference between a DataStream with getBoundedness()=true and a BoundedDataStream. But that seems not super difficult to explain.

So from the API's perspective, I don't have a strong opinion between *option 1* and *modified option 2*. I like the cleanness of option 1, but modified option 2 would be more attractive if we had a concrete use case for "bounded streams with unbounded streaming runtime settings".
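A rough sketch of what *modified option 2* could look like. The getBoundedness() accessor on DataStream is exactly the addition under discussion; everything else is a hypothetical simplification (Source and Boundedness as in the earlier sketches):

    interface StreamExecutionEnvironment {
        // Accepts bounded and unbounded sources alike.
        <T> DataStream<T> source(Source<T> source);

        // Throws if the given source is not bounded.
        <T> BoundedDataStream<T> boundedSource(Source<T> source);
    }

    interface DataStream<T> {
        // The addition: lets a library ask whether this stream will finish.
        Boundedness getBoundedness();
    }

    interface BoundedDataStream<T> extends DataStream<T> {
        // bounded-only operations, e.g. sort(...)
    }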
Re: Till

> Maybe this has already been asked before but I was wondering why the SourceReader interface has the method pollNext which hands the responsibility of outputting elements to the SourceReader implementation? Has this been done for backwards-compatibility reasons with the old source interface? If not, then one could define a Collection<E> getNextRecords() method which returns the currently retrieved records, and the caller emits them outside of the SourceReader. That way the interface would not allow implementing an outputting loop where we never hand back control to the caller. At the moment this contract can easily be broken and is only mentioned loosely in the JavaDocs.

The primary reason we hand the SourceOutput over to the SourceReader is that it is sometimes difficult for a SourceReader to emit one record at a time. One example is batched messaging systems that only have an offset for an entire batch instead of for the individual messages in the batch. In that case, returning one record at a time would leave the SourceReader in an uncheckpointable state, because it can only checkpoint at batch boundaries.

Thanks,

Jiangjie (Becket) Qin
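A sketch of the batch-offset scenario Becket describes. The reader and the stub types below (BatchedClient, Batch, and the simplified SourceOutput/InputStatus) are hypothetical stand-ins for the FLIP's types; the point is that pollNext emits the whole batch before returning, so between pollNext calls the reader always sits at a batch boundary and can be checkpointed:

    interface SourceOutput<E> { void collect(E record); }
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }
    interface Batch<E> { long offset(); java.util.List<E> records(); }
    interface BatchedClient<E> { Batch<E> poll(); }

    class BatchedSourceReader<E> {
        private final BatchedClient<E> client;
        private long lastBatchOffset; // what a checkpoint would record

        BatchedSourceReader(BatchedClient<E> client) { this.client = client; }

        InputStatus pollNext(SourceOutput<E> output) {
            Batch<E> batch = client.poll();
            if (batch == null) {
                return InputStatus.NOTHING_AVAILABLE;
            }
            // Emit the entire batch before returning, since only the batch as a
            // whole has an offset that can be acknowledged and checkpointed.
            for (E record : batch.records()) {
                output.collect(record);
            }
            lastBatchOffset = batch.offset();
            return InputStatus.MORE_AVAILABLE;
        }
    }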
On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi everyone,

thanks for drafting this FLIP. It reads very well.

Concerning Dawid's proposal, I tend to agree. The boundedness could come from the source and tell the system how to treat the operator (scheduling-wise). From a user's perspective it should be fine to get back a DataStream when calling env.source(boundedSource) if he does not need the special operations defined on a BoundedDataStream. If he needs them, then one could use the method BoundedDataStream env.boundedSource(boundedSource).

If possible, we could enforce the proper usage of env.boundedSource() by introducing a BoundedSource type, so that one cannot pass an unbounded source to it. That way users would not be able to shoot themselves in the foot.

Maybe this has already been asked before, but I was wondering why the SourceReader interface has the method pollNext which hands the responsibility of outputting elements to the SourceReader implementation. Has this been done for backwards-compatibility reasons with the old source interface? If not, then one could define a Collection<E> getNextRecords() method which returns the currently retrieved records, and the caller emits them outside of the SourceReader. That way the interface would not allow implementing an outputting loop where we never hand back control to the caller. At the moment this contract can easily be broken and is only mentioned loosely in the JavaDocs.

Cheers,
Till

On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi all,

I think the current design is good.

My understanding is:

For execution mode, bounded mode and continuous mode are totally different. I don't think we have the ability to integrate the two models at present. It's about scheduling, memory, algorithms, state, etc. We shouldn't confuse them.

For source capabilities: only bounded, only continuous, or both bounded and continuous.
I think Kafka is a source that can be run in both bounded and continuous execution mode, and Kafka with an end offset should be runnable in both bounded and continuous execution mode. Using Apache Beam with the Flink runner, I used to run a "bounded" Kafka in streaming mode. For our previous DataStream, it was not necessarily required that the source cannot be bounded.

So this is my take on Dawid's question:
1. Pass a bounded source to continuousSource(): +1.
2. Pass a continuous source to boundedSource(): -1, should throw an exception.

In StreamExecutionEnvironment, continuousSource and boundedSource define the execution mode. They define a clear boundary between execution modes.

Best,
Jingsong Lee

On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:

I agree with Dawid's point that the boundedness information should come from the source itself (e.g. the end timestamp), not through env.boundedSource()/continuousSource().
I think if we want to support something like `env.source()` that derives the execution mode from the source, the `supportsBoundedness(Boundedness)` method is not enough, because we don't know whether it is bounded or not.

Best,
Jark

On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

One more thing. In the current proposal, with the supportsBoundedness(Boundedness) method and the boundedness coming from either continuousSource or boundedSource, I could not find how this information is fed back to the SplitEnumerator.

Best,

Dawid

On 09/12/2019 13:52, Becket Qin wrote:

Hi Dawid,

Thanks for the comments. This actually brings up another relevant question: what does a "bounded source" imply? I actually had the same impression when I looked at the Source API. Here is what I understand after some discussion with Stephan. A bounded source has the following impacts:

1. API validity.
- A bounded source generates a bounded stream, so some operations that only work on bounded data can be performed, e.g. sort.
- To expose these bounded-stream-only APIs, there are two options:
  a. Add them to the DataStream API and throw an exception if a method is called on an unbounded stream.
  b. Create a BoundedDataStream class which is returned from env.boundedSource(), while DataStream is returned from env.continuousSource().
  Note that this cannot be done by having a single env.source(theSource), even if the Source has a getBoundedness() method.

2. Scheduling.
- A bounded source could be computed stage by stage, without bringing up all the tasks at the same time.

3. Operator behaviors.
- A bounded source indicates the records are finite, so some operators can wait until they have received all the records before starting the processing.

Of the above impacts, only 1 is relevant to the API design, and the current proposal in FLIP-27 follows 1.b.

> // boundedness depends on a source property; imo this should always be preferred
> DataStream<MyType> stream = env.source(theSource);

In your proposal, does DataStream have bounded-stream-only methods? It looks like it should; otherwise passing a bounded Source to env.source() would be confusing. In that case, we will essentially do 1.a if an unbounded Source is created from env.source(unboundedSource).

If we have the methods only supported for bounded streams in DataStream, it seems a little weird to have a separate BoundedDataStream interface.

Do I understand it correctly?

Thanks,

Jiangjie (Becket) Qin
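For reference, a sketch of the createEnumerator(Boundedness, ctx) variant Becket proposes above, which is also how the boundedness would reach the SplitEnumerator (answering Dawid's "one more thing"). The Kafka-ish types are hypothetical; SplitEnumerator/SplitEnumeratorContext are the markers from the earlier sketch:

    class KafkaishSplit {}

    class UpToEndOffsetsEnumerator implements SplitEnumerator<KafkaishSplit> {
        // enumerate partitions up to a fixed end offset, then signal "no more splits"
    }

    class ContinuousDiscoveryEnumerator implements SplitEnumerator<KafkaishSplit> {
        // keep discovering newly added partitions forever
    }

    class KafkaishSource {
        SplitEnumerator<KafkaishSplit> createEnumerator(
                Boundedness boundedness, SplitEnumeratorContext<KafkaishSplit> context) {
            // The framework passes the boundedness in, and the source picks the
            // matching enumeration strategy.
            return boundedness == Boundedness.BOUNDED
                    ? new UpToEndOffsetsEnumerator()
                    : new ContinuousDiscoveryEnumerator();
        }
    }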
On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi all,

Really well-written proposal, and a very important one. I must admit I have not understood all the intricacies of it yet.

One question I have, though, is about where the information about boundedness comes from. I think in most cases it is a property of the source. As you described, it might be e.g. an end offset, a flag whether it should monitor new splits, etc. I think it would be a really nice use case to be able to say:

    new KafkaSource().readUntil(long timestamp)

which could work as an "end offset". Moreover, I think all bounded sources support continuous mode, but no intrinsically continuous source supports the bounded mode. If I understood the proposal correctly, it suggests that the boundedness sort of "comes" from outside the source, from the invocation of either boundedSource or continuousSource.

I am wondering if it would make sense to actually change the method

    boolean Source#supportsBoundedness(Boundedness)

to

    Boundedness Source#getBoundedness()

As for the methods #boundedSource and #continuousSource: assuming the boundedness is a property of the source, they do not affect how the enumerator works, but mostly how the dag is scheduled, right? I am not against those methods, but I think it is a very specific use case to actually override the property of the source. In general I would expect users to only call env.source(theSource), where the source tells whether it is bounded or not. I would suggest considering the following set of methods:

    // boundedness depends on a source property; imo this should always be preferred
    DataStream<MyType> stream = env.source(theSource);

    // always continuous execution, whether the source is bounded or unbounded
    DataStream<MyType> boundedStream = env.continuousSource(theSource);

    // imo this would make sense if the BoundedDataStream provides additional features unavailable in continuous mode
    BoundedDataStream<MyType> batch = env.boundedSource(theSource);

Best,

Dawid
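A sketch of the readUntil idea, where boundedness falls out of how the source is defined rather than from env.boundedSource()/continuousSource(). The builder and source types are hypothetical; readUntil is the method Dawid proposes, not an existing KafkaSource method:

    class KafkaLikeSource {
        final Boundedness boundedness;
        final Long endTimestamp; // null means "read forever"

        KafkaLikeSource(Boundedness boundedness, Long endTimestamp) {
            this.boundedness = boundedness;
            this.endTimestamp = endTimestamp;
        }
    }

    class KafkaLikeSourceBuilder {
        private Long endTimestamp;

        KafkaLikeSourceBuilder readUntil(long timestamp) {
            this.endTimestamp = timestamp;
            return this;
        }

        KafkaLikeSource build() {
            // Boundedness is derived from the definition, never overridden later.
            Boundedness boundedness = endTimestamp == null
                    ? Boundedness.CONTINUOUS_UNBOUNDED
                    : Boundedness.BOUNDED;
            return new KafkaLikeSource(boundedness, endTimestamp);
        }
    }

Under this sketch, new KafkaLikeSourceBuilder().readUntil(1575936000000L).build() yields a BOUNDED source, while omitting readUntil yields a CONTINUOUS_UNBOUNDED one.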
On 04/12/2019 11:25, Stephan Ewen wrote:

Thanks, Becket, for updating this.

I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size.

+1 to the FLIP in its current state. It's a very detailed write-up, nicely done!

On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:

Hi all,

Sorry for the long-belated update. I have updated the FLIP-27 wiki page with the latest proposals. Some noticeable changes include:
1. A new generic communication mechanism between SplitEnumerator and SourceReader.
2. Some detailed API method signature changes.

We left a few things out of this FLIP and will address them in separate FLIPs, including:
1. Per-split event time.
2. Event time alignment.
3. Fine-grained failover for SplitEnumerator failures.

Please let us know if you have any questions.

Thanks,

Jiangjie (Becket) Qin

On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:

Hi Łukasz!

Becket and I are working hard on figuring out the last details and implementing the first PoC. We will hopefully update the FLIP next week.

There is a fair chance that a first version of this will be in 1.10, but I think it will take another release to battle-test it and migrate the connectors.

Best,
Stephan

On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:

Hi,

This proposal looks very promising for us. Do you have a plan for which Flink release it is going to be included in? We are thinking of using the DataSet API for our future use cases, but on the other hand the DataSet API is going to be deprecated, so using the proposed bounded data streams solution could be more viable in the long term.

Thanks,
Łukasz

On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:

Thanks for putting together this proposal!

I see that the "Per Split Event Time" and "Event Time Alignment" sections are still TBD.

It would probably be good to flesh those out a bit before proceeding too far, as the event time alignment will probably influence the interaction with the split reader, specifically ReaderStatus emitNext(SourceOutput<E> output).

We currently have only one implementation for event time alignment, in the Kinesis consumer. The synchronization in that case takes place as the last step before records are emitted downstream (RecordEmitter). With the currently proposed interfaces, the equivalent can be implemented in the reader loop, although note that in the Kinesis consumer the per-shard threads push records.

Synchronization has not been implemented for the Kafka consumer yet:

https://issues.apache.org/jira/browse/FLINK-12675

When I looked at it, I realized that the implementation will look quite different from Kinesis, because it needs to take place in the pull part, where records are taken from the Kafka client. Due to the multiplexing, it cannot be done by blocking the split thread like it currently works for Kinesis. Reading from individual Kafka partitions needs to be controlled via pause/resume on the Kafka client.

To take on that responsibility, the split thread would need to be aware of the watermarks, or at least of whether it should or should not continue to consume a given split, and this may require a different SourceReader or SourceOutput interface.

Thanks,
Thomas
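A sketch of the pause/resume approach Thomas mentions. KafkaConsumer.pause(...), resume(...), and poll(Duration) are the Kafka client's actual calls; the alignment threshold and the per-partition watermark bookkeeping are hypothetical:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class AlignedKafkaPoller {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final long maxAheadMillis; // hypothetical alignment threshold

        AlignedKafkaPoller(KafkaConsumer<byte[], byte[]> consumer, long maxAheadMillis) {
            this.consumer = consumer;
            this.maxAheadMillis = maxAheadMillis;
        }

        // perPartitionWatermark: event-time progress per partition (assumed to be
        // maintained elsewhere); globalWatermark: the minimum across all splits.
        ConsumerRecords<byte[], byte[]> pollAligned(
                Map<TopicPartition, Long> perPartitionWatermark, long globalWatermark) {
            List<TopicPartition> ahead = new ArrayList<>();
            List<TopicPartition> ok = new ArrayList<>();
            for (Map.Entry<TopicPartition, Long> e : perPartitionWatermark.entrySet()) {
                if (e.getValue() > globalWatermark + maxAheadMillis) {
                    ahead.add(e.getKey()); // too far ahead: stop fetching for now
                } else {
                    ok.add(e.getKey());
                }
            }
            consumer.pause(ahead);
            consumer.resume(ok);
            return consumer.poll(Duration.ofMillis(100));
        }
    }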
On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan,

Thank you for the feedback! I will take a look at your branch before discussing publicly.

On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

Hi Biao!

Thanks for reviving this. I would like to join this discussion, but am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so?

In the meantime I can share some suggestions based on prior experiments:

How to do watermarks / timestamp extractors in a simpler and more flexible way. I think that part is quite promising and should be part of the new source interface:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java

Some experiments on how to build the source reader and its library for common threading/split patterns:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src

Best,
Stephan

On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi devs,

Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10.

There are so many things mentioned in the FLIP-27 document [1] that I think we'd better discuss them separately. However, the wiki is not a good place to discuss. I wrote a Google doc about the SplitReader API which fills in some details missing from the document [2].

1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing

CC Stephan, Aljoscha, Piotrek, Becket

On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Steven,

Thank you for the feedback. Please take a look at the FLIP-27 document (https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), which was updated recently. A lot of details about the enumerator were added to the document. I think it would help.

On Thu, Mar 28, 2019 at 12:52 PM, Steven Wu <stevenz...@gmail.com> wrote:

This proposal mentioned that the SplitEnumerator might run on the JobManager or in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the leverage of fine-grained recovery for embarrassingly parallel jobs.

It's not clear to me what the implications of running the enumerator on the JobManager are, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan & Piotrek,

Thank you for the feedback.

It seems there are a lot of things going on in the community. I am just afraid this discussion may be forgotten, since there are so many proposals recently.
Anyway, I wish to see the split topics soon :)

On Thu, Jan 24, 2019 at 8:21 PM, Piotr Nowojski <pi...@da-platform.com> wrote:

Hi Biao!

This discussion was stalled because of preparations for the open-sourcing and merging of Blink.
I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for those.

I think there is no chance for this to be completed in the couple of remaining weeks / 1 month before the 1.8 feature freeze, but it would be good to aim for 1.9 with those changes.

Piotrek

On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,

Stephan's summary makes a lot of sense to me. It is indeed much clearer after splitting the complex topic into smaller ones.
I was wondering whether there is any detailed plan for the next steps. If not, I would like to push this forward by creating some JIRA issues.
Another question: should version 1.8 include these features?

On Sat, Dec 1, 2018 at 4:20 AM, Stephan Ewen <se...@apache.org> wrote:

Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion, and the open issues.
I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I would try to update the FLIP in the next days.
For the remaining ones we need more discussion.

I would suggest forking each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing.
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams).
- This discussion is orthogonal and should come later, once the interface is agreed upon.

*(2) Split Readers for one or more splits*

- The discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions.
  --> Example sources that require this would be Kafka, Pravega, Pulsar.
- We could have one split reader per source, or multiple split readers that share the "poll()" function.
- To not make it too complicated, we can start by thinking about one split reader for all splits initially, and see if that covers all requirements.
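A sketch of one reader serving several splits through a single poll()-style call, as described in (2) above. The round-robin rotation is a naive stand-in for real multiplexing (e.g. Kafka's poll() already fetches across partitions), and all names are hypothetical:

    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.Queue;

    abstract class MultiSplitReader<SplitT, E> {
        private final Queue<SplitT> splits = new ArrayDeque<>();

        void addSplits(List<SplitT> newSplits) {
            splits.addAll(newSplits);
        }

        // Returns the next record, or null if no split has data right now.
        E poll() {
            SplitT split = splits.poll();
            if (split == null) {
                return null; // no splits assigned yet
            }
            E record = fetchFrom(split); // connector-specific, may return null
            splits.add(split);           // rotate the split to the back
            return record;
        }

        protected abstract E fetchFrom(SplitT split);
    }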
*(3) Threading model of the Split Reader*

- The most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to build task runtime code based on a single-threaded/actor-style task design.
  --> I personally am a big proponent of that; it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code.
- Users care about a simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader, which will form the basis of most source implementations. BlockingSplitReader lets users make simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more), and the thread(s) can make blocking calls and hand over data buffers via a blocking queue.
- This should allow us to cover both a fully async runtime and a simple blocking interface for users.
- This is actually very similar to how the Kafka connectors work: Kafka 9+ with one thread, Kafka 8 with multiple threads.
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (one can await completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox.
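A simplified sketch of the blocking-to-async bridge described in (3): a fetcher thread makes blocking calls and hands records over via a blocking queue, while availability is signaled to the task thread through a CompletableFuture. A production version would need more careful shutdown and wakeup handling:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Supplier;

    class BlockingToAsyncReader<E> {
        private final BlockingQueue<E> handover = new LinkedBlockingQueue<>(1024);
        private volatile CompletableFuture<Void> available = new CompletableFuture<>();

        // Runs in the fetcher thread; blockingPoll wraps the external client.
        void runFetcher(Supplier<E> blockingPoll) {
            try {
                while (true) {
                    E record = blockingPoll.get(); // may block on the external system
                    handover.put(record);          // blocks when full -> backpressure
                    available.complete(null);      // signal the task thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutting down
            }
        }

        // Called from the single-threaded task loop; never blocks.
        E pollNext() {
            E record = handover.poll();
            if (record == null) {
                CompletableFuture<Void> newFuture = new CompletableFuture<>();
                available = newFuture;
                if (!handover.isEmpty()) {
                    newFuture.complete(null); // avoid a lost wakeup
                }
            }
            return record;
        }

        // The task registers a callback here to re-enqueue itself in the mailbox.
        CompletableFuture<Void> isAvailable() {
            return available;
        }
    }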
*(4) Split Enumeration and Assignment*

- Splits may be generated lazily, both in cases where there is a limited number of splits (but very many) and where splits are discovered over time.
- Assignment should also be lazy, to get better load balancing.
- Assignment needs to support locality preferences.
- Possible design based on the discussion so far:
  --> The SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they only ever have one split concurrently, others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup, where multiple splits may be assigned instantly.)
  --> The SplitReader has a context object through which it can indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in the case of sources that take splits lazily (file readers) and in the case where the source needs to preserve a partial order between splits (Kinesis, Pravega, and Pulsar may need that).
  --> The SplitEnumerator gets a notification when SplitReaders start and when they finish splits. It can decide at that moment to push more splits to that reader.
  --> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.
- Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?

*(5) Watermarks and event time alignment*

- Watermark generation, as well as idleness, needs to be per split (as currently in the Kafka source, per partition).
- It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed.
- I think it would be desirable to encapsulate watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split.
- Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently.
- We need a way to "dispatch" the next record to different watermark generators.
- We need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure).
- This would in fact not be needed (and things would thus be simpler) if we had a SplitReader per split, and may be a reason to re-open that discussion.
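A sketch of per-split watermark generation with min-combining across splits, close in spirit to what (5) describes; the interface and class names are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    // One generator per split; the source-wide watermark is the minimum across splits.
    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp);
        long currentWatermark();
    }

    class PerSplitWatermarks<T> {
        private final Map<String, WatermarkGenerator<T>> generators = new HashMap<>();

        void addSplit(String splitId, WatermarkGenerator<T> generator) {
            generators.put(splitId, generator);
        }

        // Dispatch each record to the generator of the split it came from.
        void onRecord(String splitId, T event, long timestamp) {
            generators.get(splitId).onEvent(event, timestamp);
        }

        long combinedWatermark() {
            long min = Long.MAX_VALUE;
            for (WatermarkGenerator<T> g : generators.values()) {
                min = Math.min(min, g.currentWatermark());
            }
            return min;
        }
    }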
*(6) Watermarks across splits and in the Split Enumerator*

- The split enumerator may need some watermark awareness, which should be based purely on split metadata (like the create timestamp of file splits).
- If there are still more splits with overlapping event time ranges for a split reader, then that split reader should not advance the watermark within a split beyond the overlap boundary. Otherwise future splits will produce late data.
- One way to approach this could be that the split enumerator sends watermarks to the readers, and the readers cannot emit watermarks beyond the received watermark.
- Many split enumerators would simply send Long.MAX out immediately and leave the progress purely to the split readers.
- For event-time alignment / split back pressure, this begs the question of how we can avoid deadlocks that may arise when splits are suspended for event time back pressure.

*(7) Batch and streaming unification*

- Functionality-wise, the above design should support both.
- Batch often (mostly) does not care about reading "in order" and generating watermarks.
  --> It might use different enumerator logic that is more locality-aware and ignores event time order.
  --> It does not generate watermarks.
- It would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type-safe and can return a "BoundedDataStream".
- It is possible to defer this discussion until later.

*Miscellaneous Comments*

- Should the source have a TypeInformation for the produced type, instead of a serializer? We need a type information in the stream anyway, and can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.
- The TypeSerializer interface is very powerful but also not easy to implement. Its purpose is to handle data super efficiently, support flexible ways of evolution, etc.
  For metadata I would suggest looking at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks, or for the streaming file sink. I think it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer).
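For reference, a sketch of a split serializer based on Flink's SimpleVersionedSerializer, as Stephan suggests above. The FileSplit type is hypothetical; the interface and its three methods (getVersion, serialize, deserialize) are the real org.apache.flink.core.io.SimpleVersionedSerializer contract:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.core.io.SimpleVersionedSerializer;

    // Hypothetical split type: a path plus a start offset.
    final class FileSplit {
        final String path;
        final long offset;
        FileSplit(String path, long offset) { this.path = path; this.offset = offset; }
    }

    class FileSplitSerializer implements SimpleVersionedSerializer<FileSplit> {
        @Override
        public int getVersion() {
            return 1; // bump when the wire format changes
        }

        @Override
        public byte[] serialize(FileSplit split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            byte[] path = split.path.getBytes(StandardCharsets.UTF_8);
            out.writeInt(path.length);
            out.write(path);
            out.writeLong(split.offset);
            return bytes.toByteArray();
        }

        @Override
        public FileSplit deserialize(int version, byte[] serialized) throws IOException {
            // Only version 1 exists so far; older versions would be handled here.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            byte[] path = new byte[in.readInt()];
            in.readFully(path);
            return new FileSplit(new String(path, StandardCharsets.UTF_8), in.readLong());
        }
    }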
On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, we now have as open questions:

1) How do we let the checkpoints pass through our multi-threaded reader operator?

2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.

For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.

For the second, I think we should go with separate operators for the source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where the split discovery becomes a responsibility of the JobManager and readers are pulling more work from the JM.

2) The source is going to be the "single point of truth". It will know what has been processed and what not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
  i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
  ii) each reader would have to keep a copy of all its processed splits.
  iii) the state has to be a union state with non-trivial merging logic in order to support rescaling.

Two additional points that you raised above:

i) The point you raised that we need to keep all splits (processed and not-processed) is, I think, a bit of a strong requirement. It would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.

ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a workaround for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.

Cheers,
Kostas

--
Best, Jingsong Lee