Hi Becket,

I completely agree with Dawid's suggestion. The information about the boundedness should come out of the source, because most streaming sources can be made bounded based on some connector-specific criterion. In Kafka, it would be an end offset or end timestamp, but in any case just having an env.boundedSource() is not enough, because the parameters for making the source bounded are missing.
I suggest having a simple `isBounded(): Boolean` flag in every source that might be influenced by the connector builder, as Dawid mentioned.
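As a rough sketch of what that could look like from the user's side (the KafkaSource builder and the untilTimestamp/isBounded names below are illustrative, borrowed from examples in this thread, not an agreed API):

    // Hypothetical connector builder: the end criterion both parameterizes the
    // source and flips its boundedness flag.
    KafkaSource<String> unbounded = KafkaSource.<String>builder()
        .topic("orders")
        .build();                        // unbounded.isBounded() == false

    KafkaSource<String> bounded = KafkaSource.<String>builder()
        .topic("orders")
        .untilTimestamp(1576108800000L)  // connector-specific end criterion
        .build();                        // bounded.isBounded() == true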
For type safety during programming, we can still go with *Final state 1*, by having env.source() vs env.boundedSource(). The latter would just enforce that the boolean flag is set to `true` and could make bounded operations available (if we actually need that).
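A minimal sketch of how env.boundedSource() could enforce that flag (the internal helper methods and the exception choice are just for illustration, not a proposed final API):

    // In StreamExecutionEnvironment (sketch):
    public <T> DataStream<T> source(Source<T> source) {
        return addSourceInternal(source);            // works for bounded and unbounded sources
    }

    public <T> BoundedDataStream<T> boundedSource(Source<T> source) {
        if (!source.isBounded()) {
            throw new IllegalArgumentException(
                "boundedSource() requires a source whose isBounded() flag is true");
        }
        return addBoundedSourceInternal(source);     // bounded-only operations become available
    }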
However, I don't think that we should start making a unified Table API un-unified again. Boundedness is an optimization property. Every bounded operation can also be executed in an unbounded way using updates/retraction or watermarks.
Regards,
Timo

On 15.12.19 14:22, Becket Qin wrote:
Hi Dawid and Jark,

I think the discussion ultimately boils down to the question of which one of the following two final states we want. Once we make this decision, everything else can be naturally derived.

*Final state 1*: Separate APIs for bounded / unbounded DataStream & Table. That means any code users write will be valid at the point when they write the code. This is similar to having a type safety check at programming time. For example:

    BoundedDataStream extends DataStream {
        // Operations only available for bounded data.
        BoundedDataStream sort(...);

        // Interaction with another BoundedStream returns a bounded stream.
        BoundedJoinedDataStream join(BoundedDataStream other)

        // Interaction with another unbounded stream returns an unbounded stream.
        JoinedDataStream join(DataStream other)
    }

    BoundedTable extends Table {
        // Bounded-only operation.
        BoundedTable sort(...);

        // Interaction with another BoundedTable returns a BoundedTable.
        BoundedTable join(BoundedTable other)

        // Interaction with another unbounded table returns an unbounded table.
        Table join(Table other)
    }

*Final state 2*: One unified API for bounded / unbounded DataStream / Table. That unified API may throw an exception at DAG compilation time if an invalid operation is attempted. This is what the Table API currently follows.

    DataStream {
        // Throws exception if the DataStream is unbounded.
        DataStream sort();

        // Get boundedness.
        Boundedness getBoundedness();
    }

    Table {
        // Throws exception if the table has infinite rows.
        Table orderBy();

        // Get boundedness.
        Boundedness getBoundedness();
    }

From what I understand, there is no consensus so far on this decision yet. Whichever final state we choose, we need to make it consistent across the entire project. We should avoid the case that Table follows one final state while DataStream follows another.

Some arguments I am aware of from both sides so far are the following:

Arguments for final state 1:
1a) Clean API with method safety check at programming time.
1b) (Counter to 2b) Although SQL does not have a programming-time error check, SQL is not really a "programming language" per se, so SQL can be different from Table and DataStream.
1c) Although final state 2 seems to make it easier for SQL to use, given it is more "config based" than "parameter based", final state 1 can probably also meet what SQL wants by wrapping the Source in the TableSource / TableSourceFactory API if needed.

Arguments for final state 2:
2a) The Source API itself already seems to follow the unified API pattern.
2b) There is no "programming time" method error check in the SQL case, so we cannot really achieve final state 1 across the board.
2c) It is an easier path given our current status, i.e. Table is already following final state 2.
2d) Users can always explicitly check the boundedness if they want to.

As I mentioned earlier, my initial thought was also to have a "configuration based" Source rather than a "parameter based" Source. So it is completely possible that I missed some important consideration or design principle that we want to enforce for the project. It would be good if @Stephan Ewen <step...@ververica.com> and @Aljoscha Krettek <aljos...@ververica.com> can also provide more thoughts on this.

Re: Jingsong

"As you said, there are some batched system sources, like parquet/orc sources. Could we have the batch emit interface to improve performance? The queue of per record may cause performance degradation."

The current interface does not necessarily cause a performance problem in a multi-threading case.
In fact, the base implementation allows SplitReaders to add a batch <E> of records <T> to the records queue <E>, so each element in the records queue would be a batch <E>. In this case, when the main thread polls records, it will take a batch <E> of records <T> from the shared records queue and process the records <T> in a batch manner.

Thanks,

Jiangjie (Becket) Qin

On Thu, Dec 12, 2019 at 1:29 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Becket,

I also have some performance concerns. If I understand correctly, SourceOutput will emit data per record into the queue? I'm worried about the multi-threading performance of this queue.

"One example is some batched messaging systems which only have an offset for the entire batch instead of individual messages in the batch."

As you said, there are some batched system sources, like parquet/orc sources. Could we have the batch emit interface to improve performance? The queue of per record may cause performance degradation.

Best,
Jingsong Lee

On Thu, Dec 12, 2019 at 9:15 AM Jark Wu <imj...@gmail.com> wrote:

Hi Becket,

I think Dawid explained things clearly and makes a lot of sense. I'm also in favor of #2, because #1 doesn't work for our future unified environment. You can see the vision in this documentation [1].

In the future, we would like to drop the global streaming/batch mode in SQL (i.e. EnvironmentSettings#inStreamingMode/inBatchMode). A source is bounded or unbounded once defined, so queries can be inferred from the source to run in streaming, batch, or hybrid mode. However, in #1, we would lose this ability because the framework doesn't know whether the source is bounded or unbounded.

Best,
Jark

[1]: https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p

On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

Regarding the:

    Collection<E> getNextRecords()

I'm pretty sure such a design would unfortunately impact the performance (accessing and potentially creating the collection on the hot path).

Also the

    InputStatus emitNext(DataOutput<T> output) throws Exception;

or

    Status pollNext(SourceOutput<T> sourceOutput) throws Exception;

gives us some opportunities in the future to allow the Source to hot loop inside, until it receives some signal "please exit because of some reasons" (the output collector could return such a hint upon collecting the result). But that's another topic outside of this FLIP's scope.

Piotrek

On 11 Dec 2019, at 10:41, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Becket,

A quick clarification from my side, because I think you misunderstood my question. I did not suggest to let the SourceReader return only a single record at a time when calling getNextRecords. As the return type indicates, the method can return an arbitrary number of records.

Cheers,
Till

On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Becket,

Issue #1 - Design of the Source interface

I mentioned the lack of a method like Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context), because without it the current proposal is not complete / does not work.

If we say that boundedness is an intrinsic property of a source, imo we don't need the Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context) method.

Assuming a source from my previous example:

    Source source = KafkaSource.builder()
        ...
        .untilTimestamp(...)
        .build()

Would the enumerator differ if created like

    source.createEnumerator(CONTINUOUS_UNBOUNDED, ...)
vs

    source.createEnumerator(BOUNDED, ...)?

I know I am repeating myself, but this is the part where my opinion differs the most from the current proposal. I really think it should always be the source that tells if it is bounded or not. In the current proposal, the methods continousSource/boundedSource somewhat reconfigure the source, which I think is misleading.

I think a call like:

    Source source = KafkaSource.builder()
        ...
        .readContinously() / readUntilLatestOffset() / readUntilTimestamp / readUntilOffsets / ...
        .build()

is way cleaner (and more expressive) than

    Source source = KafkaSource.builder()
        ...
        .build()

    env.continousSource(source) // which underneath would call createEnumerator(CONTINUOUS, ctx),
                                // equivalent to source.readContinously().createEnumerator(ctx)
    // or
    env.boundedSource(source)   // which underneath would call createEnumerator(BOUNDED, ctx),
                                // equivalent to source.readUntilLatestOffset().createEnumerator(ctx)

Sorry for the comparison, but to me it seems there is too much magic happening underneath those two calls.

I really believe the Source interface should have a getBoundedness method instead of (supportBoundedness) + createEnumerator(Boundedness, ...).

Issue #2 - Design of ExecutionEnvironment#source()/continuousSource()/boundedSource()

As you might have guessed, I am slightly in favor of option #2 modified. Yes, I am aware that every step of the dag would have to be able to say if it is bounded or not. I have a feeling it would be easier to express cross bounded/unbounded operations, but I must admit I have not thought it through thoroughly. In the spirit of "batch is just a special case of streaming" I thought BoundedStream would extend from DataStream. Correct me if I am wrong. In such a setup the cross bounded/unbounded operations could be expressed quite easily I think:

    DataStream {
        // We could not really tell if the result is bounded or not, but because a bounded
        // stream is a special case of an unbounded one, the API object is correct,
        // irrespective of whether the left or right side of the join is bounded.
        DataStream join(DataStream, ...);
    }

    BoundedStream extends DataStream {
        // Only if both sides are bounded can the result be bounded as well. However we do have
        // access to DataStream#join here, so you can still join with a DataStream.
        BoundedStream join(BoundedStream, ...);
    }

On the other hand I also see benefits of two completely disjoint APIs, as we could prohibit some streaming calls in the bounded API. I can't think of any unbounded operators that could not be implemented for a bounded stream.

Besides, I think we both agree we don't like the method:

    DataStream boundedStream(Source)

suggested in the current state of the FLIP. Do we? :)

Best,
Dawid

On 10/12/2019 18:57, Becket Qin wrote:

Hi folks,

Thanks for the discussion, great feedback. Also thanks Dawid for the explanation, it is much clearer now.

One thing that is indeed missing from the FLIP is how the boundedness is passed to the Source implementation. So the API should be

    Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context)

and we can probably remove the

    Source#supportBoundedness(Boundedness boundedness)

method.

Assuming we have that, we are essentially choosing from one of the following two options:

Option 1:

    // The source is a continuous source, and only unbounded operations can be performed.
    DataStream<Type> datastream = env.continuousSource(someSource);

    // The source is a bounded source, both bounded and unbounded operations can be performed.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);

- Pros:
  a) Explicit boundary between bounded / unbounded streams; it is quite simple and clear to the users.
- Cons:
  a) For applications that do not involve bounded operations, they still have to call a different API to distinguish bounded / unbounded streams.
  b) No support for a bounded stream to run in a streaming runtime setting, i.e. scheduling and operator behaviors.

Option 2:

    // The source is either bounded or unbounded, but only unbounded operations
    // could be performed on the returned DataStream.
    DataStream<Type> dataStream = env.source(someSource);

    // The source must be a bounded source, otherwise an exception is thrown.
    BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);

The pros and cons are exactly the opposite of option 1.
- Pros:
  a) For applications that do not involve bounded operations, they do not have to call a different API to distinguish bounded / unbounded streams.
  b) Support for a bounded stream to run in a streaming runtime setting, i.e. scheduling and operator behaviors.
- Cons:
  a) Bounded / unbounded streams are kind of mixed, i.e. given a DataStream, it is not clear whether it is bounded or not, unless you have access to its source.

If we only think from the Source API perspective, option 2 seems a better choice because functionality wise it is a superset of option 1, at the cost of some seemingly acceptable ambiguity in the DataStream API. But if we look at the DataStream API as a whole, option 1 seems a clearer choice. For example, sometimes a library may have to know whether a certain task will finish or not, and it would be difficult to tell if the input is a DataStream, unless additional information is provided all the way from the Source. One possible solution is to have a *modified option 2* which adds a method to the DataStream API to indicate boundedness, such as getBoundedness(). It would solve the problem, with a potential confusion of what the difference is between a DataStream with getBoundedness()=true and a BoundedDataStream. But that seems not super difficult to explain.

So from the API's perspective, I don't have a strong opinion between *option 1* and *modified option 2*. I like the cleanness of option 1, but modified option 2 would be more attractive if we have a concrete use case for the "bounded stream with unbounded streaming runtime settings".

Re: Till

"Maybe this has already been asked before but I was wondering why the SourceReader interface has the method pollNext which hands the responsibility of outputting elements to the SourceReader implementation? Has this been done for backwards compatibility reasons with the old source interface? If not, then one could define a Collection<E> getNextRecords() method which returns the currently retrieved records and then the caller emits them outside of the SourceReader. That way the interface would not allow to implement an outputting loop where we never hand back control to the caller. At the moment, this contract can be easily broken and is only mentioned loosely in the JavaDocs."

The primary reason we hand over the SourceOutput to the SourceReader is because sometimes it is difficult for a SourceReader to emit one record at a time. One example is some batched messaging systems which only have an offset for the entire batch instead of individual messages in the batch. In that case, returning one record at a time would leave the SourceReader in an uncheckpointable state, because they can only checkpoint at the batch boundaries.
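For illustration, a minimal sketch of that batch-per-poll pattern (the types below are placeholders defined in the sketch itself, not the FLIP's actual interfaces):

    // Sketch only: shows why a reader for a system that can only resume from
    // batch offsets wants to emit a whole batch per pollNext() call.
    class BatchingReaderSketch {

        interface SourceOutput<T> { void collect(T record); }              // placeholder
        record Batch(java.util.List<String> records, long endOffset) {}    // placeholder
        enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }                  // placeholder

        private long lastCompletedBatchOffset;  // the only position we can checkpoint

        Status pollNext(SourceOutput<String> output, Batch batch) {
            if (batch == null) {
                return Status.NOTHING_AVAILABLE;
            }
            // Emit the entire batch before returning, so a checkpoint can never land
            // in the middle of a batch that the external system cannot resume from.
            for (String record : batch.records()) {
                output.collect(record);
            }
            lastCompletedBatchOffset = batch.endOffset();
            return Status.MORE_AVAILABLE;
        }
    }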
Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi everyone,

thanks for drafting this FLIP. It reads very well.

Concerning Dawid's proposal, I tend to agree. The boundedness could come from the source and tell the system how to treat the operator (scheduling wise). From a user's perspective it should be fine to get back a DataStream when calling env.source(boundedSource) if he does not need special operations defined on a BoundedDataStream. If he needs this, then one could use the method BoundedDataStream env.boundedSource(boundedSource).

If possible, we could enforce the proper usage of env.boundedSource() by introducing a BoundedSource type so that one cannot pass an unbounded source to it. That way users would not be able to shoot themselves in the foot.

Maybe this has already been asked before but I was wondering why the SourceReader interface has the method pollNext which hands the responsibility of outputting elements to the SourceReader implementation? Has this been done for backwards compatibility reasons with the old source interface? If not, then one could define a Collection<E> getNextRecords() method which returns the currently retrieved records and then the caller emits them outside of the SourceReader. That way the interface would not allow to implement an outputting loop where we never hand back control to the caller. At the moment, this contract can be easily broken and is only mentioned loosely in the JavaDocs.

Cheers,
Till

On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi all,

I think the current design is good. My understanding is:

For execution mode: bounded mode and continuous mode are totally different. I don't think we have the ability to integrate the two models at present. It's about scheduling, memory, algorithms, states, etc.; we shouldn't confuse them.

For source capabilities: only bounded, only continuous, both bounded and continuous. I think Kafka is a source that can be run in both bounded and continuous execution mode. And Kafka with an end offset should be able to run in both bounded and continuous execution mode. Using Apache Beam with the Flink runner, I used to run a "bounded" Kafka in streaming mode. For our previous DataStream, it is not necessarily required that the source cannot be bounded.

So here is my thought on Dawid's question:
1. Pass a bounded source to continuousSource(): +1
2. Pass a continuous source to boundedSource(): -1, should throw an exception.

In StreamExecutionEnvironment, continuousSource and boundedSource define the execution mode. It defines a clear boundary of execution mode.

Best,
Jingsong Lee

On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:

I agree with Dawid's point that the boundedness information should come from the source itself (e.g. the end timestamp), not through env.boundedSource()/continuousSource().

I think if we want to support something like `env.source()` that derives the execution mode from the source, the `supportsBoundedness(Boundedness)` method is not enough, because we don't know whether it is bounded or not.

Best,
Jark

On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

One more thing.
In the current proposal, with the supportsBoundedness(Boundedness) method and the boundedness coming from either continuousSource or boundedSource, I could not find how this information is fed back to the SplitEnumerator.

Best,
Dawid

On 09/12/2019 13:52, Becket Qin wrote:

Hi Dawid,

Thanks for the comments. This actually brings up another relevant question about what a "bounded source" implies. I actually had the same impression when I looked at the Source API. Here is what I understand after some discussion with Stephan. The bounded source has the following impacts.

1. API validity.
   - A bounded source generates a bounded stream, so some operations that only work on bounded records can be performed, e.g. sort.
   - To expose these bounded-stream-only APIs, there are two options:
     a. Add them to the DataStream API and throw an exception if a method is called on an unbounded stream.
     b. Create a BoundedDataStream class which is returned from env.boundedSource(), while DataStream is returned from env.continousSource(). Note that this cannot be done by having a single env.source(theSource), even if the Source has a getBoundedness() method.

2. Scheduling
   - A bounded source could be computed stage by stage without bringing up all the tasks at the same time.

3. Operator behaviors
   - A bounded source indicates the records are finite, so some operators can wait until they receive all the records before starting the processing.

Of the above impacts, only 1 is relevant to the API design. And the current proposal in FLIP-27 is following 1.b.

    // boundedness depends on a source property, imo this should always be preferred
    DataStream<MyType> stream = env.source(theSource);

In your proposal, does DataStream have bounded-stream-only methods? It looks like it should have, otherwise passing a bounded Source to env.source() would be confusing. In that case, we would essentially do 1.a if an unbounded Source is created from env.source(unboundedSource). If we have the methods only supported for bounded streams in DataStream, it seems a little weird to have a separate BoundedDataStream interface. Am I understanding it correctly?

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi all,

Really well-written proposal and a very important one. I must admit I have not understood all the intricacies of it yet.

One question I have, though, is about where the information about boundedness comes from. I think in most cases it is a property of the source. As you described, it might be e.g. an end offset, a flag whether it should monitor new splits, etc. I think it would be a really nice use case to be able to say: new KafkaSource().readUntil(long timestamp), which could work as an "end offset". Moreover, I think all bounded sources support continuous mode, but no intrinsically continuous source supports the bounded mode. If I understood the proposal correctly, it suggests that the boundedness sort of "comes" from the outside of the source, from the invocation of either boundedStream or continousSource.

I am wondering if it would make sense to actually change the method boolean Source#supportsBoundedness(Boundedness) to Boundedness Source#getBoundedness().

As for the methods #boundedSource, #continousSource: assuming the boundedness is a property of the source, they do not affect how the enumerator works, but mostly how the dag is scheduled, right? I am not against those methods, but I think it is a very specific use case to actually override the property of the source.
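To make that signature change concrete, here is a trimmed-down sketch of the two alternatives being compared (only this aspect of the Source interface is shown; the stub types exist just to make the sketch self-contained):

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }            // stub
    interface SplitEnumerator<SplitT> { /* omitted */ }           // stub
    interface SplitEnumeratorContext<SplitT> { /* omitted */ }    // stub

    // Current proposal: the environment tells the source which mode to run in.
    interface SourceWithSupportsBoundedness<T, SplitT> {
        boolean supportsBoundedness(Boundedness boundedness);
        SplitEnumerator<SplitT> createEnumerator(
            Boundedness boundedness, SplitEnumeratorContext<SplitT> context);
    }

    // Suggested alternative: the source declares its own boundedness, typically
    // derived from how the connector builder configured it (e.g. untilTimestamp).
    interface SourceWithGetBoundedness<T, SplitT> {
        Boundedness getBoundedness();
        SplitEnumerator<SplitT> createEnumerator(SplitEnumeratorContext<SplitT> context);
    }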
In general I would expect users to only call env.source(theSource), where the source tells if it is bounded or not. I would suggest considering the following set of methods:

    // boundedness depends on a source property, imo this should always be preferred
    DataStream<MyType> stream = env.source(theSource);

    // always continuous execution, whether bounded or unbounded source
    DataStream<MyType> boundedStream = env.continousSource(theSource);

    // imo this would make sense if the BoundedDataStream provides additional features
    // unavailable for continuous mode
    BoundedDataStream<MyType> batch = env.boundedSource(theSource);

Best,
Dawid

On 04/12/2019 11:25, Stephan Ewen wrote:

Thanks, Becket, for updating this.

I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size.

+1 to the FLIP in its current state. It's a very detailed write-up, nicely done!

On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:

Hi all,

Sorry for the long belated update. I have updated the FLIP-27 wiki page with the latest proposals. Some noticeable changes include:

1. A new generic communication mechanism between SplitEnumerator and SourceReader.
2. Some detailed API method signature changes.

We left a few things out of this FLIP and will address them in separate FLIPs, including:

1. Per split event time.
2. Event time alignment.
3. Fine grained failover for SplitEnumerator failure.

Please let us know if you have any questions.

Thanks,

Jiangjie (Becket) Qin

On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:

Hi Łukasz!

Becket and I are working hard on figuring out the last details and implementing the first PoC. We will hopefully update the FLIP next week.

There is a fair chance that a first version of this will be in 1.10, but I think it will take another release to battle-test it and migrate the connectors.

Best,
Stephan

On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:

Hi,

This proposal looks very promising for us. Do you have any plans for which Flink release it is going to be included in? We are thinking of using the DataSet API for our future use cases, but on the other hand the DataSet API is going to be deprecated, so using the proposed bounded data streams solution could be more viable in the long term.

Thanks,
Łukasz

On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:

Thanks for putting together this proposal!

I see that the "Per Split Event Time" and "Event Time Alignment" sections are still TBD. It would probably be good to flesh those out a bit before proceeding too far, as the event time alignment will probably influence the interaction with the split reader, specifically ReaderStatus emitNext(SourceOutput<E> output).

We currently have only one implementation for event time alignment in the Kinesis consumer. The synchronization in that case takes place as the last step before records are emitted downstream (RecordEmitter).
With the currently proposed interfaces, the equivalent can be implemented in the reader loop, although note that in the Kinesis consumer the per-shard threads push records.

Synchronization has not been implemented for the Kafka consumer yet: https://issues.apache.org/jira/browse/FLINK-12675

When I looked at it, I realized that the implementation will look quite different from Kinesis because it needs to take place in the pull part, where records are taken from the Kafka client. Due to the multiplexing it cannot be done by blocking the split thread like it currently works for Kinesis. Reading from individual Kafka partitions needs to be controlled via pause/resume on the Kafka client. To take on that responsibility, the split thread would need to be aware of the watermarks, or at least whether it should or should not continue to consume a given split, and this may require a different SourceReader or SourceOutput interface.

Thanks,
Thomas

On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan,

Thank you for the feedback! I will take a look at your branch before discussing publicly.

On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

Hi Biao!

Thanks for reviving this. I would like to join this discussion, but am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so?

In the meantime I can share some suggestions based on prior experiments:

How to do watermarks / timestamp extractors in a simpler and more flexible way. I think that part is quite promising and should be part of the new source interface.
https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java

Some experiments on how to build the source reader and its library for common threading/split patterns:
https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src

Best,
Stephan

On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi devs,

Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10.

There are so many things mentioned in the FLIP-27 document [1] that I think we'd better discuss them separately. However, the wiki is not a good place to discuss. I wrote a Google doc about the SplitReader API, which misses some details in the document.
[2]

1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing

CC Stephan, Aljoscha, Piotrek, Becket

On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Steven,

Thank you for the feedback. Please take a look at the FLIP-27 document <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>, which was updated recently. A lot of details about the enumerator were added to this document. I think it would help.

Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:

This proposal mentioned that SplitEnumerator might run on the JobManager or in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the leverage of fine-grained recovery for embarrassingly parallel jobs.

It's not clear to me what the implications of running the enumerator on the JobManager are, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan & Piotrek,

Thank you for the feedback. It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten, since there are so many proposals recently. Anyway, I wish to see the split topics soon :)

Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:

Hi Biao!

This discussion was stalled because of preparations for open-sourcing & merging Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for them.

I think there is no chance for this to be completed in the couple of remaining weeks / one month before the 1.8 feature freeze; however it would be good to aim for 1.9 with those changes.

Piotrek

On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,

The summary from Stephan makes a lot of sense to me. It is much clearer indeed after splitting the complex topic into small ones.

I was wondering, is there any detailed plan for the next step? If not, I would like to push this thing forward by creating some JIRA issues. Another question is: should version 1.8 include these features?

Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:

Thanks everyone for the lively discussion.
Let me try to summarize where I see convergence in the discussion and open issues. I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I would try and update the FLIP in the next days. For the remaining ones we need more discussion. I would suggest to fork each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams) - this discussion is orthogonal and should come later, when the interface is agreed upon.

*(2) Split Readers for one or more splits*

- Discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions
  --> example sources that require that would be for example Kafka, Pravega, Pulsar
- Could have one split reader per source, or multiple split readers that share the "poll()" function
- To not make it too complicated, we can start with thinking about one split reader for all splits initially and see if that covers all requirements

*(3) Threading model of the Split Reader*

- Most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to base the task runtime code on a single-threaded/actor-style task design
  --> I personally am a big proponent of that; it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code
- Users care about a simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader which will form the basis of most source implementations. BlockingSplitReader lets users do simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more) and the thread(s) can make blocking calls and hand over data buffers via a blocking queue
- This should allow us to cover both a fully async runtime and a simple blocking interface for users.
- This is actually very similar to how the Kafka connectors work: Kafka 9+ with one thread, Kafka 8 with multiple threads
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (can await completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox

*(4) Split Enumeration and Assignment*

- Splits may be generated lazily, both in cases where there is a limited number of splits (but very many), or splits are discovered over time
- Assignment should also be lazy, to get better load balancing
- Assignment needs to support locality preferences
- Possible design based on discussion so far:
  --> SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they have only one split ever, concurrently, others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup where multiple splits may be assigned instantly.)
  --> SplitReader has a context object on which it can indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in cases of sources that take splits lazily (file readers) and in cases where the source needs to preserve a partial order between splits (Kinesis, Pravega, Pulsar may need that).
  --> SplitEnumerator gets a notification when SplitReaders start and when they finish splits. They can decide at that moment to push more splits to that reader
  --> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.
- Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?

*(5) Watermarks and event time alignment*

- Watermark generation, as well as idleness, needs to be per split (like currently in the Kafka Source, per partition)
- It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed
- I think it would be desirable to encapsulate watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split.
- Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently.
- Need a way to "dispatch" the next record to different watermark generators
- Need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure)
- This would in fact not be needed (and thus be simpler) if we had a SplitReader per split, and may be a reason to re-open that discussion

*(6) Watermarks across splits and in the Split Enumerator*

- The split enumerator may need some watermark awareness, which should be purely based on split metadata (like the create timestamp of file splits)
- If there are still more splits with overlapping event time ranges for a split reader, then that split reader should not advance the watermark within the split beyond the overlap boundary. Otherwise future splits will produce late data.
- One way to approach this could be that the split enumerator may send watermarks to the readers, and the readers cannot emit watermarks beyond that received watermark.
- Many split enumerators would simply immediately send Long.MAX out and leave the progress purely to the split readers.
- For event-time alignment / split back pressure, this begs the question how we can avoid deadlocks that may arise when splits are suspended for event time back pressure.

*(7) Batch and streaming Unification*

- Functionality wise, the above design should support both
- Batch often (mostly) does not care about reading "in order" and generating watermarks
  --> Might use different enumerator logic that is more locality aware and ignores event time order
  --> Does not generate watermarks
- Would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type safe and can return a "BoundedDataStream".
- Possible to defer this discussion until later

*Miscellaneous Comments*

- Should the source have a TypeInformation for the produced type, instead of a serializer? We need a type information in the stream anyways, and can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.
- The TypeSerializer interface is very powerful but also not easy to implement.
  Its purpose is to handle data super efficiently, support flexible ways of evolution, etc. For metadata I would suggest to look at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks, or for the streaming file sink. I think that it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer).

On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, we now have as open questions:

1) How do we let the checkpoints pass through our multi-threaded reader operator?

2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.

For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.

For the 2nd, I think that we should go with separate operators for the source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where the split discovery becomes a responsibility of the JobManager and readers are pulling more work from the JM.

2) The source is going to be the "single point of truth". It will know what has been processed and what not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
   i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
   ii) each reader would have to keep a copy of all its processed splits
   iii) the state has to be a union state with non-trivial merging logic in order to support rescaling.

Two additional points that you raised above:

i) The point that you raised that we need to keep all splits (processed and not-processed) I think is a bit of a strong requirement. This would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.

ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.

Cheers,
Kostas

--
Best, Jingsong Lee