I agree with Dawid's point that the boundedness information should come from the source itself (e.g. the end timestamp), not through env.boundedSource()/continuousSource(). I think if we want to support something like `env.source()` that derives the execution mode from the source, the `supportsBoundedness(Boundedness)` method is not enough, because it does not tell us whether the source is actually bounded or not.

Best,
Jark
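For illustration, a minimal sketch of how env.source() could derive the execution mode once the source exposes its boundedness; the enum, interface, and environment names here are simplified stand-ins, not the FLIP's actual API:

    // Simplified stand-ins, not the FLIP's API.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    interface Source<T> {
        // The source declares what it is, e.g. based on whether an end
        // timestamp was configured.
        Boundedness getBoundedness();
    }

    class EnvironmentSketch {
        <T> void source(Source<T> source) {
            // supportsBoundedness(Boundedness) only answers "could this source
            // run in mode X?". getBoundedness() tells us what the source is,
            // which is what env.source() needs to pick the execution mode.
            if (source.getBoundedness() == Boundedness.BOUNDED) {
                scheduleBounded(source);
            } else {
                scheduleContinuous(source);
            }
        }
        private void scheduleBounded(Source<?> s) { /* batch-style scheduling */ }
        private void scheduleContinuous(Source<?> s) { /* streaming scheduling */ }
    }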
On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

One more thing. In the current proposal, with the supportsBoundedness(Boundedness) method and the boundedness coming from either continuousSource or boundedSource, I could not find how this information is fed back to the SplitEnumerator.

Best,

Dawid

On 09/12/2019 13:52, Becket Qin wrote:

Hi Dawid,

Thanks for the comments. This actually brings up another relevant question: what does a "bounded source" imply? I actually had the same impression when I looked at the Source API. Here is what I understand after some discussion with Stephan. A bounded source has the following impacts.

1. API validity.
- A bounded source generates a bounded stream, so some operations that only work on bounded records can be performed, e.g. sort.
- To expose these bounded-stream-only APIs, there are two options:
a. Add them to the DataStream API and throw an exception if a method is called on an unbounded stream.
b. Create a BoundedDataStream class which is returned from env.boundedSource(), while DataStream is returned from env.continuousSource(). Note that this cannot be done by having a single env.source(theSource), even if the Source has a getBoundedness() method.

2. Scheduling
- A bounded source could be computed stage by stage without bringing up all the tasks at the same time.

3. Operator behaviors
- A bounded source indicates the records are finite, so some operators can wait until they receive all the records before starting the processing.

Of the above impacts, only 1 is relevant to the API design, and the current proposal in FLIP-27 follows 1.b.

// boundedness depends on source property, imo this should always be preferred
DataStream<MyType> stream = env.source(theSource);

In your proposal, does DataStream have bounded-stream-only methods? It looks like it should; otherwise passing a bounded Source to env.source() would be confusing. In that case, we will essentially be doing 1.a when an unbounded Source is created from env.source(unboundedSource).

If we have the methods only supported for bounded streams in DataStream, it seems a little weird to also have a separate BoundedDataStream interface.

Am I understanding it correctly?

Thanks,

Jiangjie (Becket) Qin
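A sketch of what option 1.b looks like in types, with simplified stand-in classes; sort() is a hypothetical bounded-only operation:

    import java.util.Comparator;
    import java.util.function.Function;

    // Option 1.b: bounded-only operations become a compile-time property.
    class DataStream<T> {
        // valid for both bounded and unbounded streams
        <R> DataStream<R> map(Function<T, R> fn) {
            return new DataStream<>();
        }
    }

    class BoundedDataStream<T> extends DataStream<T> {
        // bounded-only: absent from DataStream, so calling sort() on an
        // unbounded stream is a compile error rather than the runtime
        // exception that option 1.a would give.
        BoundedDataStream<T> sort(Comparator<T> comparator) {
            return this;
        }
    }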
On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi all,

A really well-written and very important proposal. I must admit I have not understood all the intricacies of it yet.

One question I have, though, is about where the information about boundedness comes from. I think in most cases it is a property of the source. As you described, it might be e.g. an end offset, a flag for whether it should monitor new splits, etc. I think it would be a really nice use case to be able to say:

new KafkaSource().readUntil(long timestamp),

which could work as an "end offset". Moreover, I think all bounded sources support continuous mode, but no intrinsically continuous source supports the bounded mode. If I understood the proposal correctly, it suggests that the boundedness sort of "comes" from the outside of the source, from the invocation of either boundedSource or continuousSource.

I am wondering if it would make sense to actually change the method

boolean Source#supportsBoundedness(Boundedness)

to

Boundedness Source#getBoundedness().

As for the methods #boundedSource and #continuousSource: assuming the boundedness is a property of the source, they do not affect how the enumerator works, but mostly how the DAG is scheduled, right? I am not against those methods, but I think it is a very specific use case to actually override the property of the source. In general I would expect users to only call env.source(theSource), where the source tells if it is bounded or not. I would suggest considering the following set of methods:

// boundedness depends on source property, imo this should always be preferred
DataStream<MyType> stream = env.source(theSource);

// always continuous execution, whether bounded or unbounded source
DataStream<MyType> continuousStream = env.continuousSource(theSource);

// imo this would make sense if the BoundedDataStream provides additional features unavailable in continuous mode
BoundedDataStream<MyType> batch = env.boundedSource(theSource);

Best,

Dawid
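A sketch of how an end timestamp could make boundedness a property of the source itself, in the spirit of the readUntil() example above; this KafkaSource and its builder method are hypothetical, reusing the Source/Boundedness shape sketched earlier in the thread:

    // Hypothetical builder-style source: boundedness is derived from whether
    // an end timestamp was configured, not from which env method was called.
    class KafkaSource<T> implements Source<T> {
        private long endTimestamp = Long.MAX_VALUE; // unbounded by default

        KafkaSource<T> readUntil(long timestamp) {
            this.endTimestamp = timestamp; // acts as an "end offset"
            return this;
        }

        @Override
        public Boundedness getBoundedness() {
            return endTimestamp == Long.MAX_VALUE
                    ? Boundedness.CONTINUOUS_UNBOUNDED
                    : Boundedness.BOUNDED;
        }
    }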
On 04/12/2019 11:25, Stephan Ewen wrote:

Thanks, Becket, for updating this.

I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size.

+1 to the FLIP in its current state. It's a very detailed write-up, nicely done!

On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:

Hi all,

Sorry for the long-belated update. I have updated the FLIP-27 wiki page with the latest proposals. Some noticeable changes include:
1. A new generic communication mechanism between SplitEnumerator and SourceReader.
2. Some detailed API method signature changes.

We left a few things out of this FLIP and will address them in separate FLIPs, including:
1. Per-split event time.
2. Event time alignment.
3. Fine-grained failover for SplitEnumerator failure.

Please let us know if you have any questions.

Thanks,

Jiangjie (Becket) Qin

On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:

Hi Łukasz!

Becket and I are working hard on figuring out the last details and implementing the first PoC. We will hopefully update the FLIP next week.

There is a fair chance that a first version of this will be in 1.10, but I think it will take another release to battle-test it and migrate the connectors.

Best,
Stephan

On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:

Hi,

This proposal looks very promising for us. Do you have any plans for which Flink release it is going to be included in? We are thinking of using the DataSet API for our future use cases, but on the other hand the DataSet API is going to be deprecated, so using the proposed bounded data streams solution could be more viable in the long term.

Thanks,
Łukasz

On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:

Thanks for putting together this proposal!

I see that the "Per Split Event Time" and "Event Time Alignment" sections are still TBD.

It would probably be good to flesh those out a bit before proceeding too far, as the event time alignment will probably influence the interaction with the split reader, specifically ReaderStatus emitNext(SourceOutput<E> output).

We currently have only one implementation of event time alignment, in the Kinesis consumer. The synchronization in that case takes place as the last step before records are emitted downstream (RecordEmitter). With the currently proposed interfaces, the equivalent can be implemented in the reader loop, although note that in the Kinesis consumer the per-shard threads push records.

Synchronization has not been implemented for the Kafka consumer yet:
https://issues.apache.org/jira/browse/FLINK-12675

When I looked at it, I realized that the implementation will look quite different from Kinesis, because it needs to take place in the pull part, where records are taken from the Kafka client. Due to the multiplexing, it cannot be done by blocking the split thread like it currently works for Kinesis. Reading from individual Kafka partitions needs to be controlled via pause/resume on the Kafka client.

To take on that responsibility, the split thread would need to be aware of the watermarks, or at least of whether it should or should not continue to consume a given split, and this may require a different SourceReader or SourceOutput interface.

Thanks,
Thomas
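A sketch of the pause/resume control Thomas describes; KafkaConsumer.pause()/resume() are the real Kafka client calls, while the per-split watermark bookkeeping and the drift threshold around them are hypothetical:

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Alignment in the pull loop: partitions whose per-split watermark runs
    // too far ahead of the global minimum are paused on the Kafka client,
    // instead of blocking the multiplexed split thread.
    class KafkaAlignmentSketch {
        void align(KafkaConsumer<byte[], byte[]> consumer,
                   Map<TopicPartition, Long> perSplitWatermark,
                   long maxAllowedDrift) {
            long globalMin = perSplitWatermark.values().stream()
                    .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);

            for (Map.Entry<TopicPartition, Long> e : perSplitWatermark.entrySet()) {
                if (e.getValue() > globalMin + maxAllowedDrift) {
                    consumer.pause(List.of(e.getKey()));  // stop fetching the split that is ahead
                } else {
                    consumer.resume(List.of(e.getKey())); // safe to fetch again
                }
            }
        }
    }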
On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan,

Thank you for the feedback! I will take a look at your branch before the public discussion.

On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

Hi Biao!

Thanks for reviving this. I would like to join this discussion, but am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so?

In the meantime I can share some suggestions based on prior experiments:

How to do watermarks / timestamp extractors in a simpler and more flexible way. I think that part is quite promising and should be part of the new source interface:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime

https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java

Some experiments on how to build the source reader and its library for common threading/split patterns:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src

Best,
Stephan

On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi devs,

Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10.

There are so many things mentioned in the FLIP-27 document [1] that I think we'd better discuss them separately. However, the wiki is not a good place to discuss, so I wrote a Google doc about the SplitReader API covering some details that are missing from the document. [2]

1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing

CC Stephan, Aljoscha, Piotrek, Becket

On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Steven,

Thank you for the feedback. Please take a look at the FLIP-27 document <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>, which was updated recently. A lot of details about the enumerator were added in this document. I think it will help.

Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:

This proposal mentioned that the SplitEnumerator might run on the JobManager or in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the leverage of fine-grained recovery for embarrassingly parallel jobs.

It's not clear to me what the implications of running the enumerator on the JobManager are, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan & Piotrek,

Thank you for the feedback.

It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten, since there are so many proposals recently. Anyway, I wish to see the split topics soon :)

Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:

Hi Biao!

This discussion was stalled because of the preparations for open-sourcing & merging Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for those.

I think there is no chance for this to be completed in the couple of remaining weeks/1 month before the 1.8 feature freeze; however, it would be good to aim for 1.9 with those changes.

Piotrek
On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,

The summary from Stephan makes a lot of sense to me. It is indeed much clearer after splitting the complex topic into smaller ones. I was wondering whether there is any detailed plan for the next step? If not, I would like to push this forward by creating some JIRA issues. Another question: should version 1.8 include these features?

Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:

Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion and open issues. I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I will try to update the FLIP in the next days. For the remaining ones we need more discussion.

I would suggest forking each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing.
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams).
- This discussion is orthogonal and should come later, when the interface is agreed upon.
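For illustration, a rough shape of the separation described in (1); these interfaces are sketches under the assumptions in this summary, not the signatures that ended up in the FLIP:

    import java.util.List;

    // Illustrative interfaces for the enumerator/reader separation.
    interface SplitEnumerator<SplitT> {
        // Runs centrally (JobManager or a dedicated task): discovers splits
        // and decides which reader gets which split.
        void registerReader(int subtaskId);
        List<SplitT> assignSplits(int subtaskId);
        void splitFinished(int subtaskId, SplitT split);
    }

    interface SplitReader<E, SplitT> {
        // Runs in the parallel tasks: consumes the splits it was assigned,
        // possibly several concurrently (see (2) below).
        void addSplits(List<SplitT> splits);
        E pollNext();
    }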
*(2) Split Readers for one or more splits*

- The discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions.
--> Example sources that require this would be Kafka, Pravega, Pulsar.
- We could have one split reader per source, or multiple split readers that share the "poll()" function.
- To not make it too complicated, we can start by thinking about one split reader for all splits initially and see if that covers all requirements.

*(3) Threading model of the Split Reader*

- The most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to base the task runtime code on a single-threaded/actor-style task design.
--> I personally am a big proponent of that; it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code.
- Users care about a simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader, which will form the basis of most source implementations. BlockingSplitReader lets users do simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more), and the thread(s) can make blocking calls and hand over data buffers via a blocking queue.
- This should allow us to cover both a fully async runtime and a simple blocking interface for users.
- This is actually very similar to how the Kafka connectors work: Kafka 9+ with one thread, Kafka 8 with multiple threads.
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (one can await completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox.
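A sketch of the threading model in (3), assuming illustrative names: an async reader that signals availability via a CompletableFuture, plus a blocking adapter that hands records over a queue. A production implementation would need more careful handling of the availability race than shown here:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    // Async reader: the task never blocks; it polls, and when nothing is
    // available it registers on the future to be woken up via its mailbox.
    interface AsyncSplitReader<E> {
        E pollNext(); // null if nothing is available right now
        CompletableFuture<Void> isAvailable();
    }

    // Adapter for simple blocking sources: fetcher thread(s) make blocking
    // calls and hand records over a bounded queue.
    abstract class BlockingSplitReader<E> implements AsyncSplitReader<E> {
        private final BlockingQueue<E> handover = new LinkedBlockingQueue<>(16);
        private volatile CompletableFuture<Void> available = new CompletableFuture<>();

        // Implemented by users with plain blocking calls, e.g. consumer.poll().
        protected abstract E fetchBlocking() throws Exception;

        void startFetcher() {
            new Thread(() -> {
                try {
                    while (true) {
                        handover.put(fetchBlocking()); // back-pressures when full
                        available.complete(null);      // wake up the task
                    }
                } catch (Exception e) {
                    available.completeExceptionally(e);
                }
            }, "split-fetcher").start();
        }

        @Override
        public E pollNext() {
            E next = handover.poll();
            if (next == null) {
                CompletableFuture<Void> f = new CompletableFuture<>();
                available = f;
                if (!handover.isEmpty()) {
                    f.complete(null); // a record raced in; do not lose the wakeup
                }
            }
            return next;
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            return available;
        }
    }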
*(4) Split Enumeration and Assignment*

- Splits may be generated lazily, both in cases where there is a limited number of splits (but very many) and where splits are discovered over time.
- Assignment should also be lazy, to get better load balancing.
- Assignment needs to support locality preferences.
- Possible design based on the discussion so far:
--> SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they have only one split ever, concurrently; others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup, where multiple splits may be assigned instantly.)
--> SplitReader has a context object on which it can indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in cases of sources that take splits lazily (file readers) and in cases where the source needs to preserve a partial order between splits (Kinesis, Pravega, and Pulsar may need that).
--> SplitEnumerator gets a notification when SplitReaders start and when they finish splits. It can decide at that moment to push more splits to that reader.
--> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.
- Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?

*(5) Watermarks and event time alignment*

- Watermark generation, as well as idleness, needs to be per split (like currently in the Kafka source, per partition).
- It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed.
- I think it would be desirable to encapsulate the watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split.
- Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently.
- We need a way to "dispatch" the next record to different watermark generators.
- We need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure).
- This would in fact not be needed (and would thus be simpler) if we had a SplitReader per split, and may be a reason to re-open that discussion.
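A sketch of the per-split watermark generation described in (5); the generator interface and the dispatch logic are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative per-split watermark generation: one generator per split,
    // combined watermark = minimum over all splits.
    interface WatermarkGenerator<E> {
        void onEvent(E event, long eventTimestamp);
        long currentWatermark();
    }

    class PerSplitWatermarks<E> {
        private final Map<String, WatermarkGenerator<E>> generators = new HashMap<>();

        // Dispatch each record to the generator of the split it came from.
        void onRecord(String splitId, E record, long timestamp) {
            generators.computeIfAbsent(splitId, id -> newGenerator())
                      .onEvent(record, timestamp);
        }

        // The emitted watermark must not overtake the slowest split; a split
        // whose generator runs far ahead is a candidate for suspension.
        long combinedWatermark() {
            return generators.values().stream()
                    .mapToLong(WatermarkGenerator::currentWatermark)
                    .min().orElse(Long.MIN_VALUE);
        }

        private WatermarkGenerator<E> newGenerator() {
            // Trivial ascending-timestamp generator, for illustration only.
            return new WatermarkGenerator<E>() {
                private long maxTs = Long.MIN_VALUE;
                public void onEvent(E event, long ts) { maxTs = Math.max(maxTs, ts); }
                public long currentWatermark() { return maxTs; }
            };
        }
    }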
*(6) Watermarks across splits and in the Split Enumerator*

- The split enumerator may need some watermark awareness, which should be purely based on split metadata (like the creation timestamp of file splits).
- If there are still more splits with an overlapping event time range for a split reader, then that split reader should not advance the watermark within the split beyond the overlap boundary. Otherwise future splits will produce late data.
- One way to approach this could be that the split enumerator may send watermarks to the readers, and the readers cannot emit watermarks beyond that received watermark.
- Many split enumerators would simply immediately send Long.MAX out and leave the progress purely to the split readers.
- For event-time alignment / split back pressure, this begs the question of how we can avoid deadlocks that may arise when splits are suspended for event time back pressure.

*(7) Batch and streaming unification*

- Functionality-wise, the above design should support both.
- Batch often (mostly) does not care about reading "in order" and generating watermarks.
--> It might use different enumerator logic that is more locality-aware and ignores event time order.
--> It does not generate watermarks.
- It would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type-safe and can return a "BoundedDataStream".
- It is possible to defer this discussion until later.

*Miscellaneous Comments*

- Should the source have a TypeInformation for the produced type, instead of a serializer? We need type information in the stream anyway, and can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.
- The TypeSerializer interface is very powerful but also not easy to implement. Its purpose is to handle data super efficiently and support flexible ways of evolution, etc. For metadata I would suggest looking at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks or for the streaming file sink. I think it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer).
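For illustration, split metadata serialized with Flink's SimpleVersionedSerializer as suggested above; the FileSplit type here is hypothetical:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    // Hypothetical split metadata type.
    class FileSplit {
        final String path;
        final long start, length;
        FileSplit(String path, long start, long length) {
            this.path = path; this.start = start; this.length = length;
        }
    }

    class FileSplitSerializer implements SimpleVersionedSerializer<FileSplit> {
        @Override
        public int getVersion() {
            return 1; // bump when the on-wire format changes
        }

        @Override
        public byte[] serialize(FileSplit split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(split.path);
            out.writeLong(split.start);
            out.writeLong(split.length);
            return bytes.toByteArray();
        }

        @Override
        public FileSplit deserialize(int version, byte[] serialized) throws IOException {
            // The version says which format the bytes were written with, so
            // evolution stays out of the hot serialization path.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            return new FileSplit(in.readUTF(), in.readLong(), in.readLong());
        }
    }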
On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, we now have the following open questions:

1) How do we let the checkpoints pass through our multi-threaded reader operator?

2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.

For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.

For the second, I think that we should go with separate operators for the source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where the split discovery becomes a responsibility of the JobManager and readers are pulling more work from the JM.

2) The source is going to be the "single point of truth". It will know what has been processed and what not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
ii) each reader would have to keep a copy of all its processed splits.
iii) the state has to be a union state with non-trivial merging logic in order to support rescaling.

Two additional points that you raised above:

i) The point that we need to keep all splits (processed and not-processed) is, I think, a bit of a strong requirement. It would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.

ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.

Cheers,
Kostas
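A sketch of the deterministic self-assignment scheme from point 2.i above, and why point 2.iii follows from it; the types are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative: each parallel reader runs the same discovery and keeps
    // only the splits it deterministically claims (point 2.i).
    class SelfAssignmentSketch {
        static List<String> claimSplits(List<String> allSplitIds,
                                        int subtaskId, int parallelism) {
            List<String> mine = new ArrayList<>();
            for (String splitId : allSplitIds) {
                if (Math.floorMod(splitId.hashCode(), parallelism) == subtaskId) {
                    mine.add(splitId);
                }
            }
            return mine;
        }
        // On rescaling the partitioning changes, so the split state would have
        // to be a union state that every task re-filters with the new
        // parallelism: Kostas's point 2.iii.
    }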