Hi community,
The summary of Stephan makes a lot sense to me. It is much clearer indeed
after splitting the complex topic into small ones.
I was wondering is there any detail plan for next step? If not, I would
like to push this thing forward by creating some JIRA issues.
Another question is that should version 1.8 include these features?

Stephan Ewen <se...@apache.org> 于2018年12月1日周六 上午4:20写道:

> Thanks everyone for the lively discussion. Let me try to summarize where I
> see convergence in the discussion and open issues.
> I'll try to group this by design aspect of the source. Please let me know
> if I got things wrong or missed something crucial here.
>
> For issues 1-3, if the below reflects the state of the discussion, I would
> try and update the FLIP in the next days.
> For the remaining ones we need more discussion.
>
> I would suggest to fork each of these aspects into a separate mail thread,
> or will loose sight of the individual aspects.
>
> *(1) Separation of Split Enumerator and Split Reader*
>
>   - All seem to agree this is a good thing
>   - Split Enumerator could in the end live on JobManager (and assign splits
> via RPC) or in a task (and assign splits via data streams)
>   - this discussion is orthogonal and should come later, when the interface
> is agreed upon.
>
> *(2) Split Readers for one or more splits*
>
>   - Discussion seems to agree that we need to support one reader that
> possibly handles multiple splits concurrently.
>   - The requirement comes from sources where one poll()-style call fetches
> data from different splits / partitions
>     --> example sources that require that would be for example Kafka,
> Pravega, Pulsar
>
>   - Could have one split reader per source, or multiple split readers that
> share the "poll()" function
>   - To not make it too complicated, we can start with thinking about one
> split reader for all splits initially and see if that covers all
> requirements
>
> *(3) Threading model of the Split Reader*
>
>   - Most active part of the discussion ;-)
>
>   - A non-blocking way for Flink's task code to interact with the source is
> needed in order to a task runtime code based on a
> single-threaded/actor-style task design
>     --> I personally am a big proponent of that, it will help with
> well-behaved checkpoints, efficiency, and simpler yet more robust runtime
> code
>
>   - Users care about simple abstraction, so as a subclass of SplitReader
> (non-blocking / async) we need to have a BlockingSplitReader which will
> form the basis of most source implementations. BlockingSplitReader lets
> users do blocking simple poll() calls.
>   - The BlockingSplitReader would spawn a thread (or more) and the
> thread(s) can make blocking calls and hand over data buffers via a blocking
> queue
>   - This should allow us to cover both, a fully async runtime, and a simple
> blocking interface for users.
>   - This is actually very similar to how the Kafka connectors work. Kafka
> 9+ with one thread, Kafka 8 with multiple threads
>
>   - On the base SplitReader (the async one), the non-blocking method that
> gets the next chunk of data would signal data availability via a
> CompletableFuture, because that gives the best flexibility (can await
> completion or register notification handlers).
>   - The source task would register a "thenHandle()" (or similar) on the
> future to put a "take next data" task into the actor-style mailbox
>
> *(4) Split Enumeration and Assignment*
>
>   - Splits may be generated lazily, both in cases where there is a limited
> number of splits (but very many), or splits are discovered over time
>   - Assignment should also be lazy, to get better load balancing
>   - Assignment needs support locality preferences
>
>   - Possible design based on discussion so far:
>
>     --> SplitReader has a method "addSplits(SplitT...)" to add one or more
> splits. Some split readers might assume they have only one split ever,
> concurrently, others assume multiple splits. (Note: idea behind being able
> to add multiple splits at the same time is to ease startup where multiple
> splits may be assigned instantly.)
>     --> SplitReader has a context object on which it can call indicate when
> splits are completed. The enumerator gets that notification and can use to
> decide when to assign new splits. This should help both in cases of sources
> that take splits lazily (file readers) and in case the source needs to
> preserve a partial order between splits (Kinesis, Pravega, Pulsar may need
> that).
>     --> SplitEnumerator gets notification when SplitReaders start and when
> they finish splits. They can decide at that moment to push more splits to
> that reader
>     --> The SplitEnumerator should probably be aware of the source
> parallelism, to build its initial distribution.
>
>   - Open question: Should the source expose something like "host
> preferences", so that yarn/mesos/k8s can take this into account when
> selecting a node to start a TM on?
>
> *(5) Watermarks and event time alignment*
>
>   - Watermark generation, as well as idleness, needs to be per split (like
> currently in the Kafka Source, per partition)
>   - It is desirable to support optional event-time-alignment, meaning that
> splits that are ahead are back-pressured or temporarily unsubscribed
>
>   - I think i would be desirable to encapsulate watermark generation logic
> in watermark generators, for a separation of concerns. The watermark
> generators should run per split.
>   - Using watermark generators would also help with another problem of the
> suggested interface, namely supporting non-periodic watermarks efficiently.
>
>   - Need a way to "dispatch" next record to different watermark generators
>   - Need a way to tell SplitReader to "suspend" a split until a certain
> watermark is reached (event time backpressure)
>   - This would in fact be not needed (and thus simpler) if we had a
> SplitReader per split and may be a reason to re-open that discussion
>
> *(6) Watermarks across splits and in the Split Enumerator*
>
>   - The split enumerator may need some watermark awareness, which should be
> purely based on split metadata (like create timestamp of file splits)
>   - If there are still more splits with overlapping event time range for a
> split reader, then that split reader should not advance the watermark
> within the split beyond the overlap boundary. Otherwise future splits will
> produce late data.
>
>   - One way to approach this could be that the split enumerator may send
> watermarks to the readers, and the readers cannot emit watermarks beyond
> that received watermark.
>   - Many split enumerators would simply immediately send Long.MAX out and
> leave the progress purely to the split readers.
>
>   - For event-time alignment / split back pressure, this begs the question
> how we can avoid deadlocks that may arise when splits are suspended for
> event time back pressure,
>
> *(7) Batch and streaming Unification*
>
>   - Functionality wise, the above design should support both
>   - Batch often (mostly) does not care about reading "in order" and
> generating watermarks
>     --> Might use different enumerator logic that is more locality aware
> and ignores event time order
>     --> Does not generate watermarks
>   - Would be great if bounded sources could be identified at compile time,
> so that "env.addBoundedSource(...)" is type safe and can return a
> "BoundedDataStream".
>   - Possible to defer this discussion until later
>
> *Miscellaneous Comments*
>
>   - Should the source have a TypeInformation for the produced type, instead
> of a serializer? We need a type information in the stream anyways, and can
> derive the serializer from that. Plus, creating the serializer should
> respect the ExecutionConfig.
>
>   - The TypeSerializer interface is very powerful but also not easy to
> implement. Its purpose is to handle data super efficiently, support
> flexible ways of evolution, etc.
>   For metadata I would suggest to look at the SimpleVersionedSerializer
> instead, which is used for example for checkpoint master hooks, or for the
> streaming file sink. I think that is is a good match for cases where we do
> not need more than ser/deser (no copy, etc.) and don't need to push
> versioning out of the serialization paths for best performance (as in the
> TypeSerializer)
>
>
> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> k.klou...@data-artisans.com>
> wrote:
>
> > Hi Biao,
> >
> > Thanks for the answer!
> >
> > So given the multi-threaded readers, now we have as open questions:
> >
> > 1) How do we let the checkpoints pass through our multi-threaded reader
> > operator?
> >
> > 2) Do we have separate reader and source operators or not? In the
> strategy
> > that has a separate source, the source operator has a parallelism of 1
> and
> > is responsible for split recovery only.
> >
> > For the first one, given also the constraints (blocking, finite queues,
> > etc), I do not have an answer yet.
> >
> > For the 2nd, I think that we should go with separate operators for the
> > source and the readers, for the following reasons:
> >
> > 1) This is more aligned with a potential future improvement where the
> split
> > discovery becomes a responsibility of the JobManager and readers are
> > pooling more work from the JM.
> >
> > 2) The source is going to be the "single point of truth". It will know
> what
> > has been processed and what not. If the source and the readers are a
> single
> > operator with parallelism > 1, or in general, if the split discovery is
> > done by each task individually, then:
> >    i) we have to have a deterministic scheme for each reader to assign
> > splits to itself (e.g. mod subtaskId). This is not necessarily trivial
> for
> > all sources.
> >    ii) each reader would have to keep a copy of all its processed slpits
> >    iii) the state has to be a union state with a non-trivial merging
> logic
> > in order to support rescaling.
> >
> > Two additional points that you raised above:
> >
> > i) The point that you raised that we need to keep all splits (processed
> and
> > not-processed) I think is a bit of a strong requirement. This would imply
> > that for infinite sources the state will grow indefinitely. This is
> problem
> > is even more pronounced if we do not have a single source that assigns
> > splits to readers, as each reader will have its own copy of the state.
> >
> > ii) it is true that for finite sources we need to somehow not close the
> > readers when the source/split discoverer finishes. The
> > ContinuousFileReaderOperator has a work-around for that. It is not
> elegant,
> > and checkpoints are not emitted after closing the source, but this, I
> > believe, is a bigger problem which requires more changes than just
> > refactoring the source interface.
> >
> > Cheers,
> > Kostas
> >
>

Reply via email to