Thanks, Stephan. I have to mention that most of the design work and the FLIP wiki had actually been done by Aljoscha, Biao, and you before I picked up this FLIP.
Given that this FLIP has gone through an extended discussion and release 1.10 code freeze is approaching, I'd like to start a vote thread in about 12 hours if there is no further objections. Thanks, Jiangjie (Becket) Qin On Wed, Dec 4, 2019 at 7:04 PM Becket Qin <becket....@gmail.com> wrote: > Hi Jiayi, > > For now there is no communication between the coordinators. And I do see > some use cases if we can open up that channel. But it won't be in this FLIP. > > Thanks, > > Jiangjie (Becket) Qin > > On Wed, Dec 4, 2019 at 6:53 PM bupt_ljy <bupt_...@163.com> wrote: > >> Hi Becket, >> >> >> Thanks for updating the progress! >> >> >> I have a question about the #OperatorCoordinator. Will there be any >> communication between different #OperatorCoordinators (or in the future >> plan)? Because in that way it may be able to cover some cases in FLIP-27[1] >> like initializing static data before main input processing. Of course it >> requires more thinking, just want to speak up some ideas in my mind. >> >> >> +1 to the FLIP and detailed design! >> >> >> >> [1]. >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API >> >> >> Best, >> >> Jiayi Liao >> >> Original Message >> *Sender:* Stephan Ewen<se...@apache.org> >> *Recipient:* dev<dev@flink.apache.org> >> *Date:* Wednesday, Dec 4, 2019 18:25 >> *Subject:* Re: [DISCUSS] FLIP-27: Refactor Source Interface >> >> Thanks, Becket, for updating this. >> >> I agree with moving the aspects you mentioned into separate FLIPs - this >> one way becoming unwieldy in size. >> >> +1 to the FLIP in its current state. Its a very detailed write-up, nicely >> done! >> >> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote: >> >> > Hi all, >> > >> > Sorry for the long belated update. I have updated FLIP-27 wiki page with >> > the latest proposals. Some noticeable changes include: >> > 1. A new generic communication mechanism between SplitEnumerator and >> > SourceReader. >> > 2. Some detail API method signature changes. >> > >> > We left a few things out of this FLIP and will address them in separate >> > FLIPs. Including: >> > 1. Per split event time. >> > 2. Event time alignment. >> > 3. Fine grained failover for SplitEnumerator failure. >> > >> > Please let us know if you have any question. >> > >> > Thanks, >> > >> > Jiangjie (Becket) Qin >> > >> > On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote: >> > >> > > Hi Łukasz! >> > > >> > > Becket and me are working hard on figuring out the last details and >> > > implementing the first PoC. We would update the FLIP hopefully next week. >> > > >> > > There is a fair chance that a first version of this will be in 1.10, but >> > I >> > > think it will take another release to battle test it and migrate the >> > > connectors. >> > > >> > > Best, >> > > Stephan >> > > >> > > >> > > >> > > >> > > On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> >> > wrote: >> > > >> > > > Hi, >> > > > >> > > > This proposal looks very promising for us. Do you have any plans in >> > which >> > > > Flink release it is going to be released? We are thinking on using a >> > Data >> > > > Set API for our future use cases but on the other hand Data Set API is >> > > > going to be deprecated so using proposed bounded data streams solution >> > > > could be more viable in the long term. 
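A minimal sketch of the kind of generic enumerator/reader communication mechanism mentioned in Becket's update above, and of the coordinator shape Jiayi asks about. All names here (OperatorEvent, handleEventFromOperator, SplitRequestEvent) are illustrative assumptions, not the FLIP's API, and the coordinator-to-coordinator channel is deliberately absent, matching Becket's answer:

```java
// Illustrative sketch only; hypothetical names, not the FLIP-27 API.
// A coordinator exchanges serializable events with its parallel operator
// subtasks; a source reader could use such a channel to ask its enumerator
// for more splits.
import java.io.Serializable;

public class CoordinatorEventSketch {

    /** Marker for messages passed between a coordinator and operator subtasks. */
    interface OperatorEvent extends Serializable {}

    /** Coordinator side: receives events from subtasks. */
    interface Coordinator {
        void handleEventFromOperator(int subtaskId, OperatorEvent event);
    }

    /** Operator side: receives events pushed back by its coordinator. */
    interface EventHandlingOperator {
        void handleEventFromCoordinator(OperatorEvent event);
    }

    /** Example event: a source reader requesting another split. */
    static final class SplitRequestEvent implements OperatorEvent {
        final int requestingSubtask;

        SplitRequestEvent(int requestingSubtask) {
            this.requestingSubtask = requestingSubtask;
        }
    }
}
```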
>> > > > >> > > > Thanks, >> > > > Łukasz >> > > > >> > > > On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote: >> > > > > Thanks for putting together this proposal! >> > > > > >> > > > > I see that the "Per Split Event Time" and "Event Time Alignment" >> > > sections >> > > > > are still TBD. >> > > > > >> > > > > It would probably be good to flesh those out a bit before proceeding >> > > too >> > > > far >> > > > > as the event time alignment will probably influence the interaction >> > > with >> > > > > the split reader, specifically ReaderStatus emitNext(SourceOutput<E> >> > > > > output). >> > > > > >> > > > > We currently have only one implementation for event time alignment in >> > > the >> > > > > Kinesis consumer. The synchronization in that case takes place as the >> > > > last >> > > > > step before records are emitted downstream (RecordEmitter). With the >> > > > > currently proposed interfaces, the equivalent can be implemented in >> > the >> > > > > reader loop, although note that in the Kinesis consumer the per shard >> > > > > threads push records. >> > > > > >> > > > > Synchronization has not been implemented for the Kafka consumer yet. >> > > > > >> > > > > https://issues.apache.org/jira/browse/FLINK-12675 >> > > > > >> > > > > When I looked at it, I realized that the implementation will look >> > quite >> > > > > different >> > > > > from Kinesis because it needs to take place in the pull part, where >> > > > records >> > > > > are taken from the Kafka client. Due to the multiplexing it cannot be >> > > > done >> > > > > by blocking the split thread like it currently works for Kinesis. >> > > Reading >> > > > > from individual Kafka partitions needs to be controlled via >> > > pause/resume >> > > > > on the Kafka client. >> > > > > >> > > > > To take on that responsibility the split thread would need to be >> > aware >> > > of >> > > > > the >> > > > > watermarks or at least whether it should or should not continue to >> > > > consume >> > > > > a given split and this may require a different SourceReader or >> > > > SourceOutput >> > > > > interface. >> > > > > >> > > > > Thanks, >> > > > > Thomas >> > > > > >> > > > > >> > > > > On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote: >> > > > > >> > > > > > Hi Stephan, >> > > > > > >> > > > > > Thank you for feedback! >> > > > > > Will take a look at your branch before public discussing. >> > > > > > >> > > > > > >> > > > > > On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> >> > > > wrote: >> > > > > > >> > > > > > > Hi Biao! >> > > > > > > >> > > > > > > Thanks for reviving this. I would like to join this discussion, >> > but >> > > > am >> > > > > > > quite occupied with the 1.9 release, so can we maybe pause this >> > > > > > discussion >> > > > > > > for a week or so? >> > > > > > > >> > > > > > > In the meantime I can share some suggestion based on prior >> > > > experiments: >> > > > > > > >> > > > > > > How to do watermarks / timestamp extractors in a simpler and more >> > > > > > flexible >> > > > > > > way. I think that part is quite promising should be part of the >> > new >> > > > > > source >> > > > > > > interface. 
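Stephan's branches linked below contain the actual experiments; purely as an illustration of the idea that event-time logic can be encapsulated per split instead of living in the reader loop, a sketch with hypothetical names (TimestampExtractor, SplitWatermarkGenerator) might look like this:

```java
// Illustrative sketch only; hypothetical names, not the API from the linked branches.
// Timestamp extraction and watermark generation are encapsulated per split, so
// the reader loop stays free of event-time logic.
import java.util.HashMap;
import java.util.Map;

public class PerSplitEventTimeSketch {

    /** Extracts the event timestamp from a record. */
    interface TimestampExtractor<T> {
        long extractTimestamp(T element);
    }

    /** Tracks the watermark of a single split. */
    interface SplitWatermarkGenerator<T> {
        void onRecord(T element, long timestamp);
        long currentWatermark();
    }

    /** One instance per split; bounded out-of-orderness as a simple example. */
    static final class BoundedOutOfOrderness<T> implements SplitWatermarkGenerator<T> {
        private final long maxOutOfOrderness;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrderness(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        @Override
        public void onRecord(T element, long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        @Override
        public long currentWatermark() {
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness;
        }
    }

    public static void main(String[] args) {
        TimestampExtractor<String> extractor = element -> Long.parseLong(element.split("@")[1]);

        Map<String, SplitWatermarkGenerator<String>> perSplit = new HashMap<>();
        perSplit.put("split-0", new BoundedOutOfOrderness<>(1000));
        perSplit.put("split-1", new BoundedOutOfOrderness<>(1000));

        perSplit.get("split-0").onRecord("a@5000", extractor.extractTimestamp("a@5000"));
        perSplit.get("split-1").onRecord("b@2000", extractor.extractTimestamp("b@2000"));

        // The source-wide watermark is the minimum across active splits: 1000 here,
        // because split-1 is behind and holds the watermark back.
        long combined = perSplit.values().stream()
                .mapToLong(SplitWatermarkGenerator::currentWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
        System.out.println(combined);
    }
}
```

In the sketch the emitted watermark is simply the minimum over the per-split generators, which is also the place where per-split idleness or alignment decisions would hook in.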
>> > > > > > > >> > > > > > > >> > > > > > >> > > > >> > > >> > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > >> > > >> > https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > Some experiments on how to build the source reader and its >> > library >> > > > for >> > > > > > > common threading/split patterns: >> > > > > > > >> > > > > > > >> > > > > > >> > > > >> > > >> > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src >> > > > > > > >> > > > > > > >> > > > > > > Best, >> > > > > > > Stephan >> > > > > > > >> > > > > > > >> > > > > > > On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> >> > > > wrote: >> > > > > > > >> > > > > > >> Hi devs, >> > > > > > >> >> > > > > > >> Since 1.9 is nearly released, I think we could get back to >> > > FLIP-27. >> > > > I >> > > > > > >> believe it should be included in 1.10. >> > > > > > >> >> > > > > > >> There are so many things mentioned in document of FLIP-27. [1] I >> > > > think >> > > > > > >> we'd better discuss them separately. However the wiki is not a >> > > good >> > > > > > place >> > > > > > >> to discuss. I wrote google doc about SplitReader API which >> > misses >> > > > some >> > > > > > >> details in the document. [2] >> > > > > > >> >> > > > > > >> 1. >> > > > > > >> >> > > > > > >> > > > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface >> > > > > > >> 2. >> > > > > > >> >> > > > > > >> > > > >> > > >> > https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing >> > > > > > >> >> > > > > > >> CC Stephan, Aljoscha, Piotrek, Becket >> > > > > > >> >> > > > > > >> >> > > > > > >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> >> > > > wrote: >> > > > > > >> >> > > > > > >>> Hi Steven, >> > > > > > >>> Thank you for the feedback. Please take a look at the document >> > > > FLIP-27 >> > > > > > >>> < >> > > > > > >> > > > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface >> > > > > >> > > > > > which >> > > > > > >>> is updated recently. A lot of details of enumerator were added >> > in >> > > > this >> > > > > > >>> document. I think it would help. >> > > > > > >>> >> > > > > > >>> Steven Wu <stevenz...@gmail.com> 于2019年3月28日周四 下午12:52写道: >> > > > > > >>> >> > > > > > >>>> This proposal mentioned that SplitEnumerator might run on the >> > > > > > >>>> JobManager or >> > > > > > >>>> in a single task on a TaskManager. >> > > > > > >>>> >> > > > > > >>>> if enumerator is a single task on a taskmanager, then the job >> > > DAG >> > > > can >> > > > > > >>>> never >> > > > > > >>>> been embarrassingly parallel anymore. That will nullify the >> > > > leverage >> > > > > > of >> > > > > > >>>> fine-grained recovery for embarrassingly parallel jobs. >> > > > > > >>>> >> > > > > > >>>> It's not clear to me what's the implication of running >> > > enumerator >> > > > on >> > > > > > the >> > > > > > >>>> jobmanager. So I will leave that out for now. 
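However Steven's placement question is resolved (on the JobManager assigning splits via RPC, or in a single enumerator task assigning them via the data stream), the interaction surface the enumerator needs stays small. A rough sketch, with hypothetical names (SplitAssignmentContext, onReaderIdle) and the lazy assignment discussed further down this thread:

```java
// Illustrative sketch only; hypothetical names, not the FLIP-27 interface.
// The enumerator only needs "assign a split to a reader" and "hear back when a
// reader wants more"; whether that channel is JobManager RPC or a stream from a
// single enumerator task is a deployment detail hidden behind the context.
import java.util.ArrayDeque;
import java.util.Queue;

public class EnumeratorSketch {

    /** Hypothetical context through which the enumerator hands out work. */
    interface SplitAssignmentContext<SplitT> {
        int parallelism();
        void assignSplit(int readerSubtask, SplitT split);
    }

    /** Hands out splits lazily, one at a time, as readers report they are idle. */
    static final class LazyFileEnumerator {
        private final Queue<String> pendingSplits;

        LazyFileEnumerator(Queue<String> discoveredSplits) {
            this.pendingSplits = discoveredSplits;
        }

        void onReaderIdle(int readerSubtask, SplitAssignmentContext<String> context) {
            String next = pendingSplits.poll();
            if (next != null) {
                context.assignSplit(readerSubtask, next);
            }
        }
    }

    public static void main(String[] args) {
        Queue<String> discovered = new ArrayDeque<>();
        discovered.add("file-part-0");
        discovered.add("file-part-1");

        LazyFileEnumerator enumerator = new LazyFileEnumerator(discovered);
        SplitAssignmentContext<String> context = new SplitAssignmentContext<String>() {
            @Override
            public int parallelism() {
                return 2;
            }

            @Override
            public void assignSplit(int readerSubtask, String split) {
                System.out.println("assign " + split + " to reader " + readerSubtask);
            }
        };

        enumerator.onReaderIdle(0, context); // assigns file-part-0 to reader 0
        enumerator.onReaderIdle(1, context); // assigns file-part-1 to reader 1
    }
}
```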
>> > > > > > >>>> >> > > > > > >>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> >> > > > wrote: >> > > > > > >>>> >> > > > > > >>>> > Hi Stephan & Piotrek, >> > > > > > >>>> > >> > > > > > >>>> > Thank you for feedback. >> > > > > > >>>> > >> > > > > > >>>> > It seems that there are a lot of things to do in community. >> > I >> > > am >> > > > > > just >> > > > > > >>>> > afraid that this discussion may be forgotten since there so >> > > many >> > > > > > >>>> proposals >> > > > > > >>>> > recently. >> > > > > > >>>> > Anyway, wish to see the split topics soon :) >> > > > > > >>>> > >> > > > > > >>>> > Piotr Nowojski <pi...@da-platform.com> 于2019年1月24日周四 >> > > 下午8:21写道: >> > > > > > >>>> > >> > > > > > >>>> > > Hi Biao! >> > > > > > >>>> > > >> > > > > > >>>> > > This discussion was stalled because of preparations for >> > the >> > > > open >> > > > > > >>>> sourcing >> > > > > > >>>> > > & merging Blink. I think before creating the tickets we >> > > should >> > > > > > >>>> split this >> > > > > > >>>> > > discussion into topics/areas outlined by Stephan and >> > create >> > > > Flips >> > > > > > >>>> for >> > > > > > >>>> > that. >> > > > > > >>>> > > >> > > > > > >>>> > > I think there is no chance for this to be completed in >> > > couple >> > > > of >> > > > > > >>>> > remaining >> > > > > > >>>> > > weeks/1 month before 1.8 feature freeze, however it would >> > be >> > > > good >> > > > > > >>>> to aim >> > > > > > >>>> > > with those changes for 1.9. >> > > > > > >>>> > > >> > > > > > >>>> > > Piotrek >> > > > > > >>>> > > >> > > > > > >>>> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> >> > > > wrote: >> > > > > > >>>> > > > >> > > > > > >>>> > > > Hi community, >> > > > > > >>>> > > > The summary of Stephan makes a lot sense to me. It is >> > much >> > > > > > clearer >> > > > > > >>>> > indeed >> > > > > > >>>> > > > after splitting the complex topic into small ones. >> > > > > > >>>> > > > I was wondering is there any detail plan for next step? >> > If >> > > > not, >> > > > > > I >> > > > > > >>>> would >> > > > > > >>>> > > > like to push this thing forward by creating some JIRA >> > > > issues. >> > > > > > >>>> > > > Another question is that should version 1.8 include >> > these >> > > > > > >>>> features? >> > > > > > >>>> > > > >> > > > > > >>>> > > > Stephan Ewen <se...@apache.org> 于2018年12月1日周六 上午4:20写道: >> > > > > > >>>> > > > >> > > > > > >>>> > > >> Thanks everyone for the lively discussion. Let me try >> > to >> > > > > > >>>> summarize >> > > > > > >>>> > > where I >> > > > > > >>>> > > >> see convergence in the discussion and open issues. >> > > > > > >>>> > > >> I'll try to group this by design aspect of the source. >> > > > Please >> > > > > > >>>> let me >> > > > > > >>>> > > know >> > > > > > >>>> > > >> if I got things wrong or missed something crucial here. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> For issues 1-3, if the below reflects the state of the >> > > > > > >>>> discussion, I >> > > > > > >>>> > > would >> > > > > > >>>> > > >> try and update the FLIP in the next days. >> > > > > > >>>> > > >> For the remaining ones we need more discussion. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> I would suggest to fork each of these aspects into a >> > > > separate >> > > > > > >>>> mail >> > > > > > >>>> > > thread, >> > > > > > >>>> > > >> or will loose sight of the individual aspects. 
>> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *(1) Separation of Split Enumerator and Split Reader* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - All seem to agree this is a good thing >> > > > > > >>>> > > >> - Split Enumerator could in the end live on JobManager >> > > > (and >> > > > > > >>>> assign >> > > > > > >>>> > > splits >> > > > > > >>>> > > >> via RPC) or in a task (and assign splits via data >> > > streams) >> > > > > > >>>> > > >> - this discussion is orthogonal and should come later, >> > > > when >> > > > > > the >> > > > > > >>>> > > interface >> > > > > > >>>> > > >> is agreed upon. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *(2) Split Readers for one or more splits* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Discussion seems to agree that we need to support >> > one >> > > > reader >> > > > > > >>>> that >> > > > > > >>>> > > >> possibly handles multiple splits concurrently. >> > > > > > >>>> > > >> - The requirement comes from sources where one >> > > > poll()-style >> > > > > > call >> > > > > > >>>> > > fetches >> > > > > > >>>> > > >> data from different splits / partitions >> > > > > > >>>> > > >> --> example sources that require that would be for >> > > > example >> > > > > > >>>> Kafka, >> > > > > > >>>> > > >> Pravega, Pulsar >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Could have one split reader per source, or multiple >> > > > split >> > > > > > >>>> readers >> > > > > > >>>> > > that >> > > > > > >>>> > > >> share the "poll()" function >> > > > > > >>>> > > >> - To not make it too complicated, we can start with >> > > > thinking >> > > > > > >>>> about >> > > > > > >>>> > one >> > > > > > >>>> > > >> split reader for all splits initially and see if that >> > > > covers >> > > > > > all >> > > > > > >>>> > > >> requirements >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *(3) Threading model of the Split Reader* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Most active part of the discussion ;-) >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - A non-blocking way for Flink's task code to interact >> > > > with >> > > > > > the >> > > > > > >>>> > source >> > > > > > >>>> > > is >> > > > > > >>>> > > >> needed in order to a task runtime code based on a >> > > > > > >>>> > > >> single-threaded/actor-style task design >> > > > > > >>>> > > >> --> I personally am a big proponent of that, it will >> > > > help >> > > > > > with >> > > > > > >>>> > > >> well-behaved checkpoints, efficiency, and simpler yet >> > > more >> > > > > > robust >> > > > > > >>>> > > runtime >> > > > > > >>>> > > >> code >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Users care about simple abstraction, so as a >> > subclass >> > > of >> > > > > > >>>> > SplitReader >> > > > > > >>>> > > >> (non-blocking / async) we need to have a >> > > > BlockingSplitReader >> > > > > > >>>> which >> > > > > > >>>> > will >> > > > > > >>>> > > >> form the basis of most source implementations. >> > > > > > >>>> BlockingSplitReader >> > > > > > >>>> > lets >> > > > > > >>>> > > >> users do blocking simple poll() calls. 
>> > > > > > >>>> > > >> - The BlockingSplitReader would spawn a thread (or >> > more) >> > > > and >> > > > > > the >> > > > > > >>>> > > >> thread(s) can make blocking calls and hand over data >> > > > buffers >> > > > > > via >> > > > > > >>>> a >> > > > > > >>>> > > blocking >> > > > > > >>>> > > >> queue >> > > > > > >>>> > > >> - This should allow us to cover both, a fully async >> > > > runtime, >> > > > > > >>>> and a >> > > > > > >>>> > > simple >> > > > > > >>>> > > >> blocking interface for users. >> > > > > > >>>> > > >> - This is actually very similar to how the Kafka >> > > > connectors >> > > > > > >>>> work. >> > > > > > >>>> > Kafka >> > > > > > >>>> > > >> 9+ with one thread, Kafka 8 with multiple threads >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - On the base SplitReader (the async one), the >> > > > non-blocking >> > > > > > >>>> method >> > > > > > >>>> > that >> > > > > > >>>> > > >> gets the next chunk of data would signal data >> > > availability >> > > > via >> > > > > > a >> > > > > > >>>> > > >> CompletableFuture, because that gives the best >> > > flexibility >> > > > (can >> > > > > > >>>> await >> > > > > > >>>> > > >> completion or register notification handlers). >> > > > > > >>>> > > >> - The source task would register a "thenHandle()" (or >> > > > similar) >> > > > > > >>>> on the >> > > > > > >>>> > > >> future to put a "take next data" task into the >> > > actor-style >> > > > > > >>>> mailbox >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *(4) Split Enumeration and Assignment* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Splits may be generated lazily, both in cases where >> > > > there >> > > > > > is a >> > > > > > >>>> > > limited >> > > > > > >>>> > > >> number of splits (but very many), or splits are >> > > discovered >> > > > over >> > > > > > >>>> time >> > > > > > >>>> > > >> - Assignment should also be lazy, to get better load >> > > > balancing >> > > > > > >>>> > > >> - Assignment needs support locality preferences >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Possible design based on discussion so far: >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> --> SplitReader has a method "addSplits(SplitT...)" >> > to >> > > > add >> > > > > > >>>> one or >> > > > > > >>>> > > more >> > > > > > >>>> > > >> splits. Some split readers might assume they have only >> > > one >> > > > > > split >> > > > > > >>>> ever, >> > > > > > >>>> > > >> concurrently, others assume multiple splits. (Note: >> > idea >> > > > behind >> > > > > > >>>> being >> > > > > > >>>> > > able >> > > > > > >>>> > > >> to add multiple splits at the same time is to ease >> > > startup >> > > > > > where >> > > > > > >>>> > > multiple >> > > > > > >>>> > > >> splits may be assigned instantly.) >> > > > > > >>>> > > >> --> SplitReader has a context object on which it can >> > > > call >> > > > > > >>>> indicate >> > > > > > >>>> > > when >> > > > > > >>>> > > >> splits are completed. The enumerator gets that >> > > > notification and >> > > > > > >>>> can >> > > > > > >>>> > use >> > > > > > >>>> > > to >> > > > > > >>>> > > >> decide when to assign new splits. This should help both >> > > in >> > > > > > cases >> > > > > > >>>> of >> > > > > > >>>> > > sources >> > > > > > >>>> > > >> that take splits lazily (file readers) and in case the >> > > > source >> > > > > > >>>> needs to >> > > > > > >>>> > > >> preserve a partial order between splits (Kinesis, >> > > Pravega, >> > > > > > >>>> Pulsar may >> > > > > > >>>> > > need >> > > > > > >>>> > > >> that). 
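To make the threading model in (3) concrete, here is a rough sketch, with hypothetical names (AsyncSplitReader, BlockingSplitReaderAdapter) and no shutdown or error handling, of an async reader whose data availability is signalled via a CompletableFuture, plus a blocking adapter that spawns a fetcher thread and hands records over through a bounded queue:

```java
// Illustrative sketch only; hypothetical names, not the FLIP-27 API, and no
// shutdown or error handling. The async reader never blocks the task thread;
// the adapter wraps simple blocking poll() code behind a fetcher thread and a
// bounded hand-over queue, as described in (3).
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

public class SplitReaderSketch {

    /** The simple blocking contract most connector authors would implement. */
    interface BlockingPoller<T> {
        T pollBlocking() throws InterruptedException;
    }

    /** The async contract the task runtime talks to. */
    interface AsyncSplitReader<T> {
        CompletableFuture<Void> isAvailable();
        T pollNext(); // null means "nothing available right now"
    }

    /** Adapts a blocking poller to the async contract. */
    static final class BlockingSplitReaderAdapter<T> implements AsyncSplitReader<T> {

        private final BlockingQueue<T> handover = new ArrayBlockingQueue<>(100);
        private final Object lock = new Object();
        private CompletableFuture<Void> availability = CompletableFuture.completedFuture(null);

        BlockingSplitReaderAdapter(BlockingPoller<T> poller) {
            Thread fetcher = new Thread(() -> {
                try {
                    while (true) {
                        T record = poller.pollBlocking(); // may block on I/O
                        handover.put(record);             // may block on back-pressure
                        synchronized (lock) {
                            availability.complete(null);  // wake up the task thread
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "split-fetcher");
            fetcher.setDaemon(true);
            fetcher.start();
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            synchronized (lock) {
                if (!handover.isEmpty()) {
                    return CompletableFuture.completedFuture(null);
                }
                if (availability.isDone()) {
                    availability = new CompletableFuture<>();
                }
                return availability;
            }
        }

        @Override
        public T pollNext() {
            return handover.poll();
        }
    }
}
```

The task runtime could then chain isAvailable().thenRun(...) to enqueue a "take next data" action into its actor-style mailbox, matching the thenHandle()-style registration described above.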
>> > > > > > >>>> > > >> --> SplitEnumerator gets notification when >> > > SplitReaders >> > > > > > start >> > > > > > >>>> and >> > > > > > >>>> > > when >> > > > > > >>>> > > >> they finish splits. They can decide at that moment to >> > > push >> > > > more >> > > > > > >>>> splits >> > > > > > >>>> > > to >> > > > > > >>>> > > >> that reader >> > > > > > >>>> > > >> --> The SplitEnumerator should probably be aware of >> > > the >> > > > > > source >> > > > > > >>>> > > >> parallelism, to build its initial distribution. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Open question: Should the source expose something >> > like >> > > > "host >> > > > > > >>>> > > >> preferences", so that yarn/mesos/k8s can take this into >> > > > account >> > > > > > >>>> when >> > > > > > >>>> > > >> selecting a node to start a TM on? >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *(5) Watermarks and event time alignment* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Watermark generation, as well as idleness, needs to >> > be >> > > > per >> > > > > > >>>> split >> > > > > > >>>> > > (like >> > > > > > >>>> > > >> currently in the Kafka Source, per partition) >> > > > > > >>>> > > >> - It is desirable to support optional >> > > > event-time-alignment, >> > > > > > >>>> meaning >> > > > > > >>>> > > that >> > > > > > >>>> > > >> splits that are ahead are back-pressured or temporarily >> > > > > > >>>> unsubscribed >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - I think i would be desirable to encapsulate >> > watermark >> > > > > > >>>> generation >> > > > > > >>>> > > logic >> > > > > > >>>> > > >> in watermark generators, for a separation of concerns. >> > > The >> > > > > > >>>> watermark >> > > > > > >>>> > > >> generators should run per split. >> > > > > > >>>> > > >> - Using watermark generators would also help with >> > > another >> > > > > > >>>> problem of >> > > > > > >>>> > > the >> > > > > > >>>> > > >> suggested interface, namely supporting non-periodic >> > > > watermarks >> > > > > > >>>> > > efficiently. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Need a way to "dispatch" next record to different >> > > > watermark >> > > > > > >>>> > > generators >> > > > > > >>>> > > >> - Need a way to tell SplitReader to "suspend" a split >> > > > until a >> > > > > > >>>> certain >> > > > > > >>>> > > >> watermark is reached (event time backpressure) >> > > > > > >>>> > > >> - This would in fact be not needed (and thus simpler) >> > if >> > > > we >> > > > > > had >> > > > > > >>>> a >> > > > > > >>>> > > >> SplitReader per split and may be a reason to re-open >> > that >> > > > > > >>>> discussion >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *(6) Watermarks across splits and in the Split >> > > Enumerator* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - The split enumerator may need some watermark >> > > awareness, >> > > > > > which >> > > > > > >>>> > should >> > > > > > >>>> > > be >> > > > > > >>>> > > >> purely based on split metadata (like create timestamp >> > of >> > > > file >> > > > > > >>>> splits) >> > > > > > >>>> > > >> - If there are still more splits with overlapping >> > event >> > > > time >> > > > > > >>>> range >> > > > > > >>>> > for >> > > > > > >>>> > > a >> > > > > > >>>> > > >> split reader, then that split reader should not advance >> > > the >> > > > > > >>>> watermark >> > > > > > >>>> > > >> within the split beyond the overlap boundary. 
Otherwise >> > > > future >> > > > > > >>>> splits >> > > > > > >>>> > > will >> > > > > > >>>> > > >> produce late data. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - One way to approach this could be that the split >> > > > enumerator >> > > > > > >>>> may >> > > > > > >>>> > send >> > > > > > >>>> > > >> watermarks to the readers, and the readers cannot emit >> > > > > > watermarks >> > > > > > >>>> > beyond >> > > > > > >>>> > > >> that received watermark. >> > > > > > >>>> > > >> - Many split enumerators would simply immediately send >> > > > > > Long.MAX >> > > > > > >>>> out >> > > > > > >>>> > and >> > > > > > >>>> > > >> leave the progress purely to the split readers. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - For event-time alignment / split back pressure, this >> > > > begs >> > > > > > the >> > > > > > >>>> > > question >> > > > > > >>>> > > >> how we can avoid deadlocks that may arise when splits >> > are >> > > > > > >>>> suspended >> > > > > > >>>> > for >> > > > > > >>>> > > >> event time back pressure, >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *(7) Batch and streaming Unification* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Functionality wise, the above design should support >> > > both >> > > > > > >>>> > > >> - Batch often (mostly) does not care about reading "in >> > > > order" >> > > > > > >>>> and >> > > > > > >>>> > > >> generating watermarks >> > > > > > >>>> > > >> --> Might use different enumerator logic that is >> > more >> > > > > > locality >> > > > > > >>>> > aware >> > > > > > >>>> > > >> and ignores event time order >> > > > > > >>>> > > >> --> Does not generate watermarks >> > > > > > >>>> > > >> - Would be great if bounded sources could be >> > identified >> > > at >> > > > > > >>>> compile >> > > > > > >>>> > > time, >> > > > > > >>>> > > >> so that "env.addBoundedSource(...)" is type safe and >> > can >> > > > > > return a >> > > > > > >>>> > > >> "BoundedDataStream". >> > > > > > >>>> > > >> - Possible to defer this discussion until later >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> *Miscellaneous Comments* >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - Should the source have a TypeInformation for the >> > > > produced >> > > > > > >>>> type, >> > > > > > >>>> > > instead >> > > > > > >>>> > > >> of a serializer? We need a type information in the >> > stream >> > > > > > >>>> anyways, and >> > > > > > >>>> > > can >> > > > > > >>>> > > >> derive the serializer from that. Plus, creating the >> > > > serializer >> > > > > > >>>> should >> > > > > > >>>> > > >> respect the ExecutionConfig. >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> - The TypeSerializer interface is very powerful but >> > also >> > > > not >> > > > > > >>>> easy to >> > > > > > >>>> > > >> implement. Its purpose is to handle data super >> > > efficiently, >> > > > > > >>>> support >> > > > > > >>>> > > >> flexible ways of evolution, etc. >> > > > > > >>>> > > >> For metadata I would suggest to look at the >> > > > > > >>>> SimpleVersionedSerializer >> > > > > > >>>> > > >> instead, which is used for example for checkpoint >> > master >> > > > hooks, >> > > > > > >>>> or for >> > > > > > >>>> > > the >> > > > > > >>>> > > >> streaming file sink. I think that is is a good match >> > for >> > > > cases >> > > > > > >>>> where >> > > > > > >>>> > we >> > > > > > >>>> > > do >> > > > > > >>>> > > >> not need more than ser/deser (no copy, etc.) 
and don't >> > > > need to >> > > > > > >>>> push >> > > > > > >>>> > > >> versioning out of the serialization paths for best >> > > > performance >> > > > > > >>>> (as in >> > > > > > >>>> > > the >> > > > > > >>>> > > >> TypeSerializer) >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas < >> > > > > > >>>> > > >> k.klou...@data-artisans.com> >> > > > > > >>>> > > >> wrote: >> > > > > > >>>> > > >> >> > > > > > >>>> > > >>> Hi Biao, >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> Thanks for the answer! >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> So given the multi-threaded readers, now we have as >> > open >> > > > > > >>>> questions: >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> 1) How do we let the checkpoints pass through our >> > > > > > multi-threaded >> > > > > > >>>> > reader >> > > > > > >>>> > > >>> operator? >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> 2) Do we have separate reader and source operators or >> > > > not? In >> > > > > > >>>> the >> > > > > > >>>> > > >> strategy >> > > > > > >>>> > > >>> that has a separate source, the source operator has a >> > > > > > >>>> parallelism of >> > > > > > >>>> > 1 >> > > > > > >>>> > > >> and >> > > > > > >>>> > > >>> is responsible for split recovery only. >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> For the first one, given also the constraints >> > (blocking, >> > > > > > finite >> > > > > > >>>> > queues, >> > > > > > >>>> > > >>> etc), I do not have an answer yet. >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> For the 2nd, I think that we should go with separate >> > > > operators >> > > > > > >>>> for >> > > > > > >>>> > the >> > > > > > >>>> > > >>> source and the readers, for the following reasons: >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> 1) This is more aligned with a potential future >> > > > improvement >> > > > > > >>>> where the >> > > > > > >>>> > > >> split >> > > > > > >>>> > > >>> discovery becomes a responsibility of the JobManager >> > and >> > > > > > >>>> readers are >> > > > > > >>>> > > >>> pooling more work from the JM. >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> 2) The source is going to be the "single point of >> > > truth". >> > > > It >> > > > > > >>>> will >> > > > > > >>>> > know >> > > > > > >>>> > > >> what >> > > > > > >>>> > > >>> has been processed and what not. If the source and the >> > > > readers >> > > > > > >>>> are a >> > > > > > >>>> > > >> single >> > > > > > >>>> > > >>> operator with parallelism > 1, or in general, if the >> > > split >> > > > > > >>>> discovery >> > > > > > >>>> > is >> > > > > > >>>> > > >>> done by each task individually, then: >> > > > > > >>>> > > >>> i) we have to have a deterministic scheme for each >> > > > reader to >> > > > > > >>>> assign >> > > > > > >>>> > > >>> splits to itself (e.g. mod subtaskId). This is not >> > > > necessarily >> > > > > > >>>> > trivial >> > > > > > >>>> > > >> for >> > > > > > >>>> > > >>> all sources. >> > > > > > >>>> > > >>> ii) each reader would have to keep a copy of all its >> > > > > > processed >> > > > > > >>>> > slpits >> > > > > > >>>> > > >>> iii) the state has to be a union state with a >> > > > non-trivial >> > > > > > >>>> merging >> > > > > > >>>> > > >> logic >> > > > > > >>>> > > >>> in order to support rescaling. 
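To make point (i) above concrete, a deterministic "mod subtaskId" scheme is easy to write down, and the sketch below (purely illustrative, hypothetical names) also hints at why (ii) and (iii) follow from it:

```java
// Illustrative sketch of point (i): with no central enumerator, every reader
// sees all discovered splits and deterministically keeps only its own share.
// Each reader then has to remember which of "its" splits it already processed
// (point ii), and rescaling changes ownership, hence the union state in (iii).
import java.util.List;
import java.util.stream.Collectors;

public class LocalSplitAssignmentSketch {

    static List<String> splitsForSubtask(List<String> allSplits, int subtaskIndex, int parallelism) {
        return allSplits.stream()
                .filter(split -> Math.floorMod(split.hashCode(), parallelism) == subtaskIndex)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> all = List.of("part-0", "part-1", "part-2", "part-3");

        System.out.println(splitsForSubtask(all, 0, 2));
        System.out.println(splitsForSubtask(all, 1, 2));

        // With a different parallelism the same split may land on another subtask,
        // so previously processed splits have to be redistributed on restore.
        System.out.println(splitsForSubtask(all, 0, 3));
    }
}
```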
>> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> Two additional points that you raised above: >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> i) The point that you raised that we need to keep all >> > > > splits >> > > > > > >>>> > (processed >> > > > > > >>>> > > >> and >> > > > > > >>>> > > >>> not-processed) I think is a bit of a strong >> > requirement. >> > > > This >> > > > > > >>>> would >> > > > > > >>>> > > imply >> > > > > > >>>> > > >>> that for infinite sources the state will grow >> > > > indefinitely. >> > > > > > >>>> This is >> > > > > > >>>> > > >> problem >> > > > > > >>>> > > >>> is even more pronounced if we do not have a single >> > > source >> > > > that >> > > > > > >>>> > assigns >> > > > > > >>>> > > >>> splits to readers, as each reader will have its own >> > copy >> > > > of >> > > > > > the >> > > > > > >>>> > state. >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> ii) it is true that for finite sources we need to >> > > somehow >> > > > not >> > > > > > >>>> close >> > > > > > >>>> > the >> > > > > > >>>> > > >>> readers when the source/split discoverer finishes. The >> > > > > > >>>> > > >>> ContinuousFileReaderOperator has a work-around for >> > that. >> > > > It is >> > > > > > >>>> not >> > > > > > >>>> > > >> elegant, >> > > > > > >>>> > > >>> and checkpoints are not emitted after closing the >> > > source, >> > > > but >> > > > > > >>>> this, I >> > > > > > >>>> > > >>> believe, is a bigger problem which requires more >> > changes >> > > > than >> > > > > > >>>> just >> > > > > > >>>> > > >>> refactoring the source interface. >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >>> Cheers, >> > > > > > >>>> > > >>> Kostas >> > > > > > >>>> > > >>> >> > > > > > >>>> > > >> >> > > > > > >>>> > > >> > > > > > >>>> > > >> > > > > > >>>> > >> > > > > > >>>> >> > > > > > >>> >> > > > > > >> > > > > >> > > > >> > > >> > >> >> >>
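Pulling the recurring pieces of this thread together, the overall shape under discussion is roughly the one sketched below: an enumerator that discovers and assigns splits, a reader that consumes them without blocking the task thread, and a boundedness property for batch/streaming unification. The names are illustrative only; the authoritative definitions are on the FLIP-27 wiki page linked earlier in the thread.

```java
// Illustrative capstone sketch only; hypothetical names, not the FLIP-27 API.
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SourceShapeSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /** Factory for the two runtime pieces, plus a boundedness property. */
    interface Source<T, SplitT> {
        Boundedness boundedness();
        SplitEnumerator<SplitT> createEnumerator();
        SourceReader<T, SplitT> createReader();
    }

    /** Discovers and assigns splits; could live on the JobManager or in a task. */
    interface SplitEnumerator<SplitT> {
        List<SplitT> discoverNewSplits();
        void addReader(int subtaskId);
        void onSplitFinished(int subtaskId, SplitT split);
    }

    /** Consumes assigned splits without blocking the task thread. */
    interface SourceReader<T, SplitT> {
        void addSplits(List<SplitT> splits);
        CompletableFuture<Void> isAvailable();
        T pollNext();
        List<SplitT> snapshotState(); // in-flight splits, for checkpointing
    }
}
```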