Hi Steven,

Thank you for the feedback. Please take a look at the document FLIP-27
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
which was updated recently. A lot of details about the enumerator were added
to this document. I think it would help.
Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:

> This proposal mentioned that the SplitEnumerator might run on the JobManager
> or in a single task on a TaskManager.
>
> If the enumerator is a single task on a TaskManager, then the job DAG can
> never be embarrassingly parallel anymore. That will nullify the leverage of
> fine-grained recovery for embarrassingly parallel jobs.
>
> It's not clear to me what the implications of running the enumerator on the
> JobManager are, so I will leave that out for now.
>
> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
> > Hi Stephan & Piotrek,
> >
> > Thank you for the feedback.
> >
> > It seems that there are a lot of things to do in the community. I am just
> > afraid that this discussion may be forgotten, since there are so many
> > proposals recently. Anyway, I wish to see the split topics soon :)
> >
> > Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:
> >
> > > Hi Biao!
> > >
> > > This discussion was stalled because of preparations for the open
> > > sourcing & merging of Blink. I think before creating the tickets we
> > > should split this discussion into the topics/areas outlined by Stephan
> > > and create FLIPs for them.
> > >
> > > I think there is no chance for this to be completed in the couple of
> > > remaining weeks/1 month before the 1.8 feature freeze; however, it
> > > would be good to aim to land those changes in 1.9.
> > >
> > > Piotrek
> > >
> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> > > >
> > > > Hi community,
> > > >
> > > > The summary from Stephan makes a lot of sense to me. It is indeed
> > > > much clearer after splitting the complex topic into small ones.
> > > > I was wondering, is there any detailed plan for the next step? If
> > > > not, I would like to push this forward by creating some JIRA issues.
> > > > Another question: should version 1.8 include these features?
> > > >
> > > > Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
> > > >
> > > >> Thanks everyone for the lively discussion. Let me try to summarize
> > > >> where I see convergence in the discussion and open issues.
> > > >> I'll try to group this by design aspect of the source. Please let
> > > >> me know if I got things wrong or missed something crucial here.
> > > >>
> > > >> For issues 1-3, if the below reflects the state of the discussion,
> > > >> I will try to update the FLIP in the next days.
> > > >> For the remaining ones we need more discussion.
> > > >>
> > > >> I would suggest forking each of these aspects into a separate mail
> > > >> thread, or we will lose sight of the individual aspects.
> > > >>
> > > >> *(1) Separation of Split Enumerator and Split Reader*
> > > >>
> > > >> - All seem to agree this is a good thing.
> > > >> - The Split Enumerator could in the end live on the JobManager (and
> > > >> assign splits via RPC) or in a task (and assign splits via data
> > > >> streams).
> > > >> - This discussion is orthogonal and should come later, when the
> > > >> interface is agreed upon.
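As an illustration of the separation discussed in (1), here is a minimal
sketch of how the two roles could be carved apart. All names and signatures
below are illustrative assumptions for this thread, not the FLIP's actual
API: the enumerator owns split discovery and assignment, while the reader
only consumes the splits it is handed.

import java.io.IOException;
import java.util.List;

// Illustrative sketch only -- not the FLIP-27 API. The enumerator discovers
// splits and decides which reader gets them; the reader consumes them.
interface SplitEnumerator<SplitT> {

    // Discover splits that have not been assigned yet
    // (e.g. new files in a directory, new Kafka partitions).
    List<SplitT> discoverNewSplits() throws IOException;

    // Assign a split to the reader with the given parallel subtask index.
    // Depending on where the enumerator runs, this could go out via RPC
    // (from the JobManager) or via a data stream (from a single task).
    void assignSplit(SplitT split, int readerSubtaskIndex);
}

interface SplitReader<T, SplitT> {

    // Accept one or more splits assigned by the enumerator. A reader may
    // handle several splits concurrently (e.g. several Kafka partitions).
    void addSplits(List<SplitT> splits);

    // Read the next record from any of the currently assigned splits.
    T next() throws IOException;
}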
> > > >> *(2) Split Readers for one or more splits*
> > > >>
> > > >> - The discussion seems to agree that we need to support one reader
> > > >> that possibly handles multiple splits concurrently.
> > > >> - The requirement comes from sources where one poll()-style call
> > > >> fetches data from different splits / partitions
> > > >> --> example sources that require this are Kafka, Pravega, Pulsar
> > > >> - We could have one split reader per source, or multiple split
> > > >> readers that share the "poll()" function.
> > > >> - To not make it too complicated, we can start by thinking about one
> > > >> split reader for all splits initially and see if that covers all
> > > >> requirements.
> > > >>
> > > >> *(3) Threading model of the Split Reader*
> > > >>
> > > >> - Most active part of the discussion ;-)
> > > >>
> > > >> - A non-blocking way for Flink's task code to interact with the
> > > >> source is needed in order to build the task runtime code on a
> > > >> single-threaded/actor-style task design
> > > >> --> I personally am a big proponent of that; it will help with
> > > >> well-behaved checkpoints, efficiency, and simpler yet more robust
> > > >> runtime code.
> > > >>
> > > >> - Users care about a simple abstraction, so as a subclass of
> > > >> SplitReader (non-blocking / async) we need to have a
> > > >> BlockingSplitReader, which will form the basis of most source
> > > >> implementations. The BlockingSplitReader lets users make simple
> > > >> blocking poll() calls.
> > > >> - The BlockingSplitReader would spawn a thread (or more), and the
> > > >> thread(s) can make blocking calls and hand over data buffers via a
> > > >> blocking queue.
> > > >> - This should allow us to cover both a fully async runtime and a
> > > >> simple blocking interface for users.
> > > >> - This is actually very similar to how the Kafka connectors work:
> > > >> Kafka 9+ with one thread, Kafka 8 with multiple threads.
> > > >>
> > > >> - On the base SplitReader (the async one), the non-blocking method
> > > >> that gets the next chunk of data would signal data availability via
> > > >> a CompletableFuture, because that gives the best flexibility (one
> > > >> can await completion or register notification handlers).
> > > >> - The source task would register a "thenHandle()" (or similar) on
> > > >> the future to put a "take next data" task into the actor-style
> > > >> mailbox.
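To make the threading model in (3) concrete, here is a minimal sketch of the
async contract plus a blocking adapter, assuming availability is signaled via
a CompletableFuture as described above. The names (AsyncSplitReader,
fetchNextBlocking, ...) are made up for this example:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only. The async contract: poll without blocking, and
// learn about availability through a future.
interface AsyncSplitReader<T> {

    // Completes as soon as at least one record can be polled.
    CompletableFuture<Void> isAvailable();

    // Returns the next record, or null if nothing is available right now.
    T pollNext();
}

// Adapter for users who prefer simple blocking poll() calls: a fetcher
// thread does the blocking work and hands records over through a queue.
abstract class BlockingSplitReader<T> implements AsyncSplitReader<T> {

    private final Object lock = new Object();
    private final Queue<T> handover = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Implemented by the user with plain blocking calls, e.g. consumer.poll().
    protected abstract T fetchNextBlocking() throws Exception;

    public void start() {
        Thread fetcher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    T record = fetchNextBlocking(); // blocking happens here
                    synchronized (lock) {
                        handover.add(record);
                        available.complete(null);   // wake up the task thread
                    }
                }
            } catch (Exception e) {
                synchronized (lock) {
                    available.completeExceptionally(e);
                }
            }
        }, "split-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        synchronized (lock) {
            return handover.isEmpty()
                    ? available
                    : CompletableFuture.completedFuture(null);
        }
    }

    @Override
    public T pollNext() {
        synchronized (lock) {
            T record = handover.poll();
            if (handover.isEmpty()) {
                available = new CompletableFuture<>(); // re-arm for next data
            }
            return record;
        }
    }
}

The task thread would then do something like
reader.isAvailable().thenRun(() -> mailbox.add(takeNextDataTask)) and never
block. A production version would also need a bounded queue and a clean
shutdown path, which this sketch glosses over.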
> > > >> *(4) Split Enumeration and Assignment*
> > > >>
> > > >> - Splits may be generated lazily, both in cases where there is a
> > > >> limited number of splits (but very many), and where splits are
> > > >> discovered over time.
> > > >> - Assignment should also be lazy, to get better load balancing.
> > > >> - Assignment needs to support locality preferences.
> > > >>
> > > >> - Possible design based on the discussion so far:
> > > >>
> > > >> --> SplitReader has a method "addSplits(SplitT...)" to add one or
> > > >> more splits. Some split readers might assume they only ever have one
> > > >> split, concurrently; others assume multiple splits. (Note: the idea
> > > >> behind being able to add multiple splits at the same time is to ease
> > > >> startup, where multiple splits may be assigned instantly.)
> > > >> --> SplitReader has a context object on which it can call to
> > > >> indicate when splits are completed. The enumerator gets that
> > > >> notification and can use it to decide when to assign new splits.
> > > >> This should help both in the case of sources that take splits lazily
> > > >> (file readers) and in the case where the source needs to preserve a
> > > >> partial order between splits (Kinesis, Pravega, Pulsar may need
> > > >> that).
> > > >> --> SplitEnumerator gets a notification when SplitReaders start and
> > > >> when they finish splits. It can decide at that moment to push more
> > > >> splits to that reader.
> > > >> --> The SplitEnumerator should probably be aware of the source
> > > >> parallelism, to build its initial distribution.
> > > >>
> > > >> - Open question: Should the source expose something like "host
> > > >> preferences", so that yarn/mesos/k8s can take this into account when
> > > >> selecting a node to start a TM on?
> > > >>
> > > >> *(5) Watermarks and event time alignment*
> > > >>
> > > >> - Watermark generation, as well as idleness, needs to be per split
> > > >> (like currently in the Kafka Source, per partition).
> > > >> - It is desirable to support optional event-time alignment, meaning
> > > >> that splits that are ahead are back-pressured or temporarily
> > > >> unsubscribed.
> > > >>
> > > >> - I think it would be desirable to encapsulate the watermark
> > > >> generation logic in watermark generators, for a separation of
> > > >> concerns. The watermark generators should run per split.
> > > >> - Using watermark generators would also help with another problem of
> > > >> the suggested interface, namely supporting non-periodic watermarks
> > > >> efficiently.
> > > >>
> > > >> - We need a way to "dispatch" the next record to different watermark
> > > >> generators.
> > > >> - We need a way to tell the SplitReader to "suspend" a split until a
> > > >> certain watermark is reached (event time backpressure).
> > > >> - This would in fact not be needed (and thus be simpler) if we had a
> > > >> SplitReader per split, and may be a reason to re-open that
> > > >> discussion.
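As a sketch of the per-split watermark generator idea in (5), again with
made-up names, the dispatch could look roughly like this, with the
source-wide watermark taken as the minimum over all active splits so that a
lagging split holds the others back:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: one watermark generator per split, dispatching
// each record to the generator of the split it came from.
interface WatermarkGenerator {
    void onEvent(long eventTimestamp);
    long currentWatermark();
}

class PerSplitWatermarks {

    private final Map<String, WatermarkGenerator> generators = new HashMap<>();

    // Dispatch the record's timestamp to the generator of its split.
    void onRecord(String splitId, long eventTimestamp) {
        generators
                .computeIfAbsent(splitId, id -> new BoundedOutOfOrdernessGenerator())
                .onEvent(eventTimestamp);
    }

    // The combined watermark is the minimum across splits: it must not
    // outrun the slowest split, or records from that split become late.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (WatermarkGenerator generator : generators.values()) {
            min = Math.min(min, generator.currentWatermark());
        }
        return generators.isEmpty() ? Long.MIN_VALUE : min;
    }
}

// Hypothetical generator assuming events arrive at most 5s out of order.
class BoundedOutOfOrdernessGenerator implements WatermarkGenerator {

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 5_000;
    }
}

Event-time alignment would then suspend any split whose own generator has run
too far ahead of the combined watermark.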
> > > >> *(6) Watermarks across splits and in the Split Enumerator*
> > > >>
> > > >> - The split enumerator may need some watermark awareness, which
> > > >> should be based purely on split metadata (like the create timestamp
> > > >> of file splits).
> > > >> - If there are still more splits with an overlapping event time
> > > >> range for a split reader, then that split reader should not advance
> > > >> the watermark within the split beyond the overlap boundary.
> > > >> Otherwise future splits will produce late data.
> > > >>
> > > >> - One way to approach this could be that the split enumerator may
> > > >> send watermarks to the readers, and the readers cannot emit
> > > >> watermarks beyond that received watermark.
> > > >> - Many split enumerators would simply immediately send
> > > >> Long.MAX_VALUE out and leave the progress purely to the split
> > > >> readers.
> > > >>
> > > >> - For event-time alignment / split back pressure, this begs the
> > > >> question how we can avoid deadlocks that may arise when splits are
> > > >> suspended for event time back pressure.
> > > >>
> > > >> *(7) Batch and Streaming Unification*
> > > >>
> > > >> - Functionality-wise, the above design should support both.
> > > >> - Batch often (mostly) does not care about reading "in order" and
> > > >> generating watermarks
> > > >> --> Might use different enumerator logic that is more locality
> > > >> aware and ignores event time order
> > > >> --> Does not generate watermarks
> > > >> - It would be great if bounded sources could be identified at
> > > >> compile time, so that "env.addBoundedSource(...)" is type safe and
> > > >> can return a "BoundedDataStream".
> > > >> - It is possible to defer this discussion until later.
> > > >>
> > > >> *Miscellaneous Comments*
> > > >>
> > > >> - Should the source have a TypeInformation for the produced type,
> > > >> instead of a serializer? We need a type information in the stream
> > > >> anyway, and can derive the serializer from that. Plus, creating the
> > > >> serializer should respect the ExecutionConfig.
> > > >>
> > > >> - The TypeSerializer interface is very powerful but also not easy
> > > >> to implement. Its purpose is to handle data super efficiently,
> > > >> support flexible ways of evolution, etc.
> > > >> For metadata I would suggest looking at the
> > > >> SimpleVersionedSerializer instead, which is used for example for
> > > >> checkpoint master hooks, or for the streaming file sink. I think it
> > > >> is a good match for cases where we do not need more than ser/deser
> > > >> (no copy, etc.) and don't need to push versioning out of the
> > > >> serialization paths for best performance (as in the TypeSerializer).
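SimpleVersionedSerializer (org.apache.flink.core.io.SimpleVersionedSerializer)
indeed only asks for a version plus serialize/deserialize, with the version
passed back in on deserialization. A minimal sketch for a hypothetical
FileSplit type (both the split type and its fields are invented for this
example) could look like this:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Hypothetical split type used only for this example.
class FileSplit {
    final String path;
    final long offset;
    FileSplit(String path, long offset) { this.path = path; this.offset = offset; }
}

class FileSplitSerializer implements SimpleVersionedSerializer<FileSplit> {

    @Override
    public int getVersion() { return 1; }

    @Override
    public byte[] serialize(FileSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(split.path);
        out.writeLong(split.offset);
        out.flush();
        return bytes.toByteArray();
    }

    @Override
    public FileSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unknown version: " + version);
        }
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        return new FileSplit(in.readUTF(), in.readLong());
    }
}

No copying, no schema evolution machinery -- just versioned bytes, which is
the point being made above.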
> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
> > > >> <k.klou...@data-artisans.com> wrote:
> > > >>
> > > >>> Hi Biao,
> > > >>>
> > > >>> Thanks for the answer!
> > > >>>
> > > >>> So given the multi-threaded readers, we now have these open
> > > >>> questions:
> > > >>>
> > > >>> 1) How do we let the checkpoints pass through our multi-threaded
> > > >>> reader operator?
> > > >>>
> > > >>> 2) Do we have separate reader and source operators or not? In the
> > > >>> strategy that has a separate source, the source operator has a
> > > >>> parallelism of 1 and is responsible for split recovery only.
> > > >>>
> > > >>> For the first one, given also the constraints (blocking, finite
> > > >>> queues, etc.), I do not have an answer yet.
> > > >>>
> > > >>> For the second, I think that we should go with separate operators
> > > >>> for the source and the readers, for the following reasons:
> > > >>>
> > > >>> 1) This is more aligned with a potential future improvement where
> > > >>> the split discovery becomes a responsibility of the JobManager and
> > > >>> readers are pulling more work from the JM.
> > > >>>
> > > >>> 2) The source is going to be the "single point of truth". It will
> > > >>> know what has been processed and what has not. If the source and
> > > >>> the readers are a single operator with parallelism > 1, or in
> > > >>> general, if the split discovery is done by each task individually,
> > > >>> then:
> > > >>> i) we have to have a deterministic scheme for each reader to
> > > >>> assign splits to itself (e.g. mod subtaskId). This is not
> > > >>> necessarily trivial for all sources.
> > > >>> ii) each reader would have to keep a copy of all its processed
> > > >>> splits.
> > > >>> iii) the state has to be a union state with non-trivial merging
> > > >>> logic in order to support rescaling.
> > > >>>
> > > >>> Two additional points that you raised above:
> > > >>>
> > > >>> i) The point you raised that we need to keep all splits (processed
> > > >>> and not-processed) is, I think, a bit of a strong requirement. It
> > > >>> would imply that for infinite sources the state will grow
> > > >>> indefinitely. This problem is even more pronounced if we do not
> > > >>> have a single source that assigns splits to readers, as each reader
> > > >>> will have its own copy of the state.
> > > >>>
> > > >>> ii) It is true that for finite sources we need to somehow not close
> > > >>> the readers when the source/split discoverer finishes. The
> > > >>> ContinuousFileReaderOperator has a workaround for that. It is not
> > > >>> elegant, and checkpoints are not emitted after closing the source,
> > > >>> but this, I believe, is a bigger problem that requires more changes
> > > >>> than just refactoring the source interface.
> > > >>>
> > > >>> Cheers,
> > > >>> Kostas
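For Kostas's point (i), a deterministic "mod subtaskId" scheme could look
like the sketch below (an illustrative helper, not anything proposed in this
thread beyond what Kostas describes). It only works if every reader discovers
the same splits in the same order, which is exactly why it is not trivial for
all sources:

import java.util.ArrayList;
import java.util.List;

// Illustrative helper: each parallel reader deterministically claims the
// splits whose index matches its own subtask index modulo the parallelism.
final class ModuloSplitAssigner {

    static <SplitT> List<SplitT> splitsForSubtask(
            List<SplitT> allSplits, int subtaskIndex, int parallelism) {
        List<SplitT> mine = new ArrayList<>();
        for (int i = 0; i < allSplits.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                mine.add(allSplits.get(i));
            }
        }
        return mine;
    }
}

This also illustrates points (ii) and (iii): each reader must still remember
which of "its" splits it has processed, and rescaling changes the modulus, so
the state would need union-and-redistribute logic on recovery.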