Hi Stephan & Piotrek,

Thank you for the feedback.
It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten, since there are so many proposals recently. Anyway, I wish to see the split topics soon :)

Piotr Nowojski <pi...@da-platform.com> wrote on Thursday, January 24, 2019, at 8:21 PM:

> Hi Biao!
>
> This discussion was stalled because of the preparations for open sourcing and merging Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for them.
>
> I think there is no chance for this to be completed in the couple of remaining weeks / one month before the 1.8 feature freeze; however, it would be good to aim for 1.9 with those changes.
>
> Piotrek
>
> > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> >
> > Hi community,
> > The summary by Stephan makes a lot of sense to me. It is indeed much clearer after splitting the complex topic into small ones.
> > I was wondering: is there any detailed plan for the next step? If not, I would like to push this forward by creating some JIRA issues.
> > Another question: should version 1.8 include these features?
> >
> > Stephan Ewen <se...@apache.org> wrote on Saturday, December 1, 2018, at 4:20 AM:
> >
> >> Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion and what the open issues are.
> >> I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.
> >>
> >> For issues 1-3, if the below reflects the state of the discussion, I would try to update the FLIP in the next days.
> >> For the remaining ones we need more discussion.
> >>
> >> I would suggest forking each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.
> >>
> >> *(1) Separation of Split Enumerator and Split Reader*
> >>
> >> - All seem to agree this is a good thing.
> >> - The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams).
> >> - This discussion is orthogonal and should come later, when the interface is agreed upon.
> >>
> >> *(2) Split Readers for one or more splits*
> >>
> >> - The discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
> >> - The requirement comes from sources where one poll()-style call fetches data from different splits / partitions.
> >>   --> Example sources that require this would be Kafka, Pravega, Pulsar.
> >> - We could have one split reader per source, or multiple split readers that share the "poll()" function.
> >> - To not make it too complicated, we can start by thinking about one split reader for all splits initially and see if that covers all requirements.
> >>
> >> *(3) Threading model of the Split Reader*
> >>
> >> - Most active part of the discussion ;-)
> >>
> >> - A non-blocking way for Flink's task code to interact with the source is needed in order to base the task runtime code on a single-threaded / actor-style task design.
> >>   --> I personally am a big proponent of that; it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code.
> >>
> >> - Users care about a simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader, which will form the basis of most source implementations.
> >>   The BlockingSplitReader lets users make simple blocking poll() calls.
> >> - The BlockingSplitReader would spawn a thread (or more), and the thread(s) can make blocking calls and hand over data buffers via a blocking queue.
> >> - This should allow us to cover both a fully async runtime and a simple blocking interface for users.
> >> - This is actually very similar to how the Kafka connectors work: Kafka 0.9+ with one thread, Kafka 0.8 with multiple threads.
> >>
> >> - On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (one can await completion or register notification handlers).
> >> - The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox.
> >>
> >> *(4) Split Enumeration and Assignment*
> >>
> >> - Splits may be generated lazily, both in cases where there is a limited number of splits (but very many) and where splits are discovered over time.
> >> - Assignment should also be lazy, to get better load balancing.
> >> - Assignment needs to support locality preferences.
> >>
> >> - Possible design based on the discussion so far:
> >>   --> The SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they only ever have one split concurrently, others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup, where multiple splits may be assigned instantly.)
> >>   --> The SplitReader has a context object which it can use to indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in the case of sources that take splits lazily (file readers) and in the case where the source needs to preserve a partial order between splits (Kinesis, Pravega, Pulsar may need that).
> >>   --> The SplitEnumerator gets a notification when SplitReaders start and when they finish splits. It can decide at that moment to push more splits to that reader.
> >>   --> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.
> >>
> >> - Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?
> >>
> >> *(5) Watermarks and event time alignment*
> >>
> >> - Watermark generation, as well as idleness, needs to be per split (like currently in the Kafka source, per partition).
> >> - It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed.
> >>
> >> - I think it would be desirable to encapsulate the watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split.
> >> - Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently.
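To make the per-split watermark generator idea above concrete, here is a rough sketch in Java. All names are made up for illustration (this is not an existing or agreed-upon API); the remaining bullets below continue with how records and suspension calls would reach such generators.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch, not Flink API: one watermark generator per split,
    // plus a tracker that combines the per-split watermarks for the reader.
    interface SplitWatermarkGenerator<T> {
        // Called for every record of the split; returns the (possibly advanced) split watermark.
        long onRecord(T record, long currentSplitWatermark);
        // Called on a timer for periodic generators; punctuated generators can simply return the current value.
        long onPeriodicEmit(long currentSplitWatermark);
    }

    final class SplitWatermarkTracker {
        private final Map<String, Long> watermarkPerSplit = new HashMap<>();

        void onSplitWatermark(String splitId, long watermark) {
            // A split's watermark never moves backwards.
            watermarkPerSplit.merge(splitId, watermark, Math::max);
        }

        long combinedWatermark() {
            // The reader-wide watermark is the minimum over all active splits,
            // so a split that is behind holds back the overall event time.
            return watermarkPerSplit.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(Long.MIN_VALUE);
        }
    }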
> >> - Need a way to "dispatch" the next record to the different watermark generators.
> >> - Need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure).
> >> - This would in fact not be needed (and things would thus be simpler) if we had one SplitReader per split, and may be a reason to re-open that discussion.
> >>
> >> *(6) Watermarks across splits and in the Split Enumerator*
> >>
> >> - The split enumerator may need some watermark awareness, which should be purely based on split metadata (like the create timestamp of file splits).
> >> - If there are still more splits with an overlapping event time range for a split reader, then that split reader should not advance the watermark within the split beyond the overlap boundary. Otherwise future splits will produce late data.
> >>
> >> - One way to approach this could be that the split enumerator may send watermarks to the readers, and the readers cannot emit watermarks beyond that received watermark.
> >> - Many split enumerators would simply send Long.MAX_VALUE out immediately and leave the progress purely to the split readers.
> >>
> >> - For event-time alignment / split back pressure, this raises the question of how we can avoid deadlocks that may arise when splits are suspended for event time back pressure.
> >>
> >> *(7) Batch and Streaming Unification*
> >>
> >> - Functionality-wise, the above design should support both.
> >> - Batch often (mostly) does not care about reading "in order" and generating watermarks.
> >>   --> It might use different enumerator logic that is more locality-aware and ignores event time order.
> >>   --> It does not generate watermarks.
> >> - It would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type safe and can return a "BoundedDataStream".
> >> - It is possible to defer this discussion until later.
> >>
> >> *Miscellaneous Comments*
> >>
> >> - Should the source have a TypeInformation for the produced type, instead of a serializer? We need a type information in the stream anyway, and we can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.
> >>
> >> - The TypeSerializer interface is very powerful but also not easy to implement. Its purpose is to handle data super efficiently, to support flexible ways of evolution, etc.
> >>   For metadata I would suggest looking at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks, or for the streaming file sink. I think that it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer).
> >>
> >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> >>
> >>> Hi Biao,
> >>>
> >>> Thanks for the answer!
> >>>
> >>> So given the multi-threaded readers, we now have the following open questions:
> >>>
> >>> 1) How do we let the checkpoints pass through our multi-threaded reader operator?
> >>>
> >>> 2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.
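As background for Kostas's first question, the hand-over point in the blocking-reader variant from point (3) of Stephan's summary would look roughly like the sketch below: a dedicated I/O thread fills a bounded queue with blocking calls, and the task thread drains it. The class and method names are invented for illustration only; how checkpoint barriers should pass this boundary is exactly the open part.

    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustrative only: a blocking reader hands record batches to the task thread
    // through a bounded queue. Where checkpoints fit into this hand-over is the open question.
    final class BlockingHandOver<T> {
        private final BlockingQueue<List<T>> queue = new ArrayBlockingQueue<>(4);

        // Runs in a dedicated I/O thread; may block on the external system and on the full queue.
        void runFetchLoop(SplitFetcher<T> fetcher) throws InterruptedException {
            while (fetcher.hasMore()) {
                List<T> batch = fetcher.fetchNextBatch(); // blocking call to the source system
                queue.put(batch);                         // blocks when the queue is full (natural backpressure)
            }
        }

        // Called from the task thread; blocks until a batch is available.
        List<T> takeNextBatch() throws InterruptedException {
            return queue.take();
        }
    }

    // Hypothetical stand-in for the user's blocking poll()-style fetch code.
    interface SplitFetcher<T> {
        boolean hasMore();
        List<T> fetchNextBatch();
    }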
> >>> For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.
> >>>
> >>> For the second one, I think that we should go with separate operators for the source and the readers, for the following reasons:
> >>>
> >>> 1) This is more aligned with a potential future improvement where split discovery becomes a responsibility of the JobManager and the readers pull more work from the JM.
> >>>
> >>> 2) The source is going to be the "single point of truth". It will know what has been processed and what has not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
> >>>    i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
> >>>    ii) each reader would have to keep a copy of all its processed splits.
> >>>    iii) the state has to be a union state with non-trivial merging logic in order to support rescaling.
> >>>
> >>> Two additional points that you raised above:
> >>>
> >>> i) The point that we need to keep all splits (processed and not-processed) is, I think, a bit of a strong requirement. It would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.
> >>>
> >>> ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.
> >>>
> >>> Cheers,
> >>> Kostas
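Finally, to summarize the reader-side shape discussed in points (2)-(4) of Stephan's mail, here is a rough interface sketch: multiple splits per reader via addSplits(...), CompletableFuture-based availability for the mailbox/actor-style task, and a context for signalling finished splits back to the enumerator. None of these names are a committed API; this is only an illustration of the discussed direction.

    import java.util.concurrent.CompletableFuture;

    // Illustration of the discussed async split reader shape; not a committed API.
    interface AsyncSplitReader<T, SplitT> {

        // (4) The enumerator can hand over one or more splits lazily, also after startup.
        void addSplits(SplitT... splits);

        // (3) Non-blocking: returns the next record if one is ready, or null if nothing is available right now.
        T pollNext(ReaderContext<SplitT> context);

        // (3) Completed once data is (or may be) available again, so the task's mailbox/actor
        // loop can register a continuation instead of blocking on the reader.
        CompletableFuture<Void> isAvailable();
    }

    // (4) Context through which the reader reports progress back to the enumerator.
    interface ReaderContext<SplitT> {
        // Tells the enumerator this split is finished, so it may decide to assign a new one.
        void splitFinished(SplitT split);
    }

The task would then, for example, register isAvailable().thenRun(...) to enqueue a "take next data" action into its mailbox, which matches the "thenHandle()" idea in point (3).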