Hi Becket, I am not sure if I understood the last paragraph correctly, but let me clarify my thoughts.
I would not add any bounded/batch-specific methods to the DataStream. In my opinion, all the user-facing bounded/batch-specific methods should be exposed through the new BoundedDataStream interface.

1. Using the source() method would basically mean: use the information from Source#getBoundedness in 2 (Scheduling) and 3 (Operator behaviors). I believe all the unbounded/stream operations can also be executed on a batch stream; in other words, the stream/unbounded operations are a subset of the bounded/batch ones. So I see no problem with passing a bounded source here if we do not care about bounded-specific operations such as the sort you mentioned.

2. Using the continuousSource() method would mean: use Boundedness#CONTINUOUS_UNBOUNDED in 2 and 3, independent of what Source#getBoundedness says. <- I am not sure how useful this method would actually be; imo we usually do want to leverage the boundedness of a source.

3. Using boundedSource() would:
   1. throw an exception if Source#getBoundedness returns Boundedness#CONTINUOUS_UNBOUNDED
   2. if Source#getBoundedness returns Boundedness#BOUNDED, use that in 2 and 3, and additionally expose the extra methods in the API, as this would return a BoundedDataStream

My main concern was that I think the boundedness should come out of the source rather than from which method on the ExecutionEnvironment is used. (A minimal sketch of this contract follows after your quoted mail below.)

In general I am also fine with the methods in your original proposal, though I think we should have a clear logic for what happens if you:

1. pass a bounded source to continuousSource()
2. pass a continuous source to boundedSource()

Plus, why should I have to think about the boundedness twice if I do not care about the additional bounded-specific methods: once when instantiating the source (e.g. the example with the end timestamp), and a second time when creating the DataStream?

Best,

Dawid

On 09/12/2019 13:52, Becket Qin wrote:
> Hi Dawid,
>
> Thanks for the comments. This actually brings up another relevant question
> about what a "bounded source" implies. I actually had the same impression
> when I looked at the Source API. Here is what I understand after some
> discussion with Stephan. A bounded source has the following impacts:
>
> 1. API validity.
> - A bounded source generates a bounded stream, so some operations that only
> work on bounded records can be performed, e.g. sort.
> - To expose these bounded-stream-only APIs, there are two options:
>   a. Add them to the DataStream API and throw an exception if a method is
>   called on an unbounded stream.
>   b. Create a BoundedDataStream class which is returned from
>   env.boundedSource(), while DataStream is returned from
>   env.continuousSource(). Note that this cannot be done by having a single
>   env.source(theSource), even if the Source has a getBoundedness() method.
>
> 2. Scheduling
> - A bounded source could be computed stage by stage without bringing up all
> the tasks at the same time.
>
> 3. Operator behaviors
> - A bounded source indicates the records are finite, so some operators can
> wait until they receive all the records before starting the processing.
>
> Of the above impacts, only 1 is relevant to the API design, and the current
> proposal in FLIP-27 follows option 1.b.
>
>> // boundedness depends on source property, imo this should always be
>> preferred
>>
>> DataStream<MyType> stream = env.source(theSource);
>
> In your proposal, does DataStream have bounded-stream-only methods? It
> looks like it should; otherwise passing a bounded Source to env.source()
> would be confusing. In that case, we would essentially do 1.a if an
> unbounded Source is created from env.source(unboundedSource).
>
> If we have the methods only supported for bounded streams in DataStream, it
> seems a little weird to have a separate BoundedDataStream interface.
>
> Am I understanding it correctly?
>
> Thanks,
>
> Jiangjie (Becket) Qin
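A minimal, self-contained sketch of the three-method contract described above. All class and method names here are illustrative stand-ins mirroring the proposal, not the actual Flink API:

enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

interface Source<T> {
    Boundedness getBoundedness();
}

class DataStream<T> {
    final Source<T> source;
    final Boundedness effectiveBoundedness;

    DataStream(Source<T> source, Boundedness effectiveBoundedness) {
        this.source = source;
        this.effectiveBoundedness = effectiveBoundedness;
    }
}

// Exposes bounded-only operations such as sort.
class BoundedDataStream<T> extends DataStream<T> {
    BoundedDataStream(Source<T> source) {
        super(source, Boundedness.BOUNDED);
    }

    BoundedDataStream<T> sort() { /* bounded-only operation */ return this; }
}

class ExecutionEnvironmentSketch {
    // 1. Boundedness comes from the source itself.
    <T> DataStream<T> source(Source<T> s) {
        return new DataStream<>(s, s.getBoundedness());
    }

    // 2. Always continuous execution, whatever the source says.
    <T> DataStream<T> continuousSource(Source<T> s) {
        return new DataStream<>(s, Boundedness.CONTINUOUS_UNBOUNDED);
    }

    // 3. Bounded-only API; rejects continuous sources.
    <T> BoundedDataStream<T> boundedSource(Source<T> s) {
        if (s.getBoundedness() != Boundedness.BOUNDED) {
            throw new IllegalArgumentException(
                    "boundedSource() requires Source#getBoundedness() == BOUNDED");
        }
        return new BoundedDataStream<>(s);
    }
}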
>
> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi all,
>>
>> Really well written proposal, and a very important one. I must admit I have
>> not understood all the intricacies of it yet.
>>
>> One question I have, though, is about where the information about
>> boundedness comes from. I think in most cases it is a property of the
>> source. As you described, it might be e.g. an end offset, a flag for whether
>> it should monitor new splits, etc. I think it would be a really nice use
>> case to be able to say:
>>
>> new KafkaSource().readUntil(long timestamp),
>>
>> which could work as an "end offset". Moreover, I think all bounded sources
>> support continuous mode, but no intrinsically continuous source supports
>> the bounded mode. If I understood the proposal correctly, it suggests the
>> boundedness sort of "comes" from outside of the source, from the
>> invocation of either boundedStream or continuousSource.
>>
>> I am wondering if it would make sense to actually change the method
>>
>> boolean Source#supportsBoundedness(Boundedness)
>>
>> to
>>
>> Boundedness Source#getBoundedness().
>>
>> As for the methods #boundedSource and #continuousSource: assuming the
>> boundedness is a property of the source, they do not affect how the
>> enumerator works, but mostly how the DAG is scheduled, right? I am not
>> against those methods, but I think it is a very specific use case to
>> actually override the property of the source. In general I would expect
>> users to only call env.source(theSource), where the source tells whether it
>> is bounded or not. I would suggest considering the following set of methods:
>>
>> // boundedness depends on source property, imo this should always be
>> preferred
>>
>> DataStream<MyType> stream = env.source(theSource);
>>
>> // always continuous execution, whether bounded or unbounded source
>>
>> DataStream<MyType> continuousStream = env.continuousSource(theSource);
>>
>> // imo this would make sense if the BoundedDataStream provides additional
>> features unavailable for continuous mode
>>
>> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
>>
>> Best,
>>
>> Dawid
>>
>> On 04/12/2019 11:25, Stephan Ewen wrote:
>>
>> Thanks, Becket, for updating this.
>>
>> I agree with moving the aspects you mentioned into separate FLIPs - this
>> one was becoming unwieldy in size.
>>
>> +1 to the FLIP in its current state. It's a very detailed write-up, nicely
>> done!
>>
>> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:
>>
>> Hi all,
>>
>> Sorry for the long-belated update. I have updated the FLIP-27 wiki page
>> with the latest proposals. Some noticeable changes include:
>> 1. A new generic communication mechanism between SplitEnumerator and
>> SourceReader.
>> 2. Some detailed API method signature changes.
>>
>> We left a few things out of this FLIP and will address them in separate
>> FLIPs, including:
>> 1. Per-split event time.
>> 2. Event time alignment.
>> 3. Fine-grained failover for SplitEnumerator failure.
>>
>> Please let us know if you have any questions.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
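The "boundedness is a property of the source" idea, with the readUntil(...) example from the quoted mail, could look roughly like this, reusing the Source/Boundedness stubs from the sketch above. The class and its methods are hypothetical, not the actual Flink Kafka connector API:

class KafkaSourceSketch<T> implements Source<T> {
    private long endTimestamp = Long.MAX_VALUE; // MAX_VALUE means "no end": unbounded

    // Stop reading once records pass this timestamp; this turns the source bounded.
    KafkaSourceSketch<T> readUntil(long timestamp) {
        this.endTimestamp = timestamp;
        return this;
    }

    @Override
    public Boundedness getBoundedness() {
        return endTimestamp == Long.MAX_VALUE
                ? Boundedness.CONTINUOUS_UNBOUNDED
                : Boundedness.BOUNDED;
    }
}

With this, env.source(new KafkaSourceSketch<byte[]>().readUntil(ts)) would yield a bounded stream without the user having to state the boundedness a second time.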
>> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi Łukasz!
>>
>> Becket and I are working hard on figuring out the last details and
>> implementing the first PoC. We will hopefully update the FLIP next week.
>>
>> There is a fair chance that a first version of this will be in 1.10, but I
>> think it will take another release to battle-test it and migrate the
>> connectors.
>>
>> Best,
>> Stephan
>>
>> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:
>>
>> Hi,
>>
>> This proposal looks very promising for us. Do you have any plans for which
>> Flink release it is going to be included in? We are thinking of using the
>> DataSet API for our future use cases, but on the other hand the DataSet API
>> is going to be deprecated, so using the proposed bounded data streams
>> solution could be more viable in the long term.
>>
>> Thanks,
>> Łukasz
>>
>> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
>>
>> Thanks for putting together this proposal!
>>
>> I see that the "Per Split Event Time" and "Event Time Alignment" sections
>> are still TBD.
>>
>> It would probably be good to flesh those out a bit before proceeding too
>> far, as the event time alignment will probably influence the interaction
>> with the split reader, specifically ReaderStatus emitNext(SourceOutput<E>
>> output).
>>
>> We currently have only one implementation of event time alignment, in the
>> Kinesis consumer. The synchronization in that case takes place as the last
>> step before records are emitted downstream (RecordEmitter). With the
>> currently proposed interfaces, the equivalent can be implemented in the
>> reader loop, although note that in the Kinesis consumer the per-shard
>> threads push records.
>>
>> Synchronization has not been implemented for the Kafka consumer yet:
>> https://issues.apache.org/jira/browse/FLINK-12675
>>
>> When I looked at it, I realized that the implementation will look quite
>> different from Kinesis, because it needs to take place in the pull part,
>> where records are taken from the Kafka client. Due to the multiplexing it
>> cannot be done by blocking the split thread like it currently works for
>> Kinesis. Reading from individual Kafka partitions needs to be controlled
>> via pause/resume on the Kafka client.
>>
>> To take on that responsibility, the split thread would need to be aware of
>> the watermarks, or at least whether it should or should not continue to
>> consume a given split, and this may require a different SourceReader or
>> SourceOutput interface.
>>
>> Thanks,
>> Thomas
>>
>> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>> Hi Stephan,
>>
>> Thank you for the feedback!
>> Will take a look at your branch before discussing publicly.
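To make the pause/resume idea from Thomas's mail above concrete, here is a rough sketch of per-partition alignment on the Kafka client: partitions whose local watermark runs too far ahead of the global minimum are paused until the others catch up. The surrounding bookkeeping (perPartitionWatermark, maxAheadMillis, and the assumption that the map contains only currently assigned partitions) is illustrative; only pause/resume/poll are actual Kafka consumer API:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class AlignmentSketch {
    static void alignAndPoll(
            KafkaConsumer<byte[], byte[]> consumer,
            Map<TopicPartition, Long> perPartitionWatermark,
            long maxAheadMillis) {

        // The slowest partition defines the global minimum watermark.
        long minWatermark = perPartitionWatermark.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);

        List<TopicPartition> toPause = new ArrayList<>();
        List<TopicPartition> toResume = new ArrayList<>();
        for (Map.Entry<TopicPartition, Long> e : perPartitionWatermark.entrySet()) {
            if (e.getValue() > minWatermark + maxAheadMillis) {
                toPause.add(e.getKey());   // too far ahead of the slowest partition
            } else {
                toResume.add(e.getKey());  // within the allowed skew again
            }
        }
        consumer.pause(toPause);
        consumer.resume(toResume);

        // poll() only returns records for partitions that are not paused.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
            // ... extract timestamp, update perPartitionWatermark, emit downstream ...
        }
    }
}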
>> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi Biao!
>>
>> Thanks for reviving this. I would like to join this discussion, but am
>> quite occupied with the 1.9 release, so can we maybe pause this discussion
>> for a week or so?
>>
>> In the meantime I can share some suggestions based on prior experiments:
>>
>> How to do watermarks / timestamp extractors in a simpler and more flexible
>> way. I think that part is quite promising and should be part of the new
>> source interface.
>>
>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>>
>> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>>
>> Some experiments on how to build the source reader and its library for
>> common threading/split patterns:
>>
>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>>
>> Best,
>> Stephan
>>
>> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>> Hi devs,
>>
>> Since 1.9 is nearly released, I think we could get back to FLIP-27. I
>> believe it should be included in 1.10.
>>
>> There are so many things mentioned in the document of FLIP-27. [1] I think
>> we'd better discuss them separately. However, the wiki is not a good place
>> to discuss them. I wrote a Google doc about the SplitReader API which adds
>> some details missing from the document. [2]
>>
>> 1.
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>> 2.
>> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>>
>> CC Stephan, Aljoscha, Piotrek, Becket
>>
>> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>> Hi Steven,
>>
>> Thank you for the feedback. Please take a look at the document FLIP-27
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
>> which was updated recently. A lot of details about the enumerator were
>> added in this document. I think it would help.
>>
>> Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:
>>
>> This proposal mentioned that the SplitEnumerator might run on the
>> JobManager or in a single task on a TaskManager.
>>
>> If the enumerator is a single task on a TaskManager, then the job DAG can
>> never be embarrassingly parallel anymore. That will nullify the leverage of
>> fine-grained recovery for embarrassingly parallel jobs.
>>
>> It's not clear to me what the implications of running the enumerator on
>> the JobManager are, so I will leave that out for now.
>>
>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>> Hi Stephan & Piotrek,
>>
>> Thank you for the feedback.
>>
>> It seems that there are a lot of things to do in the community. I am just
>> afraid that this discussion may be forgotten, since there are so many
>> proposals recently.
>>
>> Anyway, I wish to see the split topics soon :)
>>
>> Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:
>>
>> Hi Biao!
>>
>> This discussion was stalled because of preparations for the open-sourcing
>> & merging of Blink. I think before creating the tickets we should split
>> this discussion into the topics/areas outlined by Stephan and create FLIPs
>> for those.
>>
>> I think there is no chance for this to be completed in the couple of
>> remaining weeks/1 month before the 1.8 feature freeze; however, it would be
>> good to aim with those changes for 1.9.
>>
>> Piotrek
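For readers who do not follow the links: the SourceOutput experiment Stephan mentions suggests roughly a shape like the following. This is a guess at the general idea, not the actual content of the branch, and the method names are assumptions:

interface SourceOutputSketch<E> {
    // Emit a record whose event timestamp the source extracted.
    void emitRecord(E record, long timestamp);

    // Emit a watermark for the split this output belongs to.
    void emitWatermark(long watermark);

    // Mark the split idle so it does not hold back downstream watermarks.
    void markIdle();
}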
>> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
>>
>> Hi community,
>>
>> The summary from Stephan makes a lot of sense to me. It is indeed much
>> clearer after splitting the complex topic into small ones.
>> I was wondering whether there is any detailed plan for the next step? If
>> not, I would like to push this forward by creating some JIRA issues.
>> Another question: should version 1.8 include these features?
>>
>> Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
>>
>> Thanks everyone for the lively discussion. Let me try to summarize where I
>> see convergence in the discussion, and the open issues. I'll try to group
>> this by design aspect of the source. Please let me know if I got things
>> wrong or missed something crucial here.
>>
>> For issues 1-3, if the below reflects the state of the discussion, I would
>> try to update the FLIP in the next days. For the remaining ones we need
>> more discussion.
>>
>> I would suggest forking each of these aspects into a separate mail thread,
>> or we will lose sight of the individual aspects.
>>
>> *(1) Separation of Split Enumerator and Split Reader*
>>
>> - All seem to agree this is a good thing.
>> - The Split Enumerator could in the end live on the JobManager (and assign
>> splits via RPC) or in a task (and assign splits via data streams).
>> - This discussion is orthogonal and should come later, when the interface
>> is agreed upon.
>>
>> *(2) Split Readers for one or more splits*
>>
>> - The discussion seems to agree that we need to support one reader that
>> possibly handles multiple splits concurrently.
>> - The requirement comes from sources where one poll()-style call fetches
>> data from different splits / partitions.
>> --> Example sources that require this are Kafka, Pravega, Pulsar.
>> - Could have one split reader per source, or multiple split readers that
>> share the "poll()" function.
>> - To not make it too complicated, we can start by thinking about one split
>> reader for all splits initially, and see if that covers all requirements.
>>
>> *(3) Threading model of the Split Reader*
>>
>> - The most active part of the discussion ;-)
>> - A non-blocking way for Flink's task code to interact with the source is
>> needed in order to base the task runtime code on a
>> single-threaded/actor-style task design.
>> --> I personally am a big proponent of that; it will help with
>> well-behaved checkpoints, efficiency, and simpler yet more robust runtime
>> code.
>> - Users care about a simple abstraction, so as a subclass of SplitReader
>> (non-blocking / async) we need to have a BlockingSplitReader, which will
>> form the basis of most source implementations. BlockingSplitReader lets
>> users do simple blocking poll() calls.
>> - The BlockingSplitReader would spawn a thread (or more), and the
>> thread(s) can make blocking calls and hand over data buffers via a
>> blocking queue.
>> - This should allow us to cover both a fully async runtime and a simple
>> blocking interface for users.
>> - This is actually very similar to how the Kafka connectors work: Kafka
>> 9+ with one thread, Kafka 8 with multiple threads.
>> - On the base SplitReader (the async one), the non-blocking method that
>> gets the next chunk of data would signal data availability via a
>> CompletableFuture, because that gives the best flexibility (can await
>> completion or register notification handlers).
>> - The source task would register a "thenHandle()" (or similar) on the
>> future to put a "take next data" task into the actor-style mailbox.
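A minimal sketch of the availability contract described in aspect (3), with illustrative names: the async reader exposes a future that completes when data is available, so a single-threaded/mailbox-style task never blocks on the source:

import java.util.concurrent.CompletableFuture;

interface AsyncSplitReaderSketch<E> {
    // Completes once at least one record can be fetched without blocking.
    CompletableFuture<Void> isAvailable();

    // Returns the next record, or null if nothing is buffered right now.
    E pollNext();
}

// Usage in the task, as described above: register a handler on the future
// that enqueues a "take next data" action into the actor-style mailbox, e.g.
//   reader.isAvailable().thenRun(() -> mailbox.execute(this::processNext));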
>> *(4) Split Enumeration and Assignment*
>>
>> - Splits may be generated lazily, both in cases where there is a limited
>> number of splits (but very many), and where splits are discovered over
>> time.
>> - Assignment should also be lazy, to get better load balancing.
>> - Assignment needs to support locality preferences.
>> - Possible design based on the discussion so far:
>> --> SplitReader has a method "addSplits(SplitT...)" to add one or more
>> splits. Some split readers might assume they only ever have one split,
>> concurrently; others assume multiple splits. (Note: the idea behind being
>> able to add multiple splits at the same time is to ease startup, where
>> multiple splits may be assigned instantly.)
>> --> SplitReader has a context object on which it can indicate when
>> splits are completed. The enumerator gets that notification and can use it
>> to decide when to assign new splits. This should help both in the case of
>> sources that take splits lazily (file readers) and in the case where the
>> source needs to preserve a partial order between splits (Kinesis, Pravega,
>> Pulsar may need that).
>> --> The SplitEnumerator gets a notification when SplitReaders start and
>> when they finish splits. It can decide at that moment to push more splits
>> to that reader.
>> --> The SplitEnumerator should probably be aware of the source
>> parallelism, to build its initial distribution.
>> - Open question: Should the source expose something like "host
>> preferences", so that yarn/mesos/k8s can take this into account when
>> selecting a node to start a TM on?
>>
>> *(5) Watermarks and event time alignment*
>>
>> - Watermark generation, as well as idleness, needs to be per split (like
>> currently in the Kafka source, per partition).
>> - It is desirable to support optional event-time alignment, meaning that
>> splits that are ahead are back-pressured or temporarily unsubscribed.
>> - I think it would be desirable to encapsulate the watermark generation
>> logic in watermark generators, for a separation of concerns. The watermark
>> generators should run per split.
>> - Using watermark generators would also help with another problem of the
>> suggested interface, namely supporting non-periodic watermarks
>> efficiently.
>> - Need a way to "dispatch" the next record to different watermark
>> generators.
>> - Need a way to tell the SplitReader to "suspend" a split until a certain
>> watermark is reached (event time backpressure).
>> - This would in fact not be needed (and thus be simpler) if we had a
>> SplitReader per split, and may be a reason to re-open that discussion.
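A small sketch of the per-split watermark generation described in aspect (5). The interface, the dispatching by split id, and the min-combination across splits are illustrative assumptions, not the eventual FLIP API:

import java.util.HashMap;
import java.util.Map;

interface WatermarkGeneratorSketch<E> {
    void onEvent(E event, long eventTimestamp);
    long currentWatermark();
}

final class PerSplitWatermarks<E> {
    private final Map<String, WatermarkGeneratorSketch<E>> generators = new HashMap<>();

    // Dispatch each record to the generator of the split it came from.
    void onRecord(String splitId, E record, long timestamp) {
        generators.computeIfAbsent(splitId, id -> newGenerator())
                .onEvent(record, timestamp);
    }

    // The operator watermark is the minimum over all active splits.
    long combinedWatermark() {
        return generators.values().stream()
                .mapToLong(WatermarkGeneratorSketch::currentWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    private WatermarkGeneratorSketch<E> newGenerator() {
        // Simplest possible generator: watermark = max timestamp seen so far.
        return new WatermarkGeneratorSketch<E>() {
            private long max = Long.MIN_VALUE;
            @Override public void onEvent(E e, long ts) { max = Math.max(max, ts); }
            @Override public long currentWatermark() { return max; }
        };
    }
}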
>> *(6) Watermarks across splits and in the Split Enumerator*
>>
>> - The split enumerator may need some watermark awareness, which should be
>> purely based on split metadata (like the creation timestamp of file
>> splits).
>> - If there are still more splits with an overlapping event time range for
>> a split reader, then that split reader should not advance the watermark
>> within the split beyond the overlap boundary. Otherwise future splits will
>> produce late data.
>> - One way to approach this could be that the split enumerator may send
>> watermarks to the readers, and the readers cannot emit watermarks beyond
>> that received watermark.
>> - Many split enumerators would simply immediately send Long.MAX out and
>> leave the progress purely to the split readers.
>> - For event-time alignment / split back pressure, this begs the question
>> of how we can avoid deadlocks that may arise when splits are suspended for
>> event time back pressure.
>>
>> *(7) Batch and streaming unification*
>>
>> - Functionality-wise, the above design should support both.
>> - Batch often (mostly) does not care about reading "in order" and
>> generating watermarks.
>> --> It might use different enumerator logic that is more locality-aware
>> and ignores event time order.
>> --> It does not generate watermarks.
>> - It would be great if bounded sources could be identified at compile
>> time, so that "env.addBoundedSource(...)" is type-safe and can return a
>> "BoundedDataStream".
>> - It is possible to defer this discussion until later.
>>
>> *Miscellaneous Comments*
>>
>> - Should the source have a TypeInformation for the produced type, instead
>> of a serializer? We need a type information in the stream anyway, and can
>> derive the serializer from that. Plus, creating the serializer should
>> respect the ExecutionConfig.
>>
>> - The TypeSerializer interface is very powerful but also not easy to
>> implement. Its purpose is to handle data super efficiently and support
>> flexible ways of evolution, etc.
>> For metadata I would suggest looking at the SimpleVersionedSerializer
>> instead, which is used for example for checkpoint master hooks, or for the
>> streaming file sink. I think that it is a good match for cases where we do
>> not need more than ser/deser (no copy, etc.) and don't need to push
>> versioning out of the serialization paths for best performance (as in the
>> TypeSerializer).
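For illustration, a small example of the SimpleVersionedSerializer mentioned above, applied to a hypothetical split type consisting of a path and an offset. The split class is made up; the serializer interface is the existing org.apache.flink.core.io.SimpleVersionedSerializer:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.flink.core.io.SimpleVersionedSerializer;

final class FileSplitSketch {
    final String path;
    final long offset;

    FileSplitSketch(String path, long offset) {
        this.path = path;
        this.offset = offset;
    }
}

final class FileSplitSerializer implements SimpleVersionedSerializer<FileSplitSketch> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(FileSplitSketch split) throws IOException {
        // Layout: [int pathLength][path bytes][long offset]
        byte[] pathBytes = split.path.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + pathBytes.length + 8);
        buf.putInt(pathBytes.length).put(pathBytes).putLong(split.offset);
        return buf.array();
    }

    @Override
    public FileSplitSketch deserialize(int version, byte[] serialized) throws IOException {
        // Version checks would go here as the format evolves.
        ByteBuffer buf = ByteBuffer.wrap(serialized);
        byte[] pathBytes = new byte[buf.getInt()];
        buf.get(pathBytes);
        String path = new String(pathBytes, StandardCharsets.UTF_8);
        return new FileSplitSketch(path, buf.getLong());
    }
}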
>> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
>> <k.klou...@data-artisans.com> wrote:
>>
>> Hi Biao,
>>
>> Thanks for the answer!
>>
>> So, given the multi-threaded readers, we now have these open questions:
>>
>> 1) How do we let the checkpoints pass through our multi-threaded reader
>> operator?
>>
>> 2) Do we have separate reader and source operators or not? In the strategy
>> that has a separate source, the source operator has a parallelism of 1 and
>> is responsible for split recovery only.
>>
>> For the first one, given also the constraints (blocking, finite queues,
>> etc.), I do not have an answer yet.
>>
>> For the 2nd, I think that we should go with separate operators for the
>> source and the readers, for the following reasons:
>>
>> 1) This is more aligned with a potential future improvement where the
>> split discovery becomes a responsibility of the JobManager and readers are
>> pulling more work from the JM.
>>
>> 2) The source is going to be the "single point of truth". It will know
>> what has been processed and what not. If the source and the readers are a
>> single operator with parallelism > 1, or in general, if the split discovery
>> is done by each task individually, then:
>> i) we have to have a deterministic scheme for each reader to assign
>> splits to itself (e.g. mod subtaskId). This is not necessarily trivial for
>> all sources.
>> ii) each reader would have to keep a copy of all its processed splits
>> iii) the state has to be a union state with a non-trivial merging logic
>> in order to support rescaling.
>>
>> Two additional points that you raised above:
>>
>> i) The point that you raised that we need to keep all splits (processed
>> and not-processed) I think is a bit of a strong requirement. This would
>> imply that for infinite sources the state will grow indefinitely. This
>> problem is even more pronounced if we do not have a single source that
>> assigns splits to readers, as each reader will have its own copy of the
>> state.
>>
>> ii) It is true that for finite sources we need to somehow not close the
>> readers when the source/split discoverer finishes. The
>> ContinuousFileReaderOperator has a work-around for that. It is not
>> elegant, and checkpoints are not emitted after closing the source, but
>> this, I believe, is a bigger problem which requires more changes than just
>> refactoring the source interface.
>>
>> Cheers,
>> Kostas