Hi Stephan & Piotrek,

Thank you for the feedback.

It seems that there are a lot of things to do in the community. I am just
afraid that this discussion may be forgotten since there are so many
proposals recently.
Anyway, I wish to see the split topics soon :)

Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:

> Hi Biao!
>
> This discussion was stalled because of preparations for the open sourcing
> & merging of Blink. I think before creating the tickets we should split this
> discussion into topics/areas outlined by Stephan and create FLIPs for that.
>
> I think there is no chance for this to be completed in the couple of
> remaining weeks / one month before the 1.8 feature freeze; however, it
> would be good to aim to land those changes in 1.9.
>
> Piotrek
>
> > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> >
> > Hi community,
> > Stephan's summary makes a lot of sense to me. It is indeed much clearer
> > after splitting the complex topic into smaller ones.
> > I was wondering whether there is any detailed plan for the next step? If
> > not, I would like to push this forward by creating some JIRA issues.
> > Another question: should version 1.8 include these features?
> >
> > Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
> >
> >> Thanks everyone for the lively discussion. Let me try to summarize
> where I
> >> see convergence in the discussion and open issues.
> >> I'll try to group this by design aspect of the source. Please let me
> know
> >> if I got things wrong or missed something crucial here.
> >>
> >> For issues 1-3, if the below reflects the state of the discussion, I
> >> would try to update the FLIP in the next few days.
> >> For the remaining ones we need more discussion.
> >>
> >> I would suggest forking each of these aspects into a separate mail
> >> thread, or we will lose sight of the individual aspects.
> >>
> >> *(1) Separation of Split Enumerator and Split Reader*
> >>
> >>  - All seem to agree this is a good thing
> >>  - Split Enumerator could in the end live on JobManager (and assign
> splits
> >> via RPC) or in a task (and assign splits via data streams)
> >>  - This discussion is orthogonal and should come later, when the
> >> interface is agreed upon.
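> >>
> >>  As a rough illustration of that separation (the names below are made
> >>  up, not a proposal for the final API):
> >>
> >>    // Sketch only: the enumerator discovers and assigns splits,
> >>    // the reader is only concerned with reading the splits it was given.
> >>    interface SplitEnumerator<SplitT> {
> >>        java.util.List<SplitT> discoverNewSplits();        // e.g. new files / partitions
> >>        void assignSplit(SplitT split, int readerSubtask); // via RPC or a data stream
> >>    }
> >>
> >>    interface SplitReader<T, SplitT> {
> >>        void addSplits(SplitT... splits);  // splits handed over by the enumerator
> >>        T pollNext();                      // read from the currently assigned splits
> >>    }
> >>
> >>  Whether the enumerator lives on the JobManager or in a task would then
> >>  only change how assignSplit() is transported.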
> >>
> >> *(2) Split Readers for one or more splits*
> >>
> >>  - Discussion seems to agree that we need to support one reader that
> >> possibly handles multiple splits concurrently.
> >>  - The requirement comes from sources where one poll()-style call
> fetches
> >> data from different splits / partitions
> >>    --> Example sources that require this are Kafka, Pravega, and Pulsar
> >>
> >>  - Could have one split reader per source, or multiple split readers
> that
> >> share the "poll()" function
> >>  - To not make it too complicated, we can start by thinking about one
> >> split reader for all splits initially and see if that covers all
> >> requirements
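> >>
> >>  To make the multi-split case concrete, a rough sketch of one reader
> >>  multiplexing several splits behind a single poll()-style call (all
> >>  types here are made up for illustration):
> >>
> >>    // Hypothetical per-split handle with a non-blocking read attempt.
> >>    interface SplitHandle<T> { T tryRead(); }
> >>
> >>    final class MultiSplitReader<T> {
> >>        private final java.util.ArrayDeque<SplitHandle<T>> splits = new java.util.ArrayDeque<>();
> >>
> >>        void addSplit(SplitHandle<T> split) { splits.addLast(split); }
> >>
> >>        // One call may return a record from any assigned split, similar to
> >>        // how KafkaConsumer.poll() returns records from many partitions.
> >>        T poll() {
> >>            for (int i = 0, n = splits.size(); i < n; i++) {
> >>                SplitHandle<T> s = splits.pollFirst();
> >>                splits.addLast(s);            // simple round-robin over splits
> >>                T record = s.tryRead();
> >>                if (record != null) {
> >>                    return record;
> >>                }
> >>            }
> >>            return null;                      // nothing available right now
> >>        }
> >>    }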
> >>
> >> *(3) Threading model of the Split Reader*
> >>
> >>  - Most active part of the discussion ;-)
> >>
> >>  - A non-blocking way for Flink's task code to interact with the source
> >> is needed in order to base the task runtime code on a
> >> single-threaded/actor-style task design
> >>    --> I personally am a big proponent of that; it will help with
> >> well-behaved checkpoints, efficiency, and simpler yet more robust runtime
> >> code
> >>
> >>  - Users care about a simple abstraction, so as a subclass of SplitReader
> >> (non-blocking / async) we need to have a BlockingSplitReader, which will
> >> form the basis of most source implementations. BlockingSplitReader lets
> >> users do simple blocking poll() calls.
> >>  - The BlockingSplitReader would spawn a thread (or more) and the
> >> thread(s) can make blocking calls and hand over data buffers via a
> blocking
> >> queue
> >>  - This should allow us to cover both a fully async runtime and a simple
> >> blocking interface for users.
> >>  - This is actually very similar to how the Kafka connectors work: Kafka
> >> 0.9+ with one thread, Kafka 0.8 with multiple threads
> >>
> >>  - On the base SplitReader (the async one), the non-blocking method that
> >> gets the next chunk of data would signal data availability via a
> >> CompletableFuture, because that gives the best flexibility (can await
> >> completion or register notification handlers).
> >>  - The source task would register a "thenHandle()" (or similar) on the
> >> future to put a "take next data" task into the actor-style mailbox
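> >>
> >>  A rough sketch of that layering, just to make the discussion concrete
> >>  (class and method names are made up, not a proposal for the final API):
> >>
> >>    import java.util.concurrent.ArrayBlockingQueue;
> >>    import java.util.concurrent.BlockingQueue;
> >>    import java.util.concurrent.CompletableFuture;
> >>
> >>    // Async base: the runtime asks for availability and gets a future it
> >>    // can hook into the actor-style mailbox via thenRun()/thenAccept().
> >>    interface AsyncSplitReader<T> {
> >>        CompletableFuture<Void> isAvailable();  // completes when pollNext() has data
> >>        T pollNext();                           // non-blocking, may return null
> >>    }
> >>
> >>    // Blocking convenience layer: a fetcher thread does blocking reads and
> >>    // hands records over via a bounded queue.
> >>    abstract class BlockingSplitReaderSketch<T> implements AsyncSplitReader<T> {
> >>        private final BlockingQueue<T> handover = new ArrayBlockingQueue<>(1024);
> >>        private volatile CompletableFuture<Void> available = new CompletableFuture<>();
> >>
> >>        protected abstract T readNextBlocking() throws Exception;  // user code, may block
> >>
> >>        void start() {
> >>            Thread fetcher = new Thread(() -> {
> >>                try {
> >>                    while (true) {
> >>                        handover.put(readNextBlocking());  // blocks if the queue is full
> >>                        available.complete(null);          // signal the task
> >>                    }
> >>                } catch (Exception e) {
> >>                    available.completeExceptionally(e);
> >>                }
> >>            }, "split-fetcher");
> >>            fetcher.setDaemon(true);
> >>            fetcher.start();
> >>        }
> >>
> >>        @Override
> >>        public CompletableFuture<Void> isAvailable() {
> >>            return handover.isEmpty() ? available : CompletableFuture.completedFuture(null);
> >>        }
> >>
> >>        @Override
> >>        public T pollNext() {
> >>            T next = handover.poll();
> >>            if (next == null) {
> >>                available = new CompletableFuture<>();  // re-arm for the next record
> >>            }
> >>            return next;
> >>        }
> >>    }
> >>
> >>  A real implementation would of course need a proper shutdown path and
> >>  care around the re-arming race; the sketch is only meant to show the
> >>  hand-over from the blocking thread(s) to the async interface.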
> >>
> >> *(4) Split Enumeration and Assignment*
> >>
> >>  - Splits may be generated lazily, both in cases where there is a limited
> >> (but very large) number of splits and in cases where splits are discovered
> >> over time
> >>  - Assignment should also be lazy, to get better load balancing
> >>  - Assignment needs to support locality preferences
> >>
> >>  - Possible design based on discussion so far:
> >>
> >>    --> SplitReader has a method "addSplits(SplitT...)" to add one or more
> >> splits. Some split readers might assume they only ever have one split
> >> concurrently, others assume multiple splits. (Note: the idea behind being
> >> able to add multiple splits at the same time is to ease startup, where
> >> multiple splits may be assigned instantly.)
> >>    --> SplitReader has a context object on which it can indicate when
> >> splits are completed. The enumerator gets that notification and can use it
> >> to decide when to assign new splits. This should help both in cases of
> >> sources that take splits lazily (file readers) and in cases where the
> >> source needs to preserve a partial order between splits (Kinesis, Pravega,
> >> Pulsar may need that).
> >>    --> The SplitEnumerator gets a notification when SplitReaders start and
> >> when they finish splits. It can decide at that moment to push more splits
> >> to that reader
> >>    --> The SplitEnumerator should probably be aware of the source
> >> parallelism, to build its initial distribution.
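> >>
> >>    (As a rough sketch of the hand-shake described in the arrows above,
> >>    with made-up names:)
> >>
> >>    // The reader reports progress through a context; the enumerator reacts.
> >>    interface SplitReaderContext<SplitT> {
> >>        void splitCompleted(SplitT split);  // called by the reader when a split is done
> >>    }
> >>
> >>    interface LazySplitEnumerator<SplitT> {
> >>        void onReaderStarted(int readerSubtask, int parallelism);  // initial distribution
> >>        void onSplitCompleted(SplitT split, int readerSubtask);    // may assign follow-up splits
> >>    }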
> >>
> >>  - Open question: Should the source expose something like "host
> >> preferences", so that yarn/mesos/k8s can take this into account when
> >> selecting a node to start a TM on?
> >>
> >> *(5) Watermarks and event time alignment*
> >>
> >>  - Watermark generation, as well as idleness, needs to be per split
> (like
> >> currently in the Kafka Source, per partition)
> >>  - It is desirable to support optional event-time-alignment, meaning
> that
> >> splits that are ahead are back-pressured or temporarily unsubscribed
> >>
> >>  - I think it would be desirable to encapsulate the watermark generation
> >> logic in watermark generators, for a separation of concerns. The watermark
> >> generators should run per split.
> >>  - Using watermark generators would also help with another problem of
> the
> >> suggested interface, namely supporting non-periodic watermarks
> efficiently.
> >>
> >>  - Need a way to "dispatch" next record to different watermark
> generators
> >>  - Need a way to tell SplitReader to "suspend" a split until a certain
> >> watermark is reached (event time backpressure)
> >>  - This would in fact be not needed (and thus simpler) if we had a
> >> SplitReader per split and may be a reason to re-open that discussion
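> >>
> >>  A minimal sketch of per-split watermark generation (hypothetical
> >>  interface, only to illustrate the separation of concerns):
> >>
> >>    interface WatermarkGenerator<T> {
> >>        void onEvent(T event, long eventTimestamp);
> >>        long currentWatermark();
> >>    }
> >>
> >>    final class PerSplitWatermarks<T> {
> >>        private final java.util.Map<String, WatermarkGenerator<T>> generators =
> >>                new java.util.HashMap<>();
> >>        private final java.util.function.Supplier<WatermarkGenerator<T>> factory;
> >>
> >>        PerSplitWatermarks(java.util.function.Supplier<WatermarkGenerator<T>> factory) {
> >>            this.factory = factory;
> >>        }
> >>
> >>        // Records are dispatched to the generator of the split they came from.
> >>        void onRecord(String splitId, T record, long timestamp) {
> >>            generators.computeIfAbsent(splitId, id -> factory.get()).onEvent(record, timestamp);
> >>        }
> >>
> >>        // The emitted watermark is the minimum over all active splits.
> >>        long combinedWatermark() {
> >>            long min = Long.MAX_VALUE;
> >>            for (WatermarkGenerator<T> g : generators.values()) {
> >>                min = Math.min(min, g.currentWatermark());
> >>            }
> >>            return generators.isEmpty() ? Long.MIN_VALUE : min;
> >>        }
> >>    }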
> >>
> >> *(6) Watermarks across splits and in the Split Enumerator*
> >>
> >>  - The split enumerator may need some watermark awareness, which should
> be
> >> purely based on split metadata (like create timestamp of file splits)
> >>  - If there are still more splits with overlapping event time range for
> a
> >> split reader, then that split reader should not advance the watermark
> >> within the split beyond the overlap boundary. Otherwise future splits
> will
> >> produce late data.
> >>
> >>  - One way to approach this could be that the split enumerator may send
> >> watermarks to the readers, and the readers cannot emit watermarks beyond
> >> that received watermark.
> >>  - Many split enumerators would simply immediately send Long.MAX out and
> >> leave the progress purely to the split readers.
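> >>
> >>  (A tiny sketch of that cap, assuming the bound arrives from the
> >>  enumerator as a plain long value:)
> >>
> >>    // The reader never emits a watermark beyond the enumerator's bound.
> >>    // A bound of Long.MAX_VALUE means "no restriction from the enumerator".
> >>    final class EnumeratorWatermarkCap {
> >>        private volatile long bound = Long.MIN_VALUE;
> >>
> >>        void onEnumeratorWatermark(long newBound) { bound = Math.max(bound, newBound); }
> >>
> >>        long cap(long locallyComputedWatermark) {
> >>            return Math.min(locallyComputedWatermark, bound);
> >>        }
> >>    }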
> >>
> >>  - For event-time alignment / split back pressure, this raises the
> >> question of how we can avoid deadlocks that may arise when splits are
> >> suspended for event time back pressure.
> >>
> >> *(7) Batch and Streaming Unification*
> >>
> >>  - Functionality wise, the above design should support both
> >>  - Batch often (mostly) does not care about reading "in order" and
> >> generating watermarks
> >>    --> Might use different enumerator logic that is more locality aware
> >> and ignores event time order
> >>    --> Does not generate watermarks
> >>  - Would be great if bounded sources could be identified at compile
> time,
> >> so that "env.addBoundedSource(...)" is type safe and can return a
> >> "BoundedDataStream".
> >>  - Possible to defer this discussion until later
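> >>
> >>  A sketch of what compile-time boundedness could look like on the API
> >>  surface (hypothetical signatures, not an actual Flink API):
> >>
> >>    interface Source<T> { }
> >>    interface BoundedSource<T> extends Source<T> { }
> >>
> >>    class DataStreamSketch<T> { }
> >>    class BoundedDataStreamSketch<T> extends DataStreamSketch<T> { }
> >>
> >>    class EnvironmentSketch {
> >>        <T> DataStreamSketch<T> addSource(Source<T> source) {
> >>            return new DataStreamSketch<>();
> >>        }
> >>
> >>        // Only accepts bounded sources, so boundedness is known at compile time.
> >>        <T> BoundedDataStreamSketch<T> addBoundedSource(BoundedSource<T> source) {
> >>            return new BoundedDataStreamSketch<>();
> >>        }
> >>    }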
> >>
> >> *Miscellaneous Comments*
> >>
> >>  - Should the source have a TypeInformation for the produced type, instead
> >> of a serializer? We need type information in the stream anyway, and can
> >> derive the serializer from that. Plus, creating the serializer should
> >> respect the ExecutionConfig.
> >>
> >>  - The TypeSerializer interface is very powerful but also not easy to
> >> implement. Its purpose is to handle data super efficiently, support
> >> flexible ways of evolution, etc.
> >>  For metadata I would suggest looking at the SimpleVersionedSerializer
> >> instead, which is used for example for checkpoint master hooks, or for the
> >> streaming file sink. I think that it is a good match for cases where we do
> >> not need more than ser/deser (no copy, etc.) and don't need to push
> >> versioning out of the serialization paths for best performance (as in the
> >> TypeSerializer).
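> >>
> >>  For reference, a small sketch of a split-metadata serializer based on
> >>  SimpleVersionedSerializer (the split type and its single field are made
> >>  up for illustration):
> >>
> >>    import java.io.IOException;
> >>    import java.nio.charset.StandardCharsets;
> >>    import org.apache.flink.core.io.SimpleVersionedSerializer;
> >>
> >>    final class FileSplitMeta {
> >>        final String path;
> >>        FileSplitMeta(String path) { this.path = path; }
> >>    }
> >>
> >>    final class FileSplitMetaSerializer implements SimpleVersionedSerializer<FileSplitMeta> {
> >>        @Override
> >>        public int getVersion() {
> >>            return 1;  // bump when the format changes; old versions remain readable
> >>        }
> >>
> >>        @Override
> >>        public byte[] serialize(FileSplitMeta split) throws IOException {
> >>            return split.path.getBytes(StandardCharsets.UTF_8);
> >>        }
> >>
> >>        @Override
> >>        public FileSplitMeta deserialize(int version, byte[] serialized) throws IOException {
> >>            if (version != 1) {
> >>                throw new IOException("Unknown version: " + version);
> >>            }
> >>            return new FileSplitMeta(new String(serialized, StandardCharsets.UTF_8));
> >>        }
> >>    }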
> >>
> >>
> >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> >> k.klou...@data-artisans.com>
> >> wrote:
> >>
> >>> Hi Biao,
> >>>
> >>> Thanks for the answer!
> >>>
> >>> So given the multi-threaded readers, now we have as open questions:
> >>>
> >>> 1) How do we let the checkpoints pass through our multi-threaded reader
> >>> operator?
> >>>
> >>> 2) Do we have separate reader and source operators or not? In the
> >> strategy
> >>> that has a separate source, the source operator has a parallelism of 1
> >> and
> >>> is responsible for split recovery only.
> >>>
> >>> For the first one, given also the constraints (blocking, finite queues,
> >>> etc), I do not have an answer yet.
> >>>
> >>> For the 2nd, I think that we should go with separate operators for the
> >>> source and the readers, for the following reasons:
> >>>
> >>> 1) This is more aligned with a potential future improvement where the
> >>> split discovery becomes a responsibility of the JobManager and readers are
> >>> pulling more work from the JM.
> >>>
> >>> 2) The source is going to be the "single point of truth". It will know
> >>> what has been processed and what has not. If the source and the readers
> >>> are a single operator with parallelism > 1, or in general, if the split
> >>> discovery is done by each task individually, then:
> >>>   i) we have to have a deterministic scheme for each reader to assign
> >>> splits to itself (e.g. mod subtaskId). This is not necessarily trivial for
> >>> all sources.
> >>>   ii) each reader would have to keep a copy of all its processed splits
> >>>   iii) the state has to be a union state with a non-trivial merging logic
> >>> in order to support rescaling.
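> >>>
> >>> (For (i), the kind of mod-based self-assignment meant here would look
> >>> roughly like the following, assuming each split carries a stable index,
> >>> which is exactly what is not trivial for every source:)
> >>>
> >>>    // Each parallel reader keeps only the splits whose index maps to its subtask.
> >>>    boolean isMine(int splitIndex, int subtaskId, int parallelism) {
> >>>        return splitIndex % parallelism == subtaskId;
> >>>    }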
> >>>
> >>> Two additional points that you raised above:
> >>>
> >>> i) The point you raised that we need to keep all splits (processed and
> >>> not-processed) is, I think, a bit of a strong requirement. This would imply
> >>> that for infinite sources the state will grow indefinitely. This problem is
> >>> even more pronounced if we do not have a single source that assigns splits
> >>> to readers, as each reader will have its own copy of the state.
> >>>
> >>> ii) it is true that for finite sources we need to somehow not close the
> >>> readers when the source/split discoverer finishes. The
> >>> ContinuousFileReaderOperator has a work-around for that. It is not
> >> elegant,
> >>> and checkpoints are not emitted after closing the source, but this, I
> >>> believe, is a bigger problem which requires more changes than just
> >>> refactoring the source interface.
> >>>
> >>> Cheers,
> >>> Kostas
> >>>
> >>
>
>
