Hi community,

Stephan's summary makes a lot of sense to me. The discussion is indeed much clearer after splitting the complex topic into smaller ones. I was wondering, is there a detailed plan for the next steps? If not, I would like to push this forward by creating some JIRA issues. Another question: should version 1.8 include these features?
On Sat, Dec 1, 2018 at 4:20 AM Stephan Ewen <se...@apache.org> wrote:

> Thanks everyone for the lively discussion. Let me try to summarize where I
> see convergence in the discussion and the open issues.
> I'll try to group this by design aspect of the source. Please let me know
> if I got things wrong or missed something crucial here.
>
> For issues 1-3, if the below reflects the state of the discussion, I would
> try and update the FLIP in the next few days.
> For the remaining ones we need more discussion.
>
> I would suggest forking each of these aspects into a separate mail thread,
> or we will lose sight of the individual aspects.
>
> *(1) Separation of Split Enumerator and Split Reader*
>
> - All seem to agree this is a good thing.
> - The Split Enumerator could in the end live on the JobManager (and assign
> splits via RPC) or in a task (and assign splits via data streams) - this
> discussion is orthogonal and should come later, when the interface is
> agreed upon.
>
> *(2) Split Readers for one or more splits*
>
> - The discussion seems to agree that we need to support one reader that
> possibly handles multiple splits concurrently.
> - The requirement comes from sources where one poll()-style call fetches
> data from different splits / partitions.
>   --> Example sources that require this are Kafka, Pravega, and Pulsar.
>
> - We could have one split reader per source, or multiple split readers
> that share the "poll()" function.
> - To not make it too complicated, we can start by thinking about one split
> reader for all splits initially and see if that covers all requirements.
>
> *(3) Threading model of the Split Reader*
>
> - Most active part of the discussion ;-)
>
> - A non-blocking way for Flink's task code to interact with the source is
> needed in order to support task runtime code based on a
> single-threaded/actor-style task design.
>   --> I personally am a big proponent of that; it will help with
> well-behaved checkpoints, efficiency, and simpler yet more robust runtime
> code.
>
> - Users care about a simple abstraction, so as a subclass of the
> SplitReader (non-blocking / async) we need to have a BlockingSplitReader,
> which will form the basis of most source implementations.
> BlockingSplitReader lets users do simple blocking poll() calls.
> - The BlockingSplitReader would spawn a thread (or more), and the
> thread(s) would make the blocking calls and hand over data buffers via a
> blocking queue.
> - This should allow us to cover both a fully async runtime and a simple
> blocking interface for users.
> - This is actually very similar to how the Kafka connectors work: Kafka
> 0.9+ with one thread, Kafka 0.8 with multiple threads.
>
> - On the base SplitReader (the async one), the non-blocking method that
> gets the next chunk of data would signal data availability via a
> CompletableFuture, because that gives the best flexibility (one can await
> completion or register notification handlers).
> - The source task would register a "thenHandle()" (or similar) on the
> future to put a "take next data" task into the actor-style mailbox.
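Stepping out of the quote for a moment: to make sure I understand the threading model in (3), below is a minimal sketch of how the async SplitReader and the BlockingSplitReader could fit together. All class and method names here (SplitReader, BlockingSplitReader, isAvailable, pollNext, fetchNextBlocking) are just my guesses for illustration, not something the FLIP has fixed yet.

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Async base interface: the task thread never blocks on the source.
// Availability is signaled via a CompletableFuture, as described in (3).
interface SplitReader<T> {
    // Completes when data is available; the task registers a handler that
    // enqueues a "take next data" action into its actor-style mailbox.
    CompletableFuture<Void> isAvailable();

    // Non-blocking: returns null if nothing is available right now.
    T pollNext() throws IOException;
}

// Blocking adapter: user code makes simple blocking fetches, a fetcher
// thread hands records over through a bounded queue, and the future
// bridges to the async world.
abstract class BlockingSplitReader<T> implements SplitReader<T> {
    private final BlockingQueue<T> handover = new LinkedBlockingQueue<>(100);
    private volatile CompletableFuture<Void> available = new CompletableFuture<>();

    // Implemented by the user; may block (e.g., a Kafka consumer poll()).
    protected abstract T fetchNextBlocking() throws IOException;

    public void start() {
        Thread fetcher = new Thread(() -> {
            try {
                while (true) {
                    handover.put(fetchNextBlocking()); // blocks if the queue is full
                    available.complete(null);          // wake up the task
                }
            } catch (Exception e) {
                available.completeExceptionally(e);
            }
        }, "split-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        if (!handover.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return available;
    }

    @Override
    public T pollNext() {
        T next = handover.poll();
        if (next == null) {
            available = new CompletableFuture<>(); // re-arm for the next round
        }
        return next;
    }
}

The property that matters is that the task thread itself never blocks; it only registers a handler on the future, which matches the mailbox/actor-style runtime. A real implementation would of course need more care around the re-arming race between pollNext() and the fetcher thread.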
> *(4) Split Enumeration and Assignment*
>
> - Splits may be generated lazily, both in cases where there is a limited
> number of splits (but very many) and where splits are discovered over
> time.
> - Assignment should also be lazy, to get better load balancing.
> - Assignment needs to support locality preferences.
>
> - Possible design based on the discussion so far:
>
>   --> SplitReader has a method "addSplits(SplitT...)" to add one or more
> splits. Some split readers might assume they only ever have one split,
> concurrently, while others assume multiple splits. (Note: the idea behind
> being able to add multiple splits at the same time is to ease startup,
> where multiple splits may be assigned instantly.)
>   --> SplitReader has a context object on which it can indicate when
> splits are completed. The enumerator gets that notification and can use it
> to decide when to assign new splits. This should help both in cases of
> sources that take splits lazily (file readers) and in cases where the
> source needs to preserve a partial order between splits (Kinesis, Pravega,
> and Pulsar may need that).
>   --> The SplitEnumerator gets a notification when SplitReaders start and
> when they finish splits. It can decide at that moment to push more splits
> to that reader.
>   --> The SplitEnumerator should probably be aware of the source
> parallelism, to build its initial distribution.
>
> - Open question: Should the source expose something like "host
> preferences", so that yarn/mesos/k8s can take this into account when
> selecting a node to start a TM on?
>
> *(5) Watermarks and event time alignment*
>
> - Watermark generation, as well as idleness, needs to be per split (as
> currently in the Kafka Source, per partition).
> - It is desirable to support optional event-time alignment, meaning that
> splits that are ahead are back-pressured or temporarily unsubscribed.
>
> - I think it would be desirable to encapsulate the watermark generation
> logic in watermark generators, for a separation of concerns. The watermark
> generators should run per split.
> - Using watermark generators would also help with another problem of the
> suggested interface, namely supporting non-periodic watermarks
> efficiently.
>
> - We need a way to "dispatch" the next record to the different watermark
> generators.
> - We need a way to tell the SplitReader to "suspend" a split until a
> certain watermark is reached (event time backpressure).
> - This would in fact not be needed (and thus be simpler) if we had a
> SplitReader per split, and may be a reason to re-open that discussion.
>
> *(6) Watermarks across splits and in the Split Enumerator*
>
> - The split enumerator may need some watermark awareness, which should be
> purely based on split metadata (like the creation timestamp of file
> splits).
> - If there are still more splits with an overlapping event time range for
> a split reader, then that split reader should not advance the watermark
> within the split beyond the overlap boundary. Otherwise future splits will
> produce late data.
>
> - One way to approach this could be that the split enumerator may send
> watermarks to the readers, and the readers cannot emit watermarks beyond
> the received watermark.
> - Many split enumerators would simply send Long.MAX_VALUE out immediately
> and leave the progress purely to the split readers.
>
> - For event-time alignment / split back pressure, this begs the question
> of how we can avoid deadlocks that may arise when splits are suspended for
> event time back pressure.
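Again stepping out of the quote: to check my understanding of the enumerator/reader handshake in (4), here is a rough sketch of the interfaces involved. All names are hypothetical; only "addSplits(SplitT...)" is taken from the summary above.

import java.util.List;

// Hypothetical shape of the enumerator side of the handshake in (4);
// none of these names are fixed by the FLIP.
interface SplitEnumerator<SplitT> {
    // Called when a reader subtask starts; the enumerator can push an
    // initial batch of splits, taking the source parallelism and
    // locality preferences into account.
    void readerStarted(int subtaskId, SplitAssigner<SplitT> assigner);

    // Called when a reader reports a split as finished; the enumerator
    // can decide at that moment to assign more splits to that reader.
    // This supports lazy assignment (file sources) and preserving a
    // partial order between splits (Kinesis, Pravega, Pulsar).
    void splitFinished(int subtaskId, SplitT split, SplitAssigner<SplitT> assigner);
}

// Channel through which the enumerator hands splits to a specific reader,
// regardless of whether it lives on the JobManager (RPC) or in a task
// (data stream).
interface SplitAssigner<SplitT> {
    void assignSplits(int subtaskId, List<SplitT> splits);
}

// Reader-side additions from (4): splits arrive in batches, and the
// context routes completion notifications back to the enumerator.
interface SplitAwareReader<SplitT> {
    // One or more splits can be added at once, e.g. to ease startup.
    void addSplits(SplitT... splits);
}

interface SplitReaderContext<SplitT> {
    void splitCompleted(SplitT split);
}

The key property, if I read the proposal correctly, is that assignment decisions stay in one place (the enumerator), while completion events flow back from the readers; that is what makes lazy, load-balanced assignment possible.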
> *(7) Batch and Streaming Unification*
>
> - Functionality-wise, the above design should support both.
> - Batch often (mostly) does not care about reading "in order" and
> generating watermarks.
>   --> It might use different enumerator logic that is more locality-aware
> and ignores event time order.
>   --> It does not generate watermarks.
> - It would be great if bounded sources could be identified at compile
> time, so that "env.addBoundedSource(...)" is type safe and can return a
> "BoundedDataStream".
> - It is possible to defer this discussion until later.
>
> *Miscellaneous Comments*
>
> - Should the source have a TypeInformation for the produced type, instead
> of a serializer? We need type information in the stream anyways, and can
> derive the serializer from that. Plus, creating the serializer should
> respect the ExecutionConfig.
>
> - The TypeSerializer interface is very powerful but also not easy to
> implement. Its purpose is to handle data super efficiently and to support
> flexible ways of evolution, etc.
> For metadata I would suggest looking at the SimpleVersionedSerializer
> instead, which is used for example for checkpoint master hooks and for the
> streaming file sink. I think it is a good match for cases where we do not
> need more than ser/deser (no copy, etc.) and don't need to push versioning
> out of the serialization paths for best performance (as in the
> TypeSerializer).
>
>
> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
> <k.klou...@data-artisans.com> wrote:
>
> > Hi Biao,
> >
> > Thanks for the answer!
> >
> > So given the multi-threaded readers, we now have these open questions:
> >
> > 1) How do we let the checkpoints pass through our multi-threaded reader
> > operator?
> >
> > 2) Do we have separate reader and source operators or not? In the
> > strategy that has a separate source, the source operator has a
> > parallelism of 1 and is responsible for split recovery only.
> >
> > For the first one, given also the constraints (blocking, finite queues,
> > etc.), I do not have an answer yet.
> >
> > For the 2nd, I think that we should go with separate operators for the
> > source and the readers, for the following reasons:
> >
> > 1) This is more aligned with a potential future improvement where the
> > split discovery becomes a responsibility of the JobManager and readers
> > are pulling more work from the JM.
> >
> > 2) The source is going to be the "single point of truth". It will know
> > what has been processed and what not. If the source and the readers are
> > a single operator with parallelism > 1, or in general, if the split
> > discovery is done by each task individually, then:
> >    i) we have to have a deterministic scheme for each reader to assign
> > splits to itself (e.g. mod subtaskId). This is not necessarily trivial
> > for all sources.
> >    ii) each reader would have to keep a copy of all its processed
> > splits.
> >    iii) the state has to be a union state with non-trivial merging
> > logic in order to support rescaling.
> >
> > Two additional points that you raised above:
> >
> > i) The point you raised that we need to keep all splits (processed and
> > not-processed) is, I think, a bit of a strong requirement. It would
> > imply that for infinite sources the state will grow indefinitely. This
> > problem is even more pronounced if we do not have a single source that
> > assigns splits to readers, as each reader will have its own copy of the
> > state.
> >
> > ii) It is true that for finite sources we need to somehow not close the
> > readers when the source/split discoverer finishes. The
> > ContinuousFileReaderOperator has a work-around for that. It is not
> > elegant, and checkpoints are not emitted after closing the source, but
> > this, I believe, is a bigger problem which requires more changes than
> > just refactoring the source interface.
> >
> > Cheers,
> > Kostas
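One last thought on the SimpleVersionedSerializer suggestion in the miscellaneous comments: it does look like a good fit for split metadata. To illustrate what it would mean for a connector author, here is a minimal sketch for a hypothetical file-split type. The FileSplit class and its fields are made up; only the SimpleVersionedSerializer interface itself (getVersion / serialize / deserialize) is existing Flink API.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Hypothetical split metadata, just for illustration.
class FileSplit {
    final String path;
    final long offset;
    final long length;

    FileSplit(String path, long offset, long length) {
        this.path = path;
        this.offset = offset;
        this.length = length;
    }
}

class FileSplitSerializer implements SimpleVersionedSerializer<FileSplit> {

    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(FileSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(split.path);
            out.writeLong(split.offset);
            out.writeLong(split.length);
        }
        return bytes.toByteArray();
    }

    @Override
    public FileSplit deserialize(int version, byte[] serialized) throws IOException {
        // The version parameter is where format evolution hooks in:
        // older formats can be dispatched to legacy read paths here.
        if (version != VERSION) {
            throw new IOException("Unknown version: " + version);
        }
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new FileSplit(in.readUTF(), in.readLong(), in.readLong());
        }
    }
}

Compared to a full TypeSerializer, this is just ser/deser plus an explicit version, which seems like exactly the right amount of machinery for checkpointed split metadata.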