This proposal mentioned that SplitEnumerator might run on the JobManager or
in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can never
be embarrassingly parallel anymore, since every reader task would be connected
to that one enumerator task. That would nullify the benefit of fine-grained
recovery for embarrassingly parallel jobs.
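
To illustrate the concern, here is a minimal sketch in Java. It assumes a
simplified model in which a task failure restarts every task connected to it
through pipelined data exchanges, i.e. its connected component. All class and
method names are hypothetical; this is not Flink's actual failover code.

```java
import java.util.HashSet;
import java.util.Set;

class FailoverRegions {
    // Union-find over task ids; tasks joined by a pipelined edge share a region.
    private final int[] parent;

    FailoverRegions(int numTasks) {
        parent = new int[numTasks];
        for (int i = 0; i < numTasks; i++) {
            parent[i] = i;
        }
    }

    int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    void connect(int a, int b) {
        parent[find(a)] = find(b);
    }

    int regionCount() {
        Set<Integer> roots = new HashSet<>();
        for (int i = 0; i < parent.length; i++) {
            roots.add(find(i));
        }
        return roots.size();
    }

    /** Tasks 0..p-1 are readers, p..2p-1 are sinks; reader i feeds sink i only. */
    static FailoverRegions embarrassinglyParallel(int p, boolean withEnumeratorTask) {
        FailoverRegions g = new FailoverRegions(withEnumeratorTask ? 2 * p + 1 : 2 * p);
        for (int i = 0; i < p; i++) {
            g.connect(i, p + i);
        }
        if (withEnumeratorTask) {
            int enumerator = 2 * p; // one extra task that streams splits to every reader
            for (int i = 0; i < p; i++) {
                g.connect(enumerator, i);
            }
        }
        return g;
    }

    public static void main(String[] args) {
        System.out.println(embarrassinglyParallel(4, false).regionCount()); // prints 4
        System.out.println(embarrassinglyParallel(4, true).regionCount());  // prints 1
    }
}
```

Under this model, four independent reader-to-sink pipelines form four regions
that can be restarted individually, while adding a single enumerator task
collapses them into one region.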

It's not clear to me what the implications of running the enumerator on the
JobManager are, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:

> Hi Stephan & Piotrek,
>
> Thank you for the feedback.
>
> It seems that there are a lot of things to do in the community. I am just
> afraid that this discussion may be forgotten since there have been so many
> proposals recently.
> Anyway, I hope to see the split topics soon :)
>
> Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:
>
> > Hi Biao!
> >
> > This discussion was stalled because of preparations for open sourcing
> > and merging Blink. I think before creating the tickets we should split
> > this discussion into the topics/areas outlined by Stephan and create
> > FLIPs for them.
> >
> > I think there is no chance for this to be completed in the couple of
> > weeks/1 month remaining before the 1.8 feature freeze, however it would
> > be good to aim for 1.9 with those changes.
> >
> > Piotrek
> >
> > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> > >
> > > Hi community,
> > > Stephan's summary makes a lot of sense to me. It is indeed much
> > > clearer after splitting the complex topic into small ones.
> > > I was wondering, is there a detailed plan for the next step? If not, I
> > > would like to push this forward by creating some JIRA issues.
> > > Another question: should version 1.8 include these features?
> > >
> > > Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
> > >
> > >> Thanks everyone for the lively discussion. Let me try to summarize
> > where I
> > >> see convergence in the discussion and open issues.
> > >> I'll try to group this by design aspect of the source. Please let me
> > know
> > >> if I got things wrong or missed something crucial here.
> > >>
> > >> For issues 1-3, if the below reflects the state of the discussion, I
> > would
> > >> try and update the FLIP in the next days.
> > >> For the remaining ones we need more discussion.
> > >>
> > >> I would suggest to fork each of these aspects into a separate mail
> > >> thread, or we will lose sight of the individual aspects.
> > >>
> > >> *(1) Separation of Split Enumerator and Split Reader*
> > >>
> > >>  - All seem to agree this is a good thing
> > >>  - Split Enumerator could in the end live on JobManager (and assign
> > splits
> > >> via RPC) or in a task (and assign splits via data streams)
> > >>  - this discussion is orthogonal and should come later, when the
> > interface
> > >> is agreed upon.
> > >>
> > >> *(2) Split Readers for one or more splits*
> > >>
> > >>  - Discussion seems to agree that we need to support one reader that
> > >> possibly handles multiple splits concurrently.
> > >>  - The requirement comes from sources where one poll()-style call
> > fetches
> > >> data from different splits / partitions
> > >>    --> example sources that require that would be for example Kafka,
> > >> Pravega, Pulsar
> > >>
> > >>  - Could have one split reader per source, or multiple split readers
> > that
> > >> share the "poll()" function
> > >>  - To not make it too complicated, we can start with thinking about
> one
> > >> split reader for all splits initially and see if that covers all
> > >> requirements
> > >>
> > >> *(3) Threading model of the Split Reader*
> > >>
> > >>  - Most active part of the discussion ;-)
> > >>
> > >>  - A non-blocking way for Flink's task code to interact with the
> > >> source is needed in order to have task runtime code based on a
> > >> single-threaded/actor-style task design
> > >>    --> I personally am a big proponent of that, it will help with
> > >> well-behaved checkpoints, efficiency, and simpler yet more robust
> > runtime
> > >> code
> > >>
> > >>  - Users care about simple abstraction, so as a subclass of
> SplitReader
> > >> (non-blocking / async) we need to have a BlockingSplitReader which
> will
> > >> form the basis of most source implementations. BlockingSplitReader
> lets
> > >> users do blocking simple poll() calls.
> > >>  - The BlockingSplitReader would spawn a thread (or more) and the
> > >> thread(s) can make blocking calls and hand over data buffers via a
> > blocking
> > >> queue
> > >>  - This should allow us to cover both, a fully async runtime, and a
> > simple
> > >> blocking interface for users.
> > >>  - This is actually very similar to how the Kafka connectors work.
> Kafka
> > >> 9+ with one thread, Kafka 8 with multiple threads
> > >>
> > >>  - On the base SplitReader (the async one), the non-blocking method
> that
> > >> gets the next chunk of data would signal data availability via a
> > >> CompletableFuture, because that gives the best flexibility (can await
> > >> completion or register notification handlers).
> > >>  - The source task would register a "thenHandle()" (or similar) on the
> > >> future to put a "take next data" task into the actor-style mailbox
> > >>
> > >> *(4) Split Enumeration and Assignment*
> > >>
> > >>  - Splits may be generated lazily, both in cases where there is a
> > limited
> > >> number of splits (but very many), or splits are discovered over time
> > >>  - Assignment should also be lazy, to get better load balancing
> > >>  - Assignment needs to support locality preferences
> > >>
> > >>  - Possible design based on discussion so far:
> > >>
> > >>    --> SplitReader has a method "addSplits(SplitT...)" to add one or
> > more
> > >> splits. Some split readers might assume they have only one split ever,
> > >> concurrently, others assume multiple splits. (Note: idea behind being
> > able
> > >> to add multiple splits at the same time is to ease startup where
> > multiple
> > >> splits may be assigned instantly.)
> > >>    --> SplitReader has a context object on which it can indicate
> > >> when splits are completed. The enumerator gets that notification and
> > >> can use it to decide when to assign new splits. This should help both
> > >> in the case of sources that take splits lazily (file readers) and in
> > >> case the source needs to preserve a partial order between splits
> > >> (Kinesis, Pravega, Pulsar may need that).
> > >>    --> SplitEnumerator gets notification when SplitReaders start and
> > when
> > >> they finish splits. They can decide at that moment to push more splits
> > to
> > >> that reader
> > >>    --> The SplitEnumerator should probably be aware of the source
> > >> parallelism, to build its initial distribution.
> > >>
> > >>  - Open question: Should the source expose something like "host
> > >> preferences", so that yarn/mesos/k8s can take this into account when
> > >> selecting a node to start a TM on?
> > >>
> > >> *(5) Watermarks and event time alignment*
> > >>
> > >>  - Watermark generation, as well as idleness, needs to be per split
> > (like
> > >> currently in the Kafka Source, per partition)
> > >>  - It is desirable to support optional event-time-alignment, meaning
> > that
> > >> splits that are ahead are back-pressured or temporarily unsubscribed
> > >>
> > >>  - I think it would be desirable to encapsulate watermark generation
> > >> logic in watermark generators, for a separation of concerns. The
> > >> watermark generators should run per split.
> > >>  - Using watermark generators would also help with another problem of
> > the
> > >> suggested interface, namely supporting non-periodic watermarks
> > efficiently.
> > >>
> > >>  - Need a way to "dispatch" next record to different watermark
> > generators
> > >>  - Need a way to tell SplitReader to "suspend" a split until a certain
> > >> watermark is reached (event time backpressure)
> > >>  - This would in fact be not needed (and thus simpler) if we had a
> > >> SplitReader per split and may be a reason to re-open that discussion
> > >>
> > >> *(6) Watermarks across splits and in the Split Enumerator*
> > >>
> > >>  - The split enumerator may need some watermark awareness, which
> should
> > be
> > >> purely based on split metadata (like create timestamp of file splits)
> > >>  - If there are still more splits with overlapping event time range
> for
> > a
> > >> split reader, then that split reader should not advance the watermark
> > >> within the split beyond the overlap boundary. Otherwise future splits
> > will
> > >> produce late data.
> > >>
> > >>  - One way to approach this could be that the split enumerator may
> send
> > >> watermarks to the readers, and the readers cannot emit watermarks
> beyond
> > >> that received watermark.
> > >>  - Many split enumerators would simply immediately send Long.MAX_VALUE
> > >> out and leave the progress purely to the split readers.
> > >>
> > >>  - For event-time alignment / split back pressure, this raises the
> > >> question of how we can avoid deadlocks that may arise when splits are
> > >> suspended for event time back pressure.
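
The capping idea in (6) above, where a reader never emits a watermark beyond
the one received from the enumerator, could look roughly like this. It is a
sketch under assumptions: ReaderWatermark and its methods are hypothetical
names, watermarks are plain long timestamps, and nothing is emitted until the
enumerator has sent a first bound.

```java
class CappedWatermarkSketch {

    /** Hypothetical per-reader state combining the local watermark with the enumerator's bound. */
    static final class ReaderWatermark {
        private long enumeratorBound = Long.MIN_VALUE; // no bound received yet
        private long localWatermark = Long.MIN_VALUE;  // progress seen within the reader's splits
        private long lastEmitted = Long.MIN_VALUE;

        /** Called when the split enumerator sends a watermark to this reader. */
        void onEnumeratorWatermark(long bound) {
            enumeratorBound = Math.max(enumeratorBound, bound);
        }

        /** Called when the reader's own watermark generation advances. */
        void onLocalWatermark(long watermark) {
            localWatermark = Math.max(localWatermark, watermark);
        }

        /** The watermark to emit: capped by the enumerator's bound and never regressing. */
        long current() {
            lastEmitted = Math.max(lastEmitted, Math.min(localWatermark, enumeratorBound));
            return lastEmitted;
        }
    }

    public static void main(String[] args) {
        ReaderWatermark reader = new ReaderWatermark();
        reader.onEnumeratorWatermark(1000L); // a pending split overlaps event time from 1000 on
        reader.onLocalWatermark(1500L);
        System.out.println(reader.current()); // prints 1000

        // An enumerator without overlap concerns can release the cap entirely.
        reader.onEnumeratorWatermark(Long.MAX_VALUE);
        System.out.println(reader.current()); // prints 1500
    }
}
```

An enumerator that sends Long.MAX_VALUE right away makes the cap a no-op, which
matches the case where progress is left purely to the split readers.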
> > >>
> > >> *(7) Batch and streaming Unification*
> > >>
> > >>  - Functionality wise, the above design should support both
> > >>  - Batch often (mostly) does not care about reading "in order" and
> > >> generating watermarks
> > >>    --> Might use different enumerator logic that is more locality
> aware
> > >> and ignores event time order
> > >>    --> Does not generate watermarks
> > >>  - Would be great if bounded sources could be identified at compile
> > time,
> > >> so that "env.addBoundedSource(...)" is type safe and can return a
> > >> "BoundedDataStream".
> > >>  - Possible to defer this discussion until later
> > >>
> > >> *Miscellaneous Comments*
> > >>
> > >>  - Should the source have a TypeInformation for the produced type,
> > instead
> > >> of a serializer? We need a type information in the stream anyways, and
> > can
> > >> derive the serializer from that. Plus, creating the serializer should
> > >> respect the ExecutionConfig.
> > >>
> > >>  - The TypeSerializer interface is very powerful but also not easy to
> > >> implement. Its purpose is to handle data super efficiently, support
> > >> flexible ways of evolution, etc.
> > >>  For metadata I would suggest to look at the SimpleVersionedSerializer
> > >> instead, which is used for example for checkpoint master hooks, or for
> > >> the streaming file sink. I think that it is a good match for cases
> > >> where we do not need more than ser/deser (no copy, etc.) and don't
> > >> need to push versioning out of the serialization paths for best
> > >> performance (as in the TypeSerializer)
> > >>
> > >>
> > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> > >> k.klou...@data-artisans.com>
> > >> wrote:
> > >>
> > >>> Hi Biao,
> > >>>
> > >>> Thanks for the answer!
> > >>>
> > >>> So given the multi-threaded readers, now we have as open questions:
> > >>>
> > >>> 1) How do we let the checkpoints pass through our multi-threaded
> reader
> > >>> operator?
> > >>>
> > >>> 2) Do we have separate reader and source operators or not? In the
> > >> strategy
> > >>> that has a separate source, the source operator has a parallelism of
> 1
> > >> and
> > >>> is responsible for split recovery only.
> > >>>
> > >>> For the first one, given also the constraints (blocking, finite
> queues,
> > >>> etc), I do not have an answer yet.
> > >>>
> > >>> For the 2nd, I think that we should go with separate operators for
> the
> > >>> source and the readers, for the following reasons:
> > >>>
> > >>> 1) This is more aligned with a potential future improvement where the
> > >>> split discovery becomes a responsibility of the JobManager and readers
> > >>> are pulling more work from the JM.
> > >>>
> > >>> 2) The source is going to be the "single point of truth". It will
> know
> > >> what
> > >>> has been processed and what not. If the source and the readers are a
> > >> single
> > >>> operator with parallelism > 1, or in general, if the split discovery
> is
> > >>> done by each task individually, then:
> > >>>   i) we have to have a deterministic scheme for each reader to assign
> > >>> splits to itself (e.g. mod subtaskId). This is not necessarily
> trivial
> > >> for
> > >>> all sources.
> > >>>   ii) each reader would have to keep a copy of all its processed
> > >>> splits
> > >>>   iii) the state has to be a union state with a non-trivial merging
> > >> logic
> > >>> in order to support rescaling.
> > >>>
> > >>> Two additional points that you raised above:
> > >>>
> > >>> i) The point that you raised that we need to keep all splits
> > >>> (processed and not-processed) is, I think, a bit of a strong
> > >>> requirement. This would imply that for infinite sources the state will
> > >>> grow indefinitely. This problem is even more pronounced if we do not
> > >>> have a single source that assigns splits to readers, as each reader
> > >>> will have its own copy of the state.
> > >>>
> > >>> ii) it is true that for finite sources we need to somehow not close
> the
> > >>> readers when the source/split discoverer finishes. The
> > >>> ContinuousFileReaderOperator has a work-around for that. It is not
> > >> elegant,
> > >>> and checkpoints are not emitted after closing the source, but this, I
> > >>> believe, is a bigger problem which requires more changes than just
> > >>> refactoring the source interface.
> > >>>
> > >>> Cheers,
> > >>> Kostas
> > >>>
> > >>
> >
> >
>
