Hi Becket,

Issue #1 - Design of Source interface

I mentioned the lack of a method like
Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context)
because without it the current proposal is not complete / does not work.
If we say that boundedness is an intrinsic property of a source, then imo
we don't need the createEnumerator(Boundedness, SplitEnumeratorContext)
method at all. Assuming a source from my previous example:

    Source source = KafkaSource.builder()
        ...
        .untilTimestamp(...)
        .build()

Would the enumerator differ if created like
source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs
source.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but
this is the part where my opinion differs the most from the current
proposal. I really think it should always be the source that tells whether
it is bounded or not. In the current proposal the methods
continuousSource/boundedSource somewhat reconfigure the source, which I
think is misleading.

I think a call like:

    Source source = KafkaSource.builder()
        ...
        .readContinuously() / .readUntilLatestOffset() /
        .readUntilTimestamp(...) / .readUntilOffsets(...) / ...
        .build()

is way cleaner (and more expressive) than:

    Source source = KafkaSource.builder()
        ...
        .build()

    // underneath this would call createEnumerator(CONTINUOUS, ctx), which
    // is equivalent to source.readContinuously().createEnumerator(ctx)
    env.continuousSource(source)

    // or: underneath this would call createEnumerator(BOUNDED, ctx), which
    // is equivalent to source.readUntilLatestOffset().createEnumerator(ctx)
    env.boundedSource(source)

Sorry for the comparison, but to me it seems there is too much magic
happening underneath those two calls. I really believe the Source
interface should have a getBoundedness() method instead of
supportsBoundedness(...) + createEnumerator(Boundedness, ...).
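To make that concrete, a minimal sketch of the shape I have in mind (names
and generics here are illustrative only, not final signatures):

    // Sketch: boundedness is an intrinsic, queryable property of the
    // source, so createEnumerator() no longer takes a Boundedness argument.
    public interface Source<T, SplitT, EnumCheckpointT> {

        // The source itself declares whether it will ever stop producing.
        Boundedness getBoundedness();

        // Created the same way regardless of boundedness; the runtime
        // queries getBoundedness() to decide how to schedule the job.
        SplitEnumerator<SplitT, EnumCheckpointT> createEnumerator(
                SplitEnumeratorContext<SplitT> context);
    }

    public enum Boundedness {
        BOUNDED,
        CONTINUOUS_UNBOUNDED
    }

A KafkaSource built with .untilTimestamp(...) would then simply return
BOUNDED from getBoundedness(), while one built with .readContinuously()
would return CONTINUOUS_UNBOUNDED.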
Issue #2 - Design of ExecutionEnvironment#source()/continuousSource()/boundedSource()

As you might have guessed, I am slightly in favor of option #2 modified.
Yes, I am aware that every step of the dag would have to be able to say
whether it is bounded or not. I have a feeling it would be easier to
express cross bounded/unbounded operations, but I must admit I have not
thought it through thoroughly. In the spirit of "batch is just a special
case of streaming" I thought BoundedStream would extend from DataStream.
Correct me if I am wrong. In such a setup the cross bounded/unbounded
operation could be expressed quite easily I think:

    DataStream {
        // We cannot really tell if the result is bounded or not, but
        // because a bounded stream is a special case of an unbounded one,
        // the API object is correct, irrespective of whether the left or
        // right side of the join is bounded.
        DataStream join(DataStream, ...);
    }

    BoundedStream extends DataStream {
        // Only if both sides are bounded can the result be bounded as
        // well. However, we do have access to DataStream#join here, so
        // you can still join with a DataStream.
        BoundedStream join(BoundedStream, ...);
    }
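A hypothetical usage sketch of that covariant shape (the stream variables
and record types are made up for illustration):

    // Assume env.source(...) returns the most specific stream type the
    // source allows.
    BoundedStream<Click> historical = env.source(boundedKafkaSource);
    DataStream<Click> live = env.source(unboundedKafkaSource);

    // bounded JOIN bounded -> bounded: the overload declared on
    // BoundedStream applies, so bounded-only follow-up operations
    // (e.g. a full sort) remain available on the result.
    BoundedStream<Enriched> b = historical.join(otherHistorical, ...);

    // bounded JOIN unbounded -> unbounded: the inherited DataStream#join
    // applies, and the result correctly degrades to a plain DataStream.
    DataStream<Enriched> u = historical.join(live, ...);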
On the other hand I also see benefits of two completely disjoint APIs, as
we could prohibit some streaming calls in the bounded API - although I
can't think of any unbounded operators that could not be implemented for a
bounded stream. Besides, I think we both agree we don't like the method:

    DataStream boundedStream(Source)

suggested in the current state of the FLIP. Do we? :)

Best,

Dawid

On 10/12/2019 18:57, Becket Qin wrote:
> Hi folks,
>
> Thanks for the discussion, great feedback. Also thanks Dawid for the
> explanation, it is much clearer now.
>
> One thing that is indeed missing from the FLIP is how the boundedness
> is passed to the Source implementation. So the API should be
> Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
> context), and we can probably remove the
> Source#supportBoundedness(Boundedness boundedness) method.
>
> Assuming we have that, we are essentially choosing from one of the
> following two options:
>
> Option 1:
>
> // The source is a continuous source, and only unbounded operations
> // can be performed.
> DataStream<Type> datastream = env.continuousSource(someSource);
>
> // The source is a bounded source; both bounded and unbounded
> // operations can be performed.
> BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);
>
> - Pros:
>     a) Explicit boundary between bounded / unbounded streams; it is
>        quite simple and clear to the users.
> - Cons:
>     a) For applications that do not involve bounded operations, they
>        still have to call different APIs to distinguish bounded /
>        unbounded streams.
>     b) No support for bounded streams to run in a streaming runtime
>        setting, i.e. scheduling and operator behaviors.
>
> Option 2:
>
> // The source is either bounded or unbounded, but only unbounded
> // operations can be performed on the returned DataStream.
> DataStream<Type> dataStream = env.source(someSource);
>
> // The source must be a bounded source, otherwise an exception is
> // thrown.
> BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);
>
> The pros and cons are exactly the opposite of option 1.
> - Pros:
>     a) For applications that do not involve bounded operations, they
>        do not have to call different APIs to distinguish bounded /
>        unbounded streams.
>     b) Support for bounded streams to run in a streaming runtime
>        setting, i.e. scheduling and operator behaviors.
> - Cons:
>     a) Bounded / unbounded streams are kind of mixed, i.e. given a
>        DataStream, it is not clear whether it is bounded or not, unless
>        you have access to its source.
>
> If we only think from the Source API perspective, option 2 seems a
> better choice because functionality-wise it is a superset of option 1,
> at the cost of some seemingly acceptable ambiguity in the DataStream
> API. But if we look at the DataStream API as a whole, option 1 seems a
> clearer choice. For example, sometimes a library may have to know
> whether a certain task will finish or not, and it would be difficult to
> tell if the input is a DataStream, unless additional information is
> provided all the way from the Source. One possible solution is to have
> a *modified option 2* which adds a method to the DataStream API to
> indicate boundedness, such as getBoundedness(). It would solve the
> problem, with a potential confusion of what the difference is between a
> DataStream with getBoundedness()=true and a BoundedDataStream. But that
> seems not super difficult to explain.
>
> So from the API's perspective, I don't have a strong opinion between
> *option 1* and *modified option 2*. I like the cleanness of option 1,
> but modified option 2 would be more attractive if we have a concrete
> use case for the "bounded stream with unbounded streaming runtime
> settings".
>
> Re: Till
>
>> Maybe this has already been asked before but I was wondering why the
>> SourceReader interface has the method pollNext which hands the
>> responsibility of outputting elements to the SourceReader
>> implementation? Has this been done for backwards compatibility reasons
>> with the old source interface? If not, then one could define a
>> Collection<E> getNextRecords() method which returns the currently
>> retrieved records and then the caller emits them outside of the
>> SourceReader. That way the interface would not allow to implement an
>> outputting loop where we never hand back control to the caller. At the
>> moment, this contract can be easily broken and is only mentioned
>> loosely in the JavaDocs.
>
> The primary reason we hand the SourceOutput over to the SourceReader is
> that sometimes it is difficult for a SourceReader to emit one record at
> a time. One example is some batched messaging systems which only have
> an offset for the entire batch instead of for individual messages in
> the batch. In that case, returning one record at a time would leave the
> SourceReader in an uncheckpointable state, because it can only
> checkpoint at the batch boundaries.
>
> Thanks,
>
> Jiangjie (Becket) Qin
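For reference, a minimal sketch of the batch-offset scenario Becket
describes (the reader, client, and batch types are hypothetical stand-ins,
not the FLIP's exact SourceReader signature):

    // Simplified stand-ins for the interfaces discussed in the thread.
    interface SourceOutput<E> { void emit(E record); }
    enum ReaderStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

    // Reader for a system that only tracks offsets per batch. Handing the
    // SourceOutput to the reader lets it emit the whole batch inside one
    // pollNext() call, so a checkpoint can never fall between two records
    // of the same batch.
    class BatchedSystemReader<E> {
        private final BatchedClient<E> client;
        private long checkpointableBatchOffset;

        BatchedSystemReader(BatchedClient<E> client) { this.client = client; }

        ReaderStatus pollNext(SourceOutput<E> output) {
            Batch<E> batch = client.fetchNextBatch(); // hypothetical client
            if (batch == null) {
                return ReaderStatus.NOTHING_AVAILABLE;
            }
            for (E record : batch.records()) {
                output.emit(record); // all records of the batch, atomically
            }
            // The checkpointable state only ever advances at batch
            // boundaries.
            checkpointableBatchOffset = batch.offset();
            return ReaderStatus.MORE_AVAILABLE;
        }
    }

    interface BatchedClient<E> { Batch<E> fetchNextBatch(); }
    interface Batch<E> { Iterable<E> records(); long offset(); }

With a Collection<E> getNextRecords() style instead, the caller could
checkpoint after emitting half a batch, which is exactly the
uncheckpointable state described above.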
> On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi everyone,
>>
>> thanks for drafting this FLIP. It reads very well.
>>
>> Concerning Dawid's proposal, I tend to agree. The boundedness could
>> come from the source and tell the system how to treat the operator
>> (scheduling-wise). From a user's perspective it should be fine to get
>> back a DataStream when calling env.source(boundedSource) if he does
>> not need special operations defined on a BoundedDataStream. If he
>> needs this, then one could use the method
>> BoundedDataStream env.boundedSource(boundedSource).
>>
>> If possible, we could enforce the proper usage of env.boundedSource()
>> by introducing a BoundedSource type so that one cannot pass an
>> unbounded source to it. That way users would not be able to shoot
>> themselves in the foot.
>>
>> Maybe this has already been asked before but I was wondering why the
>> SourceReader interface has the method pollNext which hands the
>> responsibility of outputting elements to the SourceReader
>> implementation? Has this been done for backwards compatibility reasons
>> with the old source interface? If not, then one could define a
>> Collection<E> getNextRecords() method which returns the currently
>> retrieved records and then the caller emits them outside of the
>> SourceReader. That way the interface would not allow to implement an
>> outputting loop where we never hand back control to the caller. At the
>> moment, this contract can be easily broken and is only mentioned
>> loosely in the JavaDocs.
>>
>> Cheers,
>> Till
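Till's type-safety suggestion could look roughly like this (BoundedSource
is a hypothetical subtype sketched here; it is not part of the FLIP as
written):

    // Hypothetical marker subtype: a source statically known to be
    // bounded.
    interface BoundedSource<T> extends Source<T> { }

    interface StreamExecutionEnvironment {
        // Accepts any source and returns the general-purpose stream.
        <T> DataStream<T> source(Source<T> source);

        // Compiles only for sources that are bounded by type, so passing
        // an unbounded source is rejected at compile time instead of at
        // runtime.
        <T> BoundedDataStream<T> boundedSource(BoundedSource<T> source);
    }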
>> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I think the current design is good.
>>>
>>> My understanding is:
>>>
>>> For execution modes, bounded mode and continuous mode are totally
>>> different. I don't think we have the ability to integrate the two
>>> models at present. It's about scheduling, memory, algorithms, state,
>>> etc.; we shouldn't confuse them.
>>>
>>> For source capabilities: only bounded, only continuous, both bounded
>>> and continuous. I think Kafka is a source that can be run in both
>>> bounded and continuous execution mode, and Kafka with an end offset
>>> should be runnable in both bounded and continuous execution mode.
>>> Using Apache Beam with the Flink runner, I used to run a "bounded"
>>> Kafka in streaming mode. For our previous DataStream, it was not
>>> necessarily required that the source cannot be bounded.
>>>
>>> So this is my thought on Dawid's question:
>>> 1. pass a bounded source to continuousSource(): +1
>>> 2. pass a continuous source to boundedSource(): -1, should throw an
>>>    exception.
>>>
>>> In StreamExecutionEnvironment, continuousSource and boundedSource
>>> define the execution mode. It defines a clear boundary of execution
>>> mode.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> I agree with Dawid's point that the boundedness information should
>>>> come from the source itself (e.g. the end timestamp), not through
>>>> env.boundedSource()/continuousSource().
>>>> I think if we want to support something like `env.source()` that
>>>> derives the execution mode from the source, the
>>>> `supportsBoundedness(Boundedness)` method is not enough, because we
>>>> don't know whether it is bounded or not.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org>
>>>> wrote:
>>>>
>>>>> One more thing. In the current proposal, with the
>>>>> supportsBoundedness(Boundedness) method and the boundedness coming
>>>>> from either continuousSource or boundedSource, I could not find how
>>>>> this information is fed back to the SplitEnumerator.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 09/12/2019 13:52, Becket Qin wrote:
>>>>>> Hi Dawid,
>>>>>>
>>>>>> Thanks for the comments. This actually brings up another relevant
>>>>>> question: what does a "bounded source" imply? I actually had the
>>>>>> same impression when I looked at the Source API. Here is what I
>>>>>> understand after some discussion with Stephan. A bounded source
>>>>>> has the following impacts.
>>>>>>
>>>>>> 1. API validity.
>>>>>> - A bounded source generates a bounded stream, so some operations
>>>>>> that only work on bounded records could be performed, e.g. sort.
>>>>>> - To expose these bounded-stream-only APIs, there are two options:
>>>>>>      a. Add them to the DataStream API and throw an exception if a
>>>>>>         method is called on an unbounded stream.
>>>>>>      b. Create a BoundedDataStream class which is returned from
>>>>>>         env.boundedSource(), while DataStream is returned from
>>>>>>         env.continuousSource().
>>>>>> Note that this cannot be done by having a single
>>>>>> env.source(theSource), even if the Source has a getBoundedness()
>>>>>> method.
>>>>>>
>>>>>> 2. Scheduling
>>>>>> - A bounded source could be computed stage by stage without
>>>>>> bringing up all the tasks at the same time.
>>>>>>
>>>>>> 3. Operator behaviors
>>>>>> - A bounded source indicates the records are finite, so some
>>>>>> operators can wait until they receive all the records before they
>>>>>> start processing.
>>>>>>
>>>>>> Of the above impacts, only 1 is relevant to the API design, and
>>>>>> the current proposal in FLIP-27 is following 1.b.
>>>>>>
>>>>>>> // boundedness depends on source property, imo this should always
>>>>>>> be preferred
>>>>>>> DataStream<MyType> stream = env.source(theSource);
>>>>>>
>>>>>> In your proposal, does DataStream have bounded-stream-only
>>>>>> methods? It looks like it should have, otherwise passing a bounded
>>>>>> Source to env.source() would be confusing. In that case, we will
>>>>>> essentially do 1.a if an unbounded Source is created from
>>>>>> env.source(unboundedSource).
>>>>>>
>>>>>> If we have the methods only supported for bounded streams in
>>>>>> DataStream, it seems a little weird to have a separate
>>>>>> BoundedDataStream interface. Am I understanding it correctly?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
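As Jark notes, a single env.source() cannot derive the execution mode from
supportsBoundedness(...) alone; a sketch of the difference (hypothetical
dispatch code, not proposed API):

    // With only supportsBoundedness(Boundedness), both checks may return
    // true for a source like Kafka, so env.source() cannot pick a mode:
    //     source.supportsBoundedness(BOUNDED);              // true
    //     source.supportsBoundedness(CONTINUOUS_UNBOUNDED); // true
    //
    // With getBoundedness(), the environment can derive the mode directly:
    abstract class StreamExecutionEnvironmentSketch {

        <T> DataStream<T> source(Source<T> source) {
            return source.getBoundedness() == Boundedness.BOUNDED
                    ? translateBounded(source)      // stage-wise, batch-style scheduling
                    : translateContinuous(source);  // streaming-style scheduling
        }

        // Hypothetical helpers standing in for the two runtime translations.
        abstract <T> DataStream<T> translateBounded(Source<T> source);
        abstract <T> DataStream<T> translateContinuous(Source<T> source);
    }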
>>>>>> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz
>>>>>> <dwysakow...@apache.org> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Really well written proposal and a very important one. I must
>>>>>>> admit I have not understood all the intricacies of it yet.
>>>>>>>
>>>>>>> One question I have though is about where the information about
>>>>>>> boundedness comes from. I think in most cases it is a property of
>>>>>>> the source. As you described, it might be e.g. an end offset, a
>>>>>>> flag whether it should monitor new splits, etc. I think it would
>>>>>>> be a really nice use case to be able to say:
>>>>>>>
>>>>>>>     new KafkaSource().readUntil(long timestamp)
>>>>>>>
>>>>>>> which could work as an "end offset". Moreover, I think all bounded
>>>>>>> sources support continuous mode, but no intrinsically continuous
>>>>>>> source supports the bounded mode. If I understood the proposal
>>>>>>> correctly, it suggests the boundedness sort of "comes" from the
>>>>>>> outside of the source, from the invocation of either boundedStream
>>>>>>> or continuousSource.
>>>>>>>
>>>>>>> I am wondering if it would make sense to actually change the method
>>>>>>>
>>>>>>>     boolean Source#supportsBoundedness(Boundedness)
>>>>>>>
>>>>>>> to
>>>>>>>
>>>>>>>     Boundedness Source#getBoundedness().
>>>>>>>
>>>>>>> As for the methods #boundedSource and #continuousSource, assuming
>>>>>>> the boundedness is a property of the source, they do not affect
>>>>>>> how the enumerator works, but mostly how the dag is scheduled,
>>>>>>> right? I am not against those methods, but I think it is a very
>>>>>>> specific use case to actually override the property of the source.
>>>>>>> In general I would expect users to only call env.source(theSource),
>>>>>>> where the source tells if it is bounded or not. I would suggest
>>>>>>> considering the following set of methods:
>>>>>>>
>>>>>>>     // boundedness depends on source property, imo this should
>>>>>>>     // always be preferred
>>>>>>>     DataStream<MyType> stream = env.source(theSource);
>>>>>>>
>>>>>>>     // always continuous execution, whether bounded or unbounded
>>>>>>>     // source
>>>>>>>     DataStream<MyType> boundedStream = env.continuousSource(theSource);
>>>>>>>
>>>>>>>     // imo this would make sense if the BoundedDataStream provides
>>>>>>>     // additional features unavailable for continuous mode
>>>>>>>     BoundedDataStream<MyType> batch = env.boundedSource(theSource);
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dawid
>>>>>>>
>>>>>>> On 04/12/2019 11:25, Stephan Ewen wrote:
>>>>>>>
>>>>>>> Thanks, Becket, for updating this.
>>>>>>>
>>>>>>> I agree with moving the aspects you mentioned into separate FLIPs -
>>>>>>> this one was becoming unwieldy in size.
>>>>>>>
>>>>>>> +1 to the FLIP in its current state. It's a very detailed write-up,
>>>>>>> nicely done!
>>>>>>>
>>>>>>> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Sorry for the long-belated update. I have updated the FLIP-27 wiki
>>>>>>> page with the latest proposals. Some noticeable changes include:
>>>>>>> 1. A new generic communication mechanism between SplitEnumerator
>>>>>>>    and SourceReader.
>>>>>>> 2. Some detailed API method signature changes.
>>>>>>>
>>>>>>> We left a few things out of this FLIP and will address them in
>>>>>>> separate FLIPs, including:
>>>>>>> 1. Per-split event time.
>>>>>>> 2. Event time alignment.
>>>>>>> 3. Fine-grained failover for SplitEnumerator failure.
>>>>>>>
>>>>>>> Please let us know if you have any questions.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
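The "generic communication mechanism" in change 1 can be pictured roughly
as an event channel between the enumerator and the readers (a speculative
sketch only; the FLIP wiki page, not this sketch, defines the actual
types):

    // Hypothetical marker for custom messages exchanged between the
    // SplitEnumerator (coordinator side) and SourceReaders (task side).
    interface SourceEvent extends java.io.Serializable { }

    interface SplitEnumeratorContext<SplitT> {
        // Coordinator -> reader, e.g. "here is a new split to read".
        void sendEventToSourceReader(int subtaskId, SourceEvent event);
    }

    interface SourceReaderContext {
        // Reader -> coordinator, e.g. "split finished, send me more".
        void sendSourceEventToCoordinator(SourceEvent event);
    }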
>>>>>>> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Łukasz!
>>>>>>>
>>>>>>> Becket and me are working hard on figuring out the last details and
>>>>>>> implementing the first PoC. We would update the FLIP hopefully next
>>>>>>> week.
>>>>>>>
>>>>>>> There is a fair chance that a first version of this will be in
>>>>>>> 1.10, but I think it will take another release to battle-test it
>>>>>>> and migrate the connectors.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This proposal looks very promising for us. Do you have any plans
>>>>>>> for which Flink release it is going to be included in? We are
>>>>>>> thinking of using the DataSet API for our future use cases, but on
>>>>>>> the other hand the DataSet API is going to be deprecated, so using
>>>>>>> the proposed bounded data streams solution could be more viable in
>>>>>>> the long term.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Łukasz
>>>>>>>
>>>>>>> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thanks for putting together this proposal!
>>>>>>>
>>>>>>> I see that the "Per Split Event Time" and "Event Time Alignment"
>>>>>>> sections are still TBD.
>>>>>>>
>>>>>>> It would probably be good to flesh those out a bit before
>>>>>>> proceeding too far, as the event time alignment will probably
>>>>>>> influence the interaction with the split reader, specifically
>>>>>>> ReaderStatus emitNext(SourceOutput<E> output).
>>>>>>>
>>>>>>> We currently have only one implementation for event time alignment,
>>>>>>> in the Kinesis consumer. The synchronization in that case takes
>>>>>>> place as the last step before records are emitted downstream
>>>>>>> (RecordEmitter). With the currently proposed interfaces, the
>>>>>>> equivalent can be implemented in the reader loop, although note
>>>>>>> that in the Kinesis consumer the per-shard threads push records.
>>>>>>>
>>>>>>> Synchronization has not been implemented for the Kafka consumer
>>>>>>> yet: https://issues.apache.org/jira/browse/FLINK-12675
>>>>>>>
>>>>>>> When I looked at it, I realized that the implementation will look
>>>>>>> quite different from Kinesis, because it needs to take place in the
>>>>>>> pull part, where records are taken from the Kafka client. Due to
>>>>>>> the multiplexing it cannot be done by blocking the split thread
>>>>>>> like it currently works for Kinesis. Reading from individual Kafka
>>>>>>> partitions needs to be controlled via pause/resume on the Kafka
>>>>>>> client.
>>>>>>>
>>>>>>> To take on that responsibility, the split thread would need to be
>>>>>>> aware of the watermarks, or at least of whether it should or should
>>>>>>> not continue to consume a given split, and this may require a
>>>>>>> different SourceReader or SourceOutput interface.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
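For context, the pause/resume control Thomas refers to is the standard
Kafka consumer API. A rough sketch of how per-partition alignment could
use it (the alignment policy and method name here are hypothetical):

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Suspend partitions whose local watermark has run too far ahead of
    // the global minimum; poll() keeps serving the remaining partitions
    // of the multiplexed consumer, which is why blocking the whole split
    // thread is not an option here.
    static void alignPartitions(
            KafkaConsumer<byte[], byte[]> consumer,
            Collection<TopicPartition> aheadPartitions,
            Collection<TopicPartition> laggingPartitions) {
        consumer.pause(aheadPartitions);    // stop fetching fast partitions
        consumer.resume(laggingPartitions); // re-enable caught-up partitions
    }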
>>>>>>> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>> Thank you for the feedback! I will take a look at your branch
>>>>>>> before discussing publicly.
>>>>>>>
>>>>>>> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Biao!
>>>>>>>
>>>>>>> Thanks for reviving this. I would like to join this discussion, but
>>>>>>> am quite occupied with the 1.9 release, so can we maybe pause this
>>>>>>> discussion for a week or so?
>>>>>>>
>>>>>>> In the meantime I can share some suggestions based on prior
>>>>>>> experiments:
>>>>>>>
>>>>>>> How to do watermarks / timestamp extractors in a simpler and more
>>>>>>> flexible way. I think that part is quite promising and should be
>>>>>>> part of the new source interface:
>>>>>>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>>>>>>> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>>>>>>>
>>>>>>> Some experiments on how to build the source reader and its library
>>>>>>> for common threading/split patterns:
>>>>>>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>>>>>>>
>>>>>>> Best,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi devs,
>>>>>>>
>>>>>>> Since 1.9 is nearly released, I think we could get back to FLIP-27.
>>>>>>> I believe it should be included in 1.10.
>>>>>>>
>>>>>>> There are so many things mentioned in the document of FLIP-27. [1]
>>>>>>> I think we'd better discuss them separately. However, the wiki is
>>>>>>> not a good place to discuss. I wrote a Google doc about the
>>>>>>> SplitReader API, covering some details missing from the wiki
>>>>>>> document. [2]
>>>>>>>
>>>>>>> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>>>>>>> 2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>>>>>>>
>>>>>>> CC Stephan, Aljoscha, Piotrek, Becket
>>>>>>>
>>>>>>> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Steven,
>>>>>>>
>>>>>>> Thank you for the feedback. Please take a look at the FLIP-27
>>>>>>> document
>>>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
>>>>>>> which was updated recently. A lot of details about the enumerator
>>>>>>> were added in this document. I think it would help.
>>>>>>> On Thu, Mar 28, 2019 at 12:52 PM, Steven Wu <stevenz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> This proposal mentioned that the SplitEnumerator might run on the
>>>>>>> JobManager or in a single task on a TaskManager.
>>>>>>>
>>>>>>> If the enumerator is a single task on a TaskManager, then the job
>>>>>>> DAG can never be embarrassingly parallel anymore. That will nullify
>>>>>>> the leverage of fine-grained recovery for embarrassingly parallel
>>>>>>> jobs.
>>>>>>>
>>>>>>> It's not clear to me what the implications of running the
>>>>>>> enumerator on the JobManager are, so I will leave that out for now.
>>>>>>>
>>>>>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Stephan & Piotrek,
>>>>>>>
>>>>>>> Thank you for the feedback.
>>>>>>>
>>>>>>> It seems that there are a lot of things to do in the community. I
>>>>>>> am just afraid that this discussion may be forgotten, since there
>>>>>>> are so many proposals recently.
>>>>>>>
>>>>>>> Anyway, wish to see the split topics soon :)
>>>>>>>
>>>>>>> On Thu, Jan 24, 2019 at 8:21 PM, Piotr Nowojski
>>>>>>> <pi...@da-platform.com> wrote:
>>>>>>>
>>>>>>> Hi Biao!
>>>>>>>
>>>>>>> This discussion was stalled because of the preparations for open
>>>>>>> sourcing & merging Blink. I think before creating the tickets we
>>>>>>> should split this discussion into the topics/areas outlined by
>>>>>>> Stephan and create FLIPs for those.
>>>>>>>
>>>>>>> I think there is no chance for this to be completed in the couple
>>>>>>> of remaining weeks / 1 month before the 1.8 feature freeze, however
>>>>>>> it would be good to aim with those changes for 1.9.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi community,
>>>>>>>
>>>>>>> The summary by Stephan makes a lot of sense to me. It is much
>>>>>>> clearer indeed after splitting the complex topic into smaller ones.
>>>>>>>
>>>>>>> I was wondering whether there is any detailed plan for the next
>>>>>>> step. If not, I would like to push this forward by creating some
>>>>>>> JIRA issues.
>>>>>>>
>>>>>>> Another question: should version 1.8 include these features?
>>>>>>>
>>>>>>> On Sat, Dec 1, 2018 at 4:20 AM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Thanks everyone for the lively discussion. Let me try to summarize
>>>>>>> where I see convergence in the discussion and open issues. I'll try
>>>>>>> to group this by design aspect of the source. Please let me know if
>>>>>>> I got things wrong or missed something crucial here.
>>>>>>> For issues 1-3, if the below reflects the state of the discussion,
>>>>>>> I would try and update the FLIP in the next days. For the remaining
>>>>>>> ones we need more discussion.
>>>>>>>
>>>>>>> I would suggest to fork each of these aspects into a separate mail
>>>>>>> thread, or we will lose sight of the individual aspects.
>>>>>>>
>>>>>>> *(1) Separation of Split Enumerator and Split Reader*
>>>>>>>
>>>>>>> - All seem to agree this is a good thing.
>>>>>>> - The Split Enumerator could in the end live on the JobManager (and
>>>>>>>   assign splits via RPC) or in a task (and assign splits via data
>>>>>>>   streams).
>>>>>>> - This discussion is orthogonal and should come later, when the
>>>>>>>   interface is agreed upon.
>>>>>>>
>>>>>>> *(2) Split Readers for one or more splits*
>>>>>>>
>>>>>>> - The discussion seems to agree that we need to support one reader
>>>>>>>   that possibly handles multiple splits concurrently.
>>>>>>> - The requirement comes from sources where one poll()-style call
>>>>>>>   fetches data from different splits / partitions.
>>>>>>>   --> Example sources that require that would be Kafka, Pravega,
>>>>>>>       Pulsar.
>>>>>>> - We could have one split reader per source, or multiple split
>>>>>>>   readers that share the "poll()" function.
>>>>>>> - To not make it too complicated, we can start with thinking about
>>>>>>>   one split reader for all splits initially and see if that covers
>>>>>>>   all requirements.
>>>>>>>
>>>>>>> *(3) Threading model of the Split Reader*
>>>>>>>
>>>>>>> - The most active part of the discussion ;-)
>>>>>>> - A non-blocking way for Flink's task code to interact with the
>>>>>>>   source is needed in order to base the task runtime code on a
>>>>>>>   single-threaded/actor-style task design.
>>>>>>>   --> I personally am a big proponent of that; it will help with
>>>>>>>       well-behaved checkpoints, efficiency, and simpler yet more
>>>>>>>       robust runtime code.
>>>>>>> - Users care about a simple abstraction, so as a subclass of
>>>>>>>   SplitReader (non-blocking / async) we need to have a
>>>>>>>   BlockingSplitReader which will form the basis of most source
>>>>>>>   implementations. The BlockingSplitReader lets users do simple
>>>>>>>   blocking poll() calls.
>>>>>>> - The BlockingSplitReader would spawn a thread (or more), and the
>>>>>>>   thread(s) can make blocking calls and hand over data buffers via
>>>>>>>   a blocking queue.
>>>>>>> - This should allow us to cover both a fully async runtime and a
>>>>>>>   simple blocking interface for users.
>>>>>>> - This is actually very similar to how the Kafka connectors work:
>>>>>>>   Kafka 9+ with one thread, Kafka 8 with multiple threads.
>>>>>>> - On the base SplitReader (the async one), the non-blocking method
>>>>>>>   that gets the next chunk of data would signal data availability
>>>>>>>   via a CompletableFuture, because that gives the best flexibility
>>>>>>>   (one can await completion or register notification handlers).
>>>>>>> - The source task would register a "thenHandle()" (or similar) on
>>>>>>>   the future to put a "take next data" task into the actor-style
>>>>>>>   mailbox.
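A rough sketch of the future-based availability contract described in (3),
with hypothetical names (none of this is settled API):

    import java.util.concurrent.CompletableFuture;

    // Async base reader: the task thread never blocks on the source.
    interface SplitReader<E> {
        // Returns the next record, or null if nothing is available now.
        E pollNext();

        // Completes when data is available again; the task registers a
        // handler on this future to enqueue a "take next data" action
        // into its mailbox instead of spinning or blocking.
        CompletableFuture<Void> isAvailable();
    }

    // Convenience base class for implementors who prefer blocking poll()
    // calls: fetcher thread(s) hand buffers over a blocking queue and
    // complete the availability future when the queue is non-empty.
    abstract class BlockingSplitReader<E> implements SplitReader<E> {
        // Run on an internal fetcher thread; may block on the client.
        protected abstract E poll() throws InterruptedException;
    }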
>>>>>>> *(4) Split Enumeration and Assignment*
>>>>>>>
>>>>>>> - Splits may be generated lazily, both in cases where there is a
>>>>>>>   limited number of splits (but very many), and where splits are
>>>>>>>   discovered over time.
>>>>>>> - Assignment should also be lazy, to get better load balancing.
>>>>>>> - Assignment needs to support locality preferences.
>>>>>>> - Possible design based on the discussion so far:
>>>>>>>   --> The SplitReader has a method "addSplits(SplitT...)" to add
>>>>>>>       one or more splits. Some split readers might assume they have
>>>>>>>       only one split ever, concurrently; others assume multiple
>>>>>>>       splits. (Note: the idea behind being able to add multiple
>>>>>>>       splits at the same time is to ease startup, where multiple
>>>>>>>       splits may be assigned instantly.)
>>>>>>>   --> The SplitReader has a context object on which it can indicate
>>>>>>>       when splits are completed. The enumerator gets that
>>>>>>>       notification and can use it to decide when to assign new
>>>>>>>       splits. This should help both in cases of sources that take
>>>>>>>       splits lazily (file readers) and in cases where the source
>>>>>>>       needs to preserve a partial order between splits (Kinesis,
>>>>>>>       Pravega, Pulsar may need that).
>>>>>>>   --> The SplitEnumerator gets a notification when SplitReaders
>>>>>>>       start and when they finish splits. It can decide at that
>>>>>>>       moment to push more splits to that reader.
>>>>>>>   --> The SplitEnumerator should probably be aware of the source
>>>>>>>       parallelism, to build its initial distribution.
>>>>>>> - Open question: Should the source expose something like "host
>>>>>>>   preferences", so that yarn/mesos/k8s can take this into account
>>>>>>>   when selecting a node to start a TM on?
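Continuing the sketch from (3), the assignment flow in (4) could look
roughly like this (hypothetical shapes only):

    import java.util.List;

    // Additions to the hypothetical reader from the sketch above.
    interface SplitAwareReader<E, SplitT> {
        // One or more splits can be handed over at once, which eases
        // startup where several splits are assigned instantly.
        void addSplits(List<SplitT> splits);
    }

    // Context handed to the reader so it can report progress upstream.
    interface ReaderContext<SplitT> {
        // Tells the enumerator that a split is done; the enumerator can
        // use this to assign splits lazily and to preserve a partial
        // order between splits (file sources, Kinesis, Pravega, Pulsar).
        void signalSplitFinished(SplitT split);
    }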
>>>>>>> *(5) Watermarks and event time alignment*
>>>>>>>
>>>>>>> - Watermark generation, as well as idleness, needs to be per split
>>>>>>>   (like currently in the Kafka Source, per partition).
>>>>>>> - It is desirable to support optional event-time alignment, meaning
>>>>>>>   that splits that are ahead are back-pressured or temporarily
>>>>>>>   unsubscribed.
>>>>>>> - I think it would be desirable to encapsulate the watermark
>>>>>>>   generation logic in watermark generators, for a separation of
>>>>>>>   concerns. The watermark generators should run per split.
>>>>>>> - Using watermark generators would also help with another problem
>>>>>>>   of the suggested interface, namely supporting non-periodic
>>>>>>>   watermarks efficiently.
>>>>>>> - We need a way to "dispatch" the next record to different
>>>>>>>   watermark generators.
>>>>>>> - We need a way to tell the SplitReader to "suspend" a split until
>>>>>>>   a certain watermark is reached (event time backpressure).
>>>>>>> - This would in fact not be needed (and thus be simpler) if we had
>>>>>>>   a SplitReader per split, and may be a reason to re-open that
>>>>>>>   discussion.
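Per-split watermark generation as in (5) might look roughly like this (an
illustrative shape only; the event-time design was explicitly left for
later discussion):

    // One generator instance per split, so idleness and skew are tracked
    // at split granularity rather than per reader.
    interface WatermarkGenerator<E> {
        // Called for every record; enables non-periodic (punctuated)
        // watermarks.
        void onEvent(E event, long eventTimestamp, WatermarkOutput output);

        // Called on a timer, for periodic watermarks.
        void onPeriodicEmit(WatermarkOutput output);
    }

    interface WatermarkOutput {
        void emitWatermark(long watermarkTimestamp);
        void markIdle();
    }

The reader would dispatch each record to the generator of the split it
came from, and alignment could suspend any split whose generator has
advanced past the allowed global watermark.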
>>>>>>> *(6) Watermarks across splits and in the Split Enumerator*
>>>>>>>
>>>>>>> - The split enumerator may need some watermark awareness, which
>>>>>>>   should be purely based on split metadata (like the create
>>>>>>>   timestamp of file splits).
>>>>>>> - If there are still more splits with an overlapping event time
>>>>>>>   range for a split reader, then that split reader should not
>>>>>>>   advance the watermark within the split beyond the overlap
>>>>>>>   boundary. Otherwise future splits will produce late data.
>>>>>>> - One way to approach this could be that the split enumerator may
>>>>>>>   send watermarks to the readers, and the readers cannot emit
>>>>>>>   watermarks beyond that received watermark.
>>>>>>> - Many split enumerators would simply immediately send Long.MAX out
>>>>>>>   and leave the progress purely to the split readers.
>>>>>>> - For event-time alignment / split back pressure, this begs the
>>>>>>>   question how we can avoid deadlocks that may arise when splits
>>>>>>>   are suspended for event time back pressure.
>>>>>>>
>>>>>>> *(7) Batch and streaming Unification*
>>>>>>>
>>>>>>> - Functionality-wise, the above design should support both.
>>>>>>> - Batch often (mostly) does not care about reading "in order" and
>>>>>>>   generating watermarks.
>>>>>>>   --> Might use different enumerator logic that is more locality
>>>>>>>       aware and ignores event time order.
>>>>>>>   --> Does not generate watermarks.
>>>>>>> - It would be great if bounded sources could be identified at
>>>>>>>   compile time, so that "env.addBoundedSource(...)" is type safe
>>>>>>>   and can return a "BoundedDataStream".
>>>>>>> - It is possible to defer this discussion until later.
>>>>>>>
>>>>>>> *Miscellaneous Comments*
>>>>>>>
>>>>>>> - Should the source have a TypeInformation for the produced type,
>>>>>>>   instead of a serializer? We need a type information in the stream
>>>>>>>   anyways, and can derive the serializer from that. Plus, creating
>>>>>>>   the serializer should respect the ExecutionConfig.
>>>>>>> - The TypeSerializer interface is very powerful but also not easy
>>>>>>>   to implement. Its purpose is to handle data super efficiently,
>>>>>>>   support flexible ways of evolution, etc. For metadata I would
>>>>>>>   suggest to look at the SimpleVersionedSerializer instead, which
>>>>>>>   is used for example for checkpoint master hooks, or for the
>>>>>>>   streaming file sink. I think that it is a good match for cases
>>>>>>>   where we do not need more than ser/deser (no copy, etc.) and
>>>>>>>   don't need to push versioning out of the serialization paths for
>>>>>>>   best performance (as in the TypeSerializer).
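For reference, the SimpleVersionedSerializer mentioned above is
deliberately minimal compared to TypeSerializer - roughly this shape
(simplified from Flink's org.apache.flink.core.io.SimpleVersionedSerializer):

    import java.io.IOException;

    public interface SimpleVersionedSerializer<E> {
        // The current version, written alongside the data so that older
        // serialized formats can still be read back.
        int getVersion();

        byte[] serialize(E obj) throws IOException;

        E deserialize(int version, byte[] serialized) throws IOException;
    }

Versioning travels next to the bytes rather than being pushed out of the
serialization path, which is why it fits metadata such as splits and
enumerator checkpoints, but not record-level data.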
>>>>>>> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
>>>>>>> <k.klou...@data-artisans.com> wrote:
>>>>>>>
>>>>>>> Hi Biao,
>>>>>>>
>>>>>>> Thanks for the answer!
>>>>>>>
>>>>>>> So given the multi-threaded readers, we now have as open questions:
>>>>>>>
>>>>>>> 1) How do we let the checkpoints pass through our multi-threaded
>>>>>>>    reader operator?
>>>>>>> 2) Do we have separate reader and source operators or not? In the
>>>>>>>    strategy that has a separate source, the source operator has a
>>>>>>>    parallelism of 1 and is responsible for split recovery only.
>>>>>>>
>>>>>>> For the first one, given also the constraints (blocking, finite
>>>>>>> queues, etc), I do not have an answer yet.
>>>>>>>
>>>>>>> For the 2nd, I think that we should go with separate operators for
>>>>>>> the source and the readers, for the following reasons:
>>>>>>>
>>>>>>> 1) This is more aligned with a potential future improvement where
>>>>>>>    the split discovery becomes a responsibility of the JobManager
>>>>>>>    and readers are pulling more work from the JM.
>>>>>>> 2) The source is going to be the "single point of truth". It will
>>>>>>>    know what has been processed and what not. If the source and the
>>>>>>>    readers are a single operator with parallelism > 1, or in
>>>>>>>    general, if the split discovery is done by each task
>>>>>>>    individually, then:
>>>>>>>    i) we have to have a deterministic scheme for each reader to
>>>>>>>       assign splits to itself (e.g. mod subtaskId). This is not
>>>>>>>       necessarily trivial for all sources.
>>>>>>>    ii) each reader would have to keep a copy of all its processed
>>>>>>>        splits.
>>>>>>>    iii) the state has to be a union state with a non-trivial
>>>>>>>        merging logic in order to support rescaling.
>>>>>>>
>>>>>>> Two additional points that you raised above:
>>>>>>>
>>>>>>> i) The point that you raised that we need to keep all splits
>>>>>>>    (processed and not-processed) I think is a bit of a strong
>>>>>>>    requirement. This would imply that for infinite sources the
>>>>>>>    state will grow indefinitely. This problem is even more
>>>>>>>    pronounced if we do not have a single source that assigns splits
>>>>>>>    to readers, as each reader will have its own copy of the state.
>>>>>>> ii) It is true that for finite sources we need to somehow not close
>>>>>>>    the readers when the source/split discoverer finishes. The
>>>>>>>    ContinuousFileReaderOperator has a work-around for that. It is
>>>>>>>    not elegant, and checkpoints are not emitted after closing the
>>>>>>>    source, but this, I believe, is a bigger problem which requires
>>>>>>>    more changes than just refactoring the source interface.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kostas
>>>
>>> --
>>> Best, Jingsong Lee