Hi Biao! This discussion stalled because of the preparations for open sourcing & merging Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for them.
I think there is no chance for this to be completed in the couple of remaining weeks / one month before the 1.8 feature freeze, but it would be good to aim for 1.9 with those changes.

Piotrek

On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,

The summary from Stephan makes a lot of sense to me. It is indeed much clearer after splitting the complex topic into smaller ones.

I was wondering whether there is a detailed plan for the next step? If not, I would like to push this forward by creating some JIRA issues.

Another question: should version 1.8 include these features?

On Sat, Dec 1, 2018 at 4:20 AM, Stephan Ewen <se...@apache.org> wrote:

Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion and what the open issues are. I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I will try to update the FLIP in the next days. For the remaining ones we need more discussion.

I would suggest forking each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing.
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams) - this discussion is orthogonal and should come later, when the interface is agreed upon.

*(2) Split Readers for one or more splits*

- The discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions.
  --> Example sources that require this would be Kafka, Pravega, and Pulsar.
- We could have one split reader per source, or multiple split readers that share the "poll()" function.
- To not make it too complicated, we can start by thinking about one split reader for all splits initially, and see if that covers all requirements.

*(3) Threading model of the Split Reader*

- Most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to base the task runtime code on a single-threaded/actor-style task design.
  --> I personally am a big proponent of that; it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code.
- Users care about a simple abstraction, so as a subclass of the (non-blocking / async) SplitReader we need to have a BlockingSplitReader, which will form the basis of most source implementations. BlockingSplitReader lets users make simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more) and the thread(s) can make blocking calls and hand over data buffers via a blocking queue.
- This should allow us to cover both a fully async runtime and a simple blocking interface for users.
- This is actually very similar to how the Kafka connectors work: Kafka 0.9+ with one thread, Kafka 0.8 with multiple threads.
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (one can await completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox.
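To make the threading model in (3) concrete, here is a minimal sketch of what the async base interface and the blocking convenience base could look like. All names (SplitReader, BlockingSplitReader, pollNext, isAvailable, and the addSplits method anticipated under (4) below) follow this discussion rather than any finalized API, and the re-arming of the future is deliberately simplified - a real implementation needs care to avoid lost wakeups:

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    // Async base interface: the task never blocks; it polls and gets
    // availability notifications via a CompletableFuture.
    interface SplitReader<T, SplitT> {

        // Add one or more splits for this reader to work on (see (4) below).
        void addSplits(List<SplitT> splits);

        // Returns the next record, or null if none is currently available.
        T pollNext() throws IOException;

        // Completes once data is available again; the source task registers
        // a handler that puts a "take next data" action into its mailbox.
        CompletableFuture<Void> isAvailable();
    }

    // Blocking convenience base: subclasses implement a simple blocking
    // fetch; an internal thread hands records over via a blocking queue.
    abstract class BlockingSplitReader<T, SplitT> implements SplitReader<T, SplitT> {

        private final LinkedBlockingQueue<T> handover = new LinkedBlockingQueue<>(1000);
        private volatile CompletableFuture<Void> available = new CompletableFuture<>();

        // Implemented by subclasses with plain blocking poll() calls.
        protected abstract T fetchNextBlocking() throws IOException;

        public void start() {
            Thread fetcher = new Thread(() -> {
                try {
                    while (true) {
                        handover.put(fetchNextBlocking()); // blocks when the queue is full
                        available.complete(null);          // signal data availability
                    }
                } catch (Exception e) {
                    available.completeExceptionally(e);
                }
            }, "split-fetcher");
            fetcher.setDaemon(true);
            fetcher.start();
        }

        @Override
        public T pollNext() {
            T next = handover.poll();
            if (next == null) {
                // Re-arm for the next round; simplified, a real implementation
                // must avoid racing with the fetcher completing the old future.
                available = new CompletableFuture<>();
            }
            return next;
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            return available;
        }
    }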
*(4) Split Enumeration and Assignment*

- Splits may be generated lazily, both in cases where there is a limited number of splits (but very many), and in cases where splits are discovered over time.
- Assignment should also be lazy, to get better load balancing.
- Assignment needs to support locality preferences.

- Possible design based on the discussion so far:
  --> SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they only ever have one split concurrently; others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup, where multiple splits may be assigned instantly.)
  --> SplitReader has a context object on which it can indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in cases of sources that take splits lazily (file readers) and in cases where the source needs to preserve a partial order between splits (Kinesis, Pravega, and Pulsar may need that).
  --> The SplitEnumerator gets a notification when SplitReaders start and when they finish splits. It can decide at that moment to push more splits to that reader.
  --> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.

- Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?

*(5) Watermarks and event time alignment*

- Watermark generation, as well as idleness, needs to be per split (as it is currently in the Kafka source, per partition).
- It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed.

- I think it would be desirable to encapsulate watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split.
- Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently.

- We need a way to "dispatch" the next record to different watermark generators.
- We need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure).
- This would in fact not be needed (and things would thus be simpler) if we had a SplitReader per split, and may be a reason to re-open that discussion.
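As a rough illustration of "one watermark generator per split" from (5), a sketch could look as follows. The names here (WatermarkGenerator, PerSplitWatermarkTracker) are invented for the example, not a proposed API; it dispatches each record to the generator of the split it came from and combines per-split watermarks as the minimum over active splits:

    import java.util.HashMap;
    import java.util.Map;

    // Per-split watermark generation: each split gets its own generator.
    interface WatermarkGenerator<T> {
        // Called for every record of the split; may advance the watermark.
        void onEvent(T event, long eventTimestamp);
        // Current watermark of this split.
        long currentWatermark();
    }

    // Dispatches records to the generator of the split they came from and
    // combines per-split watermarks by taking the minimum over active splits.
    class PerSplitWatermarkTracker<T> {
        private final Map<String, WatermarkGenerator<T>> generators = new HashMap<>();

        void registerSplit(String splitId, WatermarkGenerator<T> generator) {
            generators.put(splitId, generator);
        }

        void unregisterSplit(String splitId) {
            generators.remove(splitId); // e.g. split finished or marked idle
        }

        void onEvent(String splitId, T event, long eventTimestamp) {
            generators.get(splitId).onEvent(event, eventTimestamp);
        }

        // The watermark emitted downstream: the minimum across all splits,
        // so no fast split can pull the combined watermark ahead of a slow one.
        long combinedWatermark() {
            long min = Long.MAX_VALUE;
            for (WatermarkGenerator<T> g : generators.values()) {
                min = Math.min(min, g.currentWatermark());
            }
            return generators.isEmpty() ? Long.MIN_VALUE : min;
        }
    }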
*(6) Watermarks across splits and in the Split Enumerator*

- The split enumerator may need some watermark awareness, which should be purely based on split metadata (like the creation timestamp of file splits).
- If there are still more splits with an overlapping event time range for a split reader, then that split reader should not advance the watermark within the split beyond the overlap boundary. Otherwise future splits will produce late data.

- One way to approach this could be that the split enumerator sends watermarks to the readers, and the readers cannot emit watermarks beyond the received watermark.
- Many split enumerators would simply send Long.MAX_VALUE out immediately and leave the progress purely to the split readers.

- For event-time alignment / split back pressure, this begs the question of how we can avoid deadlocks that may arise when splits are suspended for event time back pressure.

*(7) Batch and streaming Unification*

- Functionality-wise, the above design should support both.
- Batch often (mostly) does not care about reading "in order" and generating watermarks.
  --> It might use different enumerator logic that is more locality-aware and ignores event time order.
  --> It does not generate watermarks.
- It would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type safe and can return a "BoundedDataStream".
- It is possible to defer this discussion until later.

*Miscellaneous Comments*

- Should the source have a TypeInformation for the produced type, instead of a serializer? We need a type information in the stream anyway, and can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.

- The TypeSerializer interface is very powerful but also not easy to implement. Its purpose is to handle data super efficiently, support flexible ways of evolution, etc. For metadata I would suggest looking at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks, or for the streaming file sink. I think it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer).
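For reference, SimpleVersionedSerializer is a small interface (getVersion / serialize / deserialize). A sketch of what a split-metadata serializer could look like - FileSplit is a hypothetical metadata type invented for the example; only the SimpleVersionedSerializer interface itself is existing Flink API:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    // Hypothetical split metadata: a file path plus a start offset.
    class FileSplit {
        final String path;
        final long offset;
        FileSplit(String path, long offset) { this.path = path; this.offset = offset; }
    }

    class FileSplitSerializer implements SimpleVersionedSerializer<FileSplit> {
        private static final int VERSION = 1;

        @Override
        public int getVersion() { return VERSION; }

        @Override
        public byte[] serialize(FileSplit split) throws IOException {
            byte[] pathBytes = split.path.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(4 + pathBytes.length + 8);
            buf.putInt(pathBytes.length).put(pathBytes).putLong(split.offset);
            return buf.array();
        }

        @Override
        public FileSplit deserialize(int version, byte[] serialized) throws IOException {
            if (version != VERSION) {
                throw new IOException("Unknown version: " + version);
            }
            ByteBuffer buf = ByteBuffer.wrap(serialized);
            byte[] pathBytes = new byte[buf.getInt()];
            buf.get(pathBytes);
            return new FileSplit(new String(pathBytes, StandardCharsets.UTF_8), buf.getLong());
        }
    }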
On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, we now have these open questions:

1) How do we let checkpoints pass through our multi-threaded reader operator?

2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.

For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.

For the second, I think we should go with separate operators for the source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where split discovery becomes a responsibility of the JobManager and readers pull more work from the JM.

2) The source is going to be the "single point of truth". It will know what has been processed and what not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
   i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
   ii) each reader would have to keep a copy of all its processed splits.
   iii) the state has to be a union state with non-trivial merging logic in order to support rescaling.

Two additional points that you raised above:

i) The point you raised that we need to keep all splits (processed and not-processed) is, I think, a bit of a strong requirement. It would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.

ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.

Cheers,
Kostas
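To illustrate Kostas's point iii) above - why per-task split discovery forces union state with merging logic - a sketch along these lines could apply. The class and the simplistic hash-based re-assignment are invented for the example; only getUnionListState and the CheckpointedFunction interface are existing Flink API:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // Hypothetical reader that tracks its assigned splits in union state.
    class SplitTrackingReader implements CheckpointedFunction {

        private final int subtaskIndex;
        private final int parallelism;
        private final Set<String> assignedSplits = new HashSet<>();
        private transient ListState<String> splitState;

        SplitTrackingReader(int subtaskIndex, int parallelism) {
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Union state: on restore, EVERY subtask receives the FULL list
            // of splits from all subtasks of the previous execution.
            splitState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<>("splits", String.class));

            // The non-trivial merging: each subtask must deterministically
            // pick its own subset, e.g. split id hash modulo new parallelism.
            for (String split : splitState.get()) {
                if (Math.abs(split.hashCode() % parallelism) == subtaskIndex) {
                    assignedSplits.add(split);
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            splitState.clear();
            for (String split : assignedSplits) {
                splitState.add(split);
            }
        }
    }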