Thanks, Becket, for updating this.

I agree with moving the aspects you mentioned into separate FLIPs - this
one was becoming unwieldy in size.

+1 to the FLIP in its current state. It's a very detailed write-up, nicely
done!

On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:

> Hi all,
>
> Sorry for the much-belated update. I have updated the FLIP-27 wiki page with
> the latest proposals. Some notable changes include:
> 1. A new generic communication mechanism between SplitEnumerator and
> SourceReader (a rough sketch follows below).
> 2. Some detailed changes to API method signatures.
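>
> Roughly, the idea is that the enumerator and the readers can exchange custom
> events with each other. As a minimal sketch of the shape of that channel (the
> interface and method names below are illustrative, not the final signatures):
>
>     import java.io.Serializable;
>
>     /** Sketch of a custom message passed between SplitEnumerator and SourceReader. */
>     public interface SourceEvent extends Serializable {}
>
>     /** Enumerator side: react to an event sent by a specific reader subtask. */
>     interface EnumeratorEvents {
>         void handleSourceEvent(int subtaskId, SourceEvent event);
>     }
>
>     /** Reader side: react to an event sent by the enumerator. */
>     interface ReaderEvents {
>         void handleSourceEvent(SourceEvent event);
>     }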
>
> We left a few things out of this FLIP and will address them in separate
> FLIPs, including:
> 1. Per-split event time.
> 2. Event time alignment.
> 3. Fine-grained failover for SplitEnumerator failure.
>
> Please let us know if you have any questions.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:
>
> > Hi Łukasz!
> >
> > Becket and I are working hard on figuring out the last details and
> > implementing the first PoC. We will hopefully update the FLIP next week.
> >
> > There is a fair chance that a first version of this will be in 1.10, but I
> > think it will take another release to battle-test it and migrate the
> > connectors.
> >
> > Best,
> > Stephan
> >
> >
> >
> >
> > On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl>
> wrote:
> >
> > > Hi,
> > >
> > > This proposal looks very promising for us. Do you have any plans for
> > > which Flink release it is going to land in? We are thinking of using the
> > > DataSet API for our future use cases, but on the other hand the DataSet
> > > API is going to be deprecated, so the proposed bounded data streams
> > > solution could be more viable in the long term.
> > >
> > > Thanks,
> > > Łukasz
> > >
> > > On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
> > > > Thanks for putting together this proposal!
> > > >
> > > > I see that the "Per Split Event Time" and "Event Time Alignment"
> > > > sections are still TBD.
> > > >
> > > > It would probably be good to flesh those out a bit before proceeding
> > > > too far, as the event time alignment will probably influence the
> > > > interaction with the split reader, specifically ReaderStatus
> > > > emitNext(SourceOutput<E> output).
> > > >
> > > > We currently have only one implementation of event time alignment, in
> > > > the Kinesis consumer. The synchronization in that case takes place as
> > > > the last step before records are emitted downstream (RecordEmitter).
> > > > With the currently proposed interfaces, the equivalent can be
> > > > implemented in the reader loop, although note that in the Kinesis
> > > > consumer the per-shard threads push records.
> > > >
> > > > Synchronization has not been implemented for the Kafka consumer yet.
> > > >
> > > > https://issues.apache.org/jira/browse/FLINK-12675
> > > >
> > > > When I looked at it, I realized that the implementation will look quite
> > > > different from Kinesis because it needs to take place in the pull part,
> > > > where records are taken from the Kafka client. Due to the multiplexing
> > > > it cannot be done by blocking the split thread, as it currently works
> > > > for Kinesis. Reading from individual Kafka partitions needs to be
> > > > controlled via pause/resume on the Kafka client.
> > > >
> > > > To take on that responsibility, the split thread would need to be aware
> > > > of the watermarks, or at least of whether it should continue to consume
> > > > a given split, and this may require a different SourceReader or
> > > > SourceOutput interface.
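> > > >
> > > > To make that concrete, a rough sketch of what the pull-side alignment
> > > > could look like (class and method names here are made up for the
> > > > example, and per-partition watermark tracking is assumed to exist
> > > > elsewhere):
> > > >
> > > >     import java.time.Duration;
> > > >     import java.util.ArrayList;
> > > >     import java.util.HashMap;
> > > >     import java.util.List;
> > > >     import java.util.Map;
> > > >     import org.apache.kafka.clients.consumer.ConsumerRecords;
> > > >     import org.apache.kafka.clients.consumer.KafkaConsumer;
> > > >     import org.apache.kafka.common.TopicPartition;
> > > >
> > > >     // Throttle partitions that run ahead of the slowest one by more than
> > > >     // maxDriftMillis, using the consumer's pause()/resume() calls.
> > > >     class AlignedKafkaPoller {
> > > >         private final KafkaConsumer<byte[], byte[]> consumer;
> > > >         private final Map<TopicPartition, Long> watermarks = new HashMap<>();
> > > >         private final long maxDriftMillis;
> > > >
> > > >         AlignedKafkaPoller(KafkaConsumer<byte[], byte[]> consumer, long maxDriftMillis) {
> > > >             this.consumer = consumer;
> > > >             this.maxDriftMillis = maxDriftMillis;
> > > >         }
> > > >
> > > >         // Called by whatever tracks per-partition event time.
> > > >         void updateWatermark(TopicPartition tp, long watermark) {
> > > >             watermarks.merge(tp, watermark, Math::max);
> > > >         }
> > > >
> > > >         ConsumerRecords<byte[], byte[]> pollAligned() {
> > > >             long min = watermarks.values().stream()
> > > >                     .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
> > > >             List<TopicPartition> ahead = new ArrayList<>();
> > > >             List<TopicPartition> behind = new ArrayList<>();
> > > >             for (Map.Entry<TopicPartition, Long> e : watermarks.entrySet()) {
> > > >                 (e.getValue() > min + maxDriftMillis ? ahead : behind).add(e.getKey());
> > > >             }
> > > >             consumer.pause(ahead);   // stop fetching partitions that are too far ahead
> > > >             consumer.resume(behind); // keep fetching the rest
> > > >             return consumer.poll(Duration.ofMillis(100));
> > > >         }
> > > >     }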
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
> > > >
> > > > > Hi Stephan,
> > > > >
> > > > > Thank you for the feedback!
> > > > > I will take a look at your branch before the public discussion.
> > > > >
> > > > >
> > > > > On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org>
> > > wrote:
> > > > >
> > > > > > Hi Biao!
> > > > > >
> > > > > > Thanks for reviving this. I would like to join this discussion, but
> > > > > > I am quite occupied with the 1.9 release, so can we maybe pause this
> > > > > > discussion for a week or so?
> > > > > >
> > > > > > In the meantime I can share some suggestions based on prior
> > > > > > experiments:
> > > > > >
> > > > > > How to do watermarks / timestamp extractors in a simpler and more
> > > > > > flexible way. I think that part is quite promising and should be
> > > > > > part of the new source interface.
> > > > > >
> > > > > > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
> > > > > >
> > > > > >
> > > > > >
> > > > > > https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
> > > > > >
> > > > > >
> > > > > >
> > > > > > Some experiments on how to build the source reader and its library
> > > > > > for common threading/split patterns:
> > > > > >
> > > > > >
> > > > > > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Stephan
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com>
> > > wrote:
> > > > > >
> > > > > >> Hi devs,
> > > > > >>
> > > > > >> Since 1.9 is nearly released, I think we could get back to
> > > > > >> FLIP-27. I believe it should be included in 1.10.
> > > > > >>
> > > > > >> There are so many things mentioned in the FLIP-27 document. [1] I
> > > > > >> think we'd better discuss them separately. However, the wiki is not
> > > > > >> a good place for discussion, so I wrote a Google doc about the
> > > > > >> SplitReader API that covers some details missing from the wiki
> > > > > >> document. [2]
> > > > > >>
> > > > > >> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > > > > >> 2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
> > > > > >>
> > > > > >> CC Stephan, Aljoscha, Piotrek, Becket
> > > > > >>
> > > > > >>
> > > > > >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com>
> > > wrote:
> > > > > >>
> > > > > >>> Hi Steven,
> > > > > >>> Thank you for the feedback. Please take a look at the FLIP-27
> > > > > >>> document
> > > > > >>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
> > > > > >>> which was updated recently. A lot of details about the enumerator
> > > > > >>> were added to this document. I think it would help.
> > > > > >>>
> > > > > >>> On Thu, Mar 28, 2019 at 12:52 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> This proposal mentioned that the SplitEnumerator might run on
> > > > > >>>> the JobManager or in a single task on a TaskManager.
> > > > > >>>>
> > > > > >>>> If the enumerator is a single task on a TaskManager, then the job
> > > > > >>>> DAG can never be embarrassingly parallel anymore. That will
> > > > > >>>> nullify the benefit of fine-grained recovery for embarrassingly
> > > > > >>>> parallel jobs.
> > > > > >>>>
> > > > > >>>> It's not clear to me what the implications are of running the
> > > > > >>>> enumerator on the JobManager, so I will leave that out for now.
> > > > > >>>>
> > > > > >>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com>
> > > wrote:
> > > > > >>>>
> > > > > >>>> > Hi Stephan & Piotrek,
> > > > > >>>> >
> > > > > >>>> > Thank you for the feedback.
> > > > > >>>> >
> > > > > >>>> > It seems that there are a lot of things to do in the community.
> > > > > >>>> > I am just afraid that this discussion may be forgotten, since
> > > > > >>>> > there are so many proposals recently.
> > > > > >>>> > Anyway, I wish to see the split topics soon :)
> > > > > >>>> >
> > > > > >>>> > On Thu, Jan 24, 2019 at 8:21 PM Piotr Nowojski <pi...@da-platform.com> wrote:
> > > > > >>>> >
> > > > > >>>> > > Hi Biao!
> > > > > >>>> > >
> > > > > >>>> > > This discussion was stalled because of preparations for the
> > > > > >>>> > > open-sourcing & merging of Blink. I think before creating the
> > > > > >>>> > > tickets we should split this discussion into the topics/areas
> > > > > >>>> > > outlined by Stephan and create FLIPs for that.
> > > > > >>>> > >
> > > > > >>>> > > I think there is no chance for this to be completed in the
> > > > > >>>> > > couple of remaining weeks / one month before the 1.8 feature
> > > > > >>>> > > freeze; however, it would be good to aim for 1.9 with those
> > > > > >>>> > > changes.
> > > > > >>>> > >
> > > > > >>>> > > Piotrek
> > > > > >>>> > >
> > > > > >>>> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com>
> > > wrote:
> > > > > >>>> > > >
> > > > > >>>> > > > Hi community,
> > > > > >>>> > > > Stephan's summary makes a lot of sense to me. It is much
> > > > > >>>> > > > clearer indeed after splitting the complex topic into small
> > > > > >>>> > > > ones.
> > > > > >>>> > > > I was wondering whether there is any detailed plan for the
> > > > > >>>> > > > next step? If not, I would like to push this forward by
> > > > > >>>> > > > creating some JIRA issues.
> > > > > >>>> > > > Another question: should version 1.8 include these features?
> > > > > >>>> > > >
> > > > > >>>> > > > On Sat, Dec 1, 2018 at 4:20 AM Stephan Ewen <se...@apache.org> wrote:
> > > > > >>>> > > >
> > > > > >>>> > > >> Thanks everyone for the lively discussion. Let me try to
> > > > > >>>> > > >> summarize where I see convergence in the discussion and
> > > > > >>>> > > >> open issues.
> > > > > >>>> > > >> I'll try to group this by design aspect of the source.
> > > > > >>>> > > >> Please let me know if I got things wrong or missed
> > > > > >>>> > > >> something crucial here.
> > > > > >>>> > > >>
> > > > > >>>> > > >> For issues 1-3, if the below reflects the state of the
> > > > > >>>> > > >> discussion, I would try to update the FLIP in the next few
> > > > > >>>> > > >> days. For the remaining ones we need more discussion.
> > > > > >>>> > > >>
> > > > > >>>> > > >> I would suggest forking each of these aspects into a
> > > > > >>>> > > >> separate mail thread, or we will lose sight of the
> > > > > >>>> > > >> individual aspects.
> > > > > >>>> > > >>
> > > > > >>>> > > >> *(1) Separation of Split Enumerator and Split Reader*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - All seem to agree this is a good thing
> > > > > >>>> > > >>  - Split Enumerator could in the end live on JobManager
> > > (and
> > > > > >>>> assign
> > > > > >>>> > > splits
> > > > > >>>> > > >> via RPC) or in a task (and assign splits via data
> > streams)
> > > > > >>>> > > >>  - this discussion is orthogonal and should come later,
> > > when
> > > > > the
> > > > > >>>> > > interface
> > > > > >>>> > > >> is agreed upon.
> > > > > >>>> > > >>
> > > > > >>>> > > >> *(2) Split Readers for one or more splits*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Discussion seems to agree that we need to support
> one
> > > reader
> > > > > >>>> that
> > > > > >>>> > > >> possibly handles multiple splits concurrently.
> > > > > >>>> > > >>  - The requirement comes from sources where one
> > > poll()-style
> > > > > call
> > > > > >>>> > > fetches
> > > > > >>>> > > >> data from different splits / partitions
> > > > > >>>> > > >>    --> examples of sources that require this are Kafka,
> > > > > >>>> > > >> Pravega, Pulsar
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Could have one split reader per source, or multiple
> > > split
> > > > > >>>> readers
> > > > > >>>> > > that
> > > > > >>>> > > >> share the "poll()" function
> > > > > >>>> > > >>  - To not make it too complicated, we can start with
> > > thinking
> > > > > >>>> about
> > > > > >>>> > one
> > > > > >>>> > > >> split reader for all splits initially and see if that
> > > covers
> > > > > all
> > > > > >>>> > > >> requirements
> > > > > >>>> > > >>
> > > > > >>>> > > >> *(3) Threading model of the Split Reader*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Most active part of the discussion ;-)
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - A non-blocking way for Flink's task code to interact
> > > > > >>>> > > >> with the source is needed in order to support task runtime
> > > > > >>>> > > >> code based on a single-threaded/actor-style task design
> > > > > >>>> > > >>    --> I personally am a big proponent of that; it will
> > > > > >>>> > > >> help with well-behaved checkpoints, efficiency, and simpler
> > > > > >>>> > > >> yet more robust runtime code
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Users care about simple abstraction, so as a
> subclass
> > of
> > > > > >>>> > SplitReader
> > > > > >>>> > > >> (non-blocking / async) we need to have a
> > > BlockingSplitReader
> > > > > >>>> which
> > > > > >>>> > will
> > > > > >>>> > > >> form the basis of most source implementations.
> > > > > >>>> BlockingSplitReader
> > > > > >>>> > lets
> > > > > >>>> > > >> users do blocking simple poll() calls.
> > > > > >>>> > > >>  - The BlockingSplitReader would spawn a thread (or
> more)
> > > and
> > > > > the
> > > > > >>>> > > >> thread(s) can make blocking calls and hand over data
> > > buffers
> > > > > via
> > > > > >>>> a
> > > > > >>>> > > blocking
> > > > > >>>> > > >> queue
> > > > > >>>> > > >>  - This should allow us to cover both, a fully async
> > > runtime,
> > > > > >>>> and a
> > > > > >>>> > > simple
> > > > > >>>> > > >> blocking interface for users.
> > > > > >>>> > > >>  - This is actually very similar to how the Kafka
> > > connectors
> > > > > >>>> work.
> > > > > >>>> > Kafka
> > > > > >>>> > > >> 9+ with one thread, Kafka 8 with multiple threads
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - On the base SplitReader (the async one), the
> > > non-blocking
> > > > > >>>> method
> > > > > >>>> > that
> > > > > >>>> > > >> gets the next chunk of data would signal data
> > availability
> > > via
> > > > > a
> > > > > >>>> > > >> CompletableFuture, because that gives the best
> > flexibility
> > > (can
> > > > > >>>> await
> > > > > >>>> > > >> completion or register notification handlers).
> > > > > >>>> > > >>  - The source task would register a "thenHandle()" (or
> > > > > >>>> > > >> similar) on the future to put a "take next data" task into
> > > > > >>>> > > >> the actor-style mailbox
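> > > > > >>>> > > >>
> > > > > >>>> > > >> As a minimal sketch of that combination (blocking fetcher thread,
> > > > > >>>> > > >> handover queue, availability future), with made-up names and the
> > > > > >>>> > > >> re-arming races ignored for brevity:
> > > > > >>>> > > >>
> > > > > >>>> > > >>     import java.util.Optional;
> > > > > >>>> > > >>     import java.util.concurrent.ArrayBlockingQueue;
> > > > > >>>> > > >>     import java.util.concurrent.BlockingQueue;
> > > > > >>>> > > >>     import java.util.concurrent.CompletableFuture;
> > > > > >>>> > > >>     import java.util.concurrent.ExecutorService;
> > > > > >>>> > > >>     import java.util.concurrent.Executors;
> > > > > >>>> > > >>
> > > > > >>>> > > >>     abstract class BlockingSplitReaderSketch<E> {
> > > > > >>>> > > >>         private final BlockingQueue<E> handover = new ArrayBlockingQueue<>(1000);
> > > > > >>>> > > >>         private final ExecutorService fetcher = Executors.newSingleThreadExecutor();
> > > > > >>>> > > >>         private volatile CompletableFuture<Void> available = new CompletableFuture<>();
> > > > > >>>> > > >>
> > > > > >>>> > > >>         /** Simple blocking poll against the external system (never returns null here). */
> > > > > >>>> > > >>         protected abstract E pollNextBlocking() throws Exception;
> > > > > >>>> > > >>
> > > > > >>>> > > >>         void start() {
> > > > > >>>> > > >>             fetcher.submit(() -> {
> > > > > >>>> > > >>                 while (!Thread.currentThread().isInterrupted()) {
> > > > > >>>> > > >>                     handover.put(pollNextBlocking()); // blocking call in the fetcher thread
> > > > > >>>> > > >>                     available.complete(null);         // wakes up the task mailbox
> > > > > >>>> > > >>                 }
> > > > > >>>> > > >>                 return null;
> > > > > >>>> > > >>             });
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>
> > > > > >>>> > > >>         /** Non-blocking: the task registers a mailbox action on this future. */
> > > > > >>>> > > >>         CompletableFuture<Void> isAvailable() {
> > > > > >>>> > > >>             return available;
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>
> > > > > >>>> > > >>         /** Non-blocking: next buffered record, if any; re-arms the future otherwise. */
> > > > > >>>> > > >>         Optional<E> pollNext() {
> > > > > >>>> > > >>             E next = handover.poll();
> > > > > >>>> > > >>             if (next == null) {
> > > > > >>>> > > >>                 available = new CompletableFuture<>();
> > > > > >>>> > > >>             }
> > > > > >>>> > > >>             return Optional.ofNullable(next);
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>     }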
> > > > > >>>> > > >>
> > > > > >>>> > > >> *(4) Split Enumeration and Assignment*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Splits may be generated lazily, both in cases where
> > > there
> > > > > is a
> > > > > >>>> > > limited
> > > > > >>>> > > >> number of splits (but very many), or splits are
> > discovered
> > > over
> > > > > >>>> time
> > > > > >>>> > > >>  - Assignment should also be lazy, to get better load
> > > balancing
> > > > > >>>> > > >>  - Assignment needs to support locality preferences
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Possible design based on discussion so far:
> > > > > >>>> > > >>
> > > > > >>>> > > >>    --> SplitReader has a method "addSplits(SplitT...)"
> to
> > > add
> > > > > >>>> one or
> > > > > >>>> > > more
> > > > > >>>> > > >> splits. Some split readers might assume they have only
> > one
> > > > > split
> > > > > >>>> ever,
> > > > > >>>> > > >> concurrently, others assume multiple splits. (Note:
> idea
> > > behind
> > > > > >>>> being
> > > > > >>>> > > able
> > > > > >>>> > > >> to add multiple splits at the same time is to ease
> > startup
> > > > > where
> > > > > >>>> > > multiple
> > > > > >>>> > > >> splits may be assigned instantly.)
> > > > > >>>> > > >>    --> SplitReader has a context object on which it can
> > > > > >>>> > > >> indicate when splits are completed. The enumerator gets
> > > > > >>>> > > >> that notification and can use it to
> > > > > >>>> > > >> decide when to assign new splits. This should help both
> > in
> > > > > cases
> > > > > >>>> of
> > > > > >>>> > > sources
> > > > > >>>> > > >> that take splits lazily (file readers) and in case the
> > > source
> > > > > >>>> needs to
> > > > > >>>> > > >> preserve a partial order between splits (Kinesis,
> > Pravega,
> > > > > >>>> Pulsar may
> > > > > >>>> > > need
> > > > > >>>> > > >> that).
> > > > > >>>> > > >>    --> SplitEnumerator gets notification when
> > SplitReaders
> > > > > start
> > > > > >>>> and
> > > > > >>>> > > when
> > > > > >>>> > > >> they finish splits. They can decide at that moment to
> > push
> > > more
> > > > > >>>> splits
> > > > > >>>> > > to
> > > > > >>>> > > >> that reader
> > > > > >>>> > > >>    --> The SplitEnumerator should probably be aware of
> > the
> > > > > source
> > > > > >>>> > > >> parallelism, to build its initial distribution.
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Open question: Should the source expose something like
> > > > > >>>> > > >> "host preferences", so that yarn/mesos/k8s can take this
> > > > > >>>> > > >> into account when selecting a node to start a TM on?
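> > > > > >>>> > > >>
> > > > > >>>> > > >> To make the possible design above a bit more concrete, a rough
> > > > > >>>> > > >> sketch (all type and method names are illustrative only, not
> > > > > >>>> > > >> proposed interfaces):
> > > > > >>>> > > >>
> > > > > >>>> > > >>     /** Minimal stand-in for the output the reader writes records to. */
> > > > > >>>> > > >>     interface SourceOutputSketch<E> {
> > > > > >>>> > > >>         void collect(E record);
> > > > > >>>> > > >>     }
> > > > > >>>> > > >>
> > > > > >>>> > > >>     /** Handed to the reader so it can report progress back to the enumerator. */
> > > > > >>>> > > >>     interface SplitReaderContextSketch<SplitT> {
> > > > > >>>> > > >>         void splitFinished(SplitT split); // enumerator may react by assigning a new split
> > > > > >>>> > > >>     }
> > > > > >>>> > > >>
> > > > > >>>> > > >>     interface SplitReaderSketch<E, SplitT> {
> > > > > >>>> > > >>
> > > > > >>>> > > >>         /** One or more splits can be added at once, e.g. the initial batch at startup. */
> > > > > >>>> > > >>         void addSplits(SplitT... splits);
> > > > > >>>> > > >>
> > > > > >>>> > > >>         /** Emits records; calls the context when a split has been fully read. */
> > > > > >>>> > > >>         void emitNext(SourceOutputSketch<E> output, SplitReaderContextSketch<SplitT> context)
> > > > > >>>> > > >>                 throws Exception;
> > > > > >>>> > > >>     }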
> > > > > >>>> > > >>
> > > > > >>>> > > >> *(5) Watermarks and event time alignment*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Watermark generation, as well as idleness, needs to
> be
> > > per
> > > > > >>>> split
> > > > > >>>> > > (like
> > > > > >>>> > > >> currently in the Kafka Source, per partition)
> > > > > >>>> > > >>  - It is desirable to support optional
> > > event-time-alignment,
> > > > > >>>> meaning
> > > > > >>>> > > that
> > > > > >>>> > > >> splits that are ahead are back-pressured or temporarily
> > > > > >>>> unsubscribed
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - I think it would be desirable to encapsulate watermark
> > > > > >>>> > > >> generation logic in watermark generators, for a separation
> > > > > >>>> > > >> of concerns. The watermark generators should run per split.
> > > > > >>>> > > >>  - Using watermark generators would also help with
> > another
> > > > > >>>> problem of
> > > > > >>>> > > the
> > > > > >>>> > > >> suggested interface, namely supporting non-periodic
> > > watermarks
> > > > > >>>> > > efficiently.
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Need a way to "dispatch" next record to different
> > > watermark
> > > > > >>>> > > generators
> > > > > >>>> > > >>  - Need a way to tell SplitReader to "suspend" a split
> > > until a
> > > > > >>>> certain
> > > > > >>>> > > >> watermark is reached (event time backpressure)
> > > > > >>>> > > >>  - This would in fact not be needed (and thus simpler) if
> > > > > >>>> > > >> we had a SplitReader per split, and may be a reason to
> > > > > >>>> > > >> re-open that discussion
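> > > > > >>>> > > >>
> > > > > >>>> > > >> A small sketch of the per-split watermark generators mentioned
> > > > > >>>> > > >> above (names are illustrative, not a proposed interface):
> > > > > >>>> > > >>
> > > > > >>>> > > >>     import java.util.HashMap;
> > > > > >>>> > > >>     import java.util.Map;
> > > > > >>>> > > >>
> > > > > >>>> > > >>     interface WatermarkGeneratorSketch<E> {
> > > > > >>>> > > >>         void onEvent(E event, long eventTimestamp);
> > > > > >>>> > > >>         long currentWatermark();
> > > > > >>>> > > >>     }
> > > > > >>>> > > >>
> > > > > >>>> > > >>     class PerSplitWatermarksSketch<E> {
> > > > > >>>> > > >>         private final Map<String, WatermarkGeneratorSketch<E>> generators = new HashMap<>();
> > > > > >>>> > > >>
> > > > > >>>> > > >>         /** Dispatch each record to the generator of the split it came from. */
> > > > > >>>> > > >>         void onRecord(String splitId, E record, long timestamp) {
> > > > > >>>> > > >>             generators.get(splitId).onEvent(record, timestamp);
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>
> > > > > >>>> > > >>         /** The reader-wide watermark is the minimum across all active splits. */
> > > > > >>>> > > >>         long combinedWatermark() {
> > > > > >>>> > > >>             return generators.values().stream()
> > > > > >>>> > > >>                     .mapToLong(WatermarkGeneratorSketch::currentWatermark)
> > > > > >>>> > > >>                     .min().orElse(Long.MIN_VALUE);
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>     }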
> > > > > >>>> > > >>
> > > > > >>>> > > >> *(6) Watermarks across splits and in the Split
> > Enumerator*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - The split enumerator may need some watermark
> > awareness,
> > > > > which
> > > > > >>>> > should
> > > > > >>>> > > be
> > > > > >>>> > > >> purely based on split metadata (like create timestamp
> of
> > > file
> > > > > >>>> splits)
> > > > > >>>> > > >>  - If there are still more splits with overlapping
> event
> > > time
> > > > > >>>> range
> > > > > >>>> > for
> > > > > >>>> > > a
> > > > > >>>> > > >> split reader, then that split reader should not advance
> > the
> > > > > >>>> watermark
> > > > > >>>> > > >> within the split beyond the overlap boundary. Otherwise
> > > future
> > > > > >>>> splits
> > > > > >>>> > > will
> > > > > >>>> > > >> produce late data.
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - One way to approach this could be that the split
> > > enumerator
> > > > > >>>> may
> > > > > >>>> > send
> > > > > >>>> > > >> watermarks to the readers, and the readers cannot emit
> > > > > watermarks
> > > > > >>>> > beyond
> > > > > >>>> > > >> that received watermark.
> > > > > >>>> > > >>  - Many split enumerators would simply immediately send
> > > > > Long.MAX
> > > > > >>>> out
> > > > > >>>> > and
> > > > > >>>> > > >> leave the progress purely to the split readers.
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - For event-time alignment / split back pressure, this
> > > > > >>>> > > >> raises the question of how we can avoid deadlocks that may
> > > > > >>>> > > >> arise when splits are suspended for event time back
> > > > > >>>> > > >> pressure.
> > > > > >>>> > > >>
> > > > > >>>> > > >> *(7) Batch and streaming Unification*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Functionality wise, the above design should support
> > both
> > > > > >>>> > > >>  - Batch often (mostly) does not care about reading "in
> > > order"
> > > > > >>>> and
> > > > > >>>> > > >> generating watermarks
> > > > > >>>> > > >>    --> Might use different enumerator logic that is
> more
> > > > > locality
> > > > > >>>> > aware
> > > > > >>>> > > >> and ignores event time order
> > > > > >>>> > > >>    --> Does not generate watermarks
> > > > > >>>> > > >>  - Would be great if bounded sources could be
> identified
> > at
> > > > > >>>> compile
> > > > > >>>> > > time,
> > > > > >>>> > > >> so that "env.addBoundedSource(...)" is type safe and
> can
> > > > > return a
> > > > > >>>> > > >> "BoundedDataStream".
> > > > > >>>> > > >>  - Possible to defer this discussion until later
> > > > > >>>> > > >>
> > > > > >>>> > > >> *Miscellaneous Comments*
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - Should the source have a TypeInformation for the
> > > produced
> > > > > >>>> type,
> > > > > >>>> > > instead
> > > > > >>>> > > >> of a serializer? We need a type information in the
> stream
> > > > > >>>> anyways, and
> > > > > >>>> > > can
> > > > > >>>> > > >> derive the serializer from that. Plus, creating the
> > > serializer
> > > > > >>>> should
> > > > > >>>> > > >> respect the ExecutionConfig.
> > > > > >>>> > > >>
> > > > > >>>> > > >>  - The TypeSerializer interface is very powerful but
> also
> > > not
> > > > > >>>> easy to
> > > > > >>>> > > >> implement. Its purpose is to handle data super
> > efficiently,
> > > > > >>>> support
> > > > > >>>> > > >> flexible ways of evolution, etc.
> > > > > >>>> > > >>  For metadata I would suggest to look at the
> > > > > >>>> SimpleVersionedSerializer
> > > > > >>>> > > >> instead, which is used for example for checkpoint
> master
> > > hooks,
> > > > > >>>> or for
> > > > > >>>> > > the
> > > > > >>>> > > >> streaming file sink. I think that it is a good match for
> > > > > >>>> > > >> cases where we do not need more than ser/deser (no copy,
> > > > > >>>> > > >> etc.) and don't need to push versioning out of the
> > > > > >>>> > > >> serialization paths for best performance (as in the
> > > > > >>>> > > >> TypeSerializer)
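> > > > > >>>> > > >>
> > > > > >>>> > > >> For illustration, a sketch of such a metadata serializer (the
> > > > > >>>> > > >> FileSplitSketch type is made up for the example; the interface
> > > > > >>>> > > >> itself is the existing SimpleVersionedSerializer):
> > > > > >>>> > > >>
> > > > > >>>> > > >>     import java.io.IOException;
> > > > > >>>> > > >>     import java.nio.ByteBuffer;
> > > > > >>>> > > >>     import java.nio.charset.StandardCharsets;
> > > > > >>>> > > >>     import org.apache.flink.core.io.SimpleVersionedSerializer;
> > > > > >>>> > > >>
> > > > > >>>> > > >>     class FileSplitSketch {
> > > > > >>>> > > >>         final String path;
> > > > > >>>> > > >>         final long offset;
> > > > > >>>> > > >>         final long length;
> > > > > >>>> > > >>
> > > > > >>>> > > >>         FileSplitSketch(String path, long offset, long length) {
> > > > > >>>> > > >>             this.path = path;
> > > > > >>>> > > >>             this.offset = offset;
> > > > > >>>> > > >>             this.length = length;
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>     }
> > > > > >>>> > > >>
> > > > > >>>> > > >>     class FileSplitSerializerSketch implements SimpleVersionedSerializer<FileSplitSketch> {
> > > > > >>>> > > >>
> > > > > >>>> > > >>         @Override
> > > > > >>>> > > >>         public int getVersion() {
> > > > > >>>> > > >>             return 1; // bump when the binary layout changes
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>
> > > > > >>>> > > >>         @Override
> > > > > >>>> > > >>         public byte[] serialize(FileSplitSketch split) throws IOException {
> > > > > >>>> > > >>             byte[] path = split.path.getBytes(StandardCharsets.UTF_8);
> > > > > >>>> > > >>             ByteBuffer buf = ByteBuffer.allocate(4 + path.length + 8 + 8);
> > > > > >>>> > > >>             buf.putInt(path.length).put(path).putLong(split.offset).putLong(split.length);
> > > > > >>>> > > >>             return buf.array();
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>
> > > > > >>>> > > >>         @Override
> > > > > >>>> > > >>         public FileSplitSketch deserialize(int version, byte[] serialized) throws IOException {
> > > > > >>>> > > >>             ByteBuffer buf = ByteBuffer.wrap(serialized);
> > > > > >>>> > > >>             byte[] path = new byte[buf.getInt()];
> > > > > >>>> > > >>             buf.get(path);
> > > > > >>>> > > >>             return new FileSplitSketch(new String(path, StandardCharsets.UTF_8), buf.getLong(), buf.getLong());
> > > > > >>>> > > >>         }
> > > > > >>>> > > >>     }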
> > > > > >>>> > > >>
> > > > > >>>> > > >>
> > > > > >>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> > > > > >>>> > > >> k.klou...@data-artisans.com>
> > > > > >>>> > > >> wrote:
> > > > > >>>> > > >>
> > > > > >>>> > > >>> Hi Biao,
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> Thanks for the answer!
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> So given the multi-threaded readers, now we have as
> open
> > > > > >>>> questions:
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> 1) How do we let the checkpoints pass through our
> > > > > multi-threaded
> > > > > >>>> > reader
> > > > > >>>> > > >>> operator?
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> 2) Do we have separate reader and source operators or
> > > not? In
> > > > > >>>> the
> > > > > >>>> > > >> strategy
> > > > > >>>> > > >>> that has a separate source, the source operator has a
> > > > > >>>> parallelism of
> > > > > >>>> > 1
> > > > > >>>> > > >> and
> > > > > >>>> > > >>> is responsible for split recovery only.
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> For the first one, given also the constraints
> (blocking,
> > > > > finite
> > > > > >>>> > queues,
> > > > > >>>> > > >>> etc), I do not have an answer yet.
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> For the 2nd, I think that we should go with separate
> > > operators
> > > > > >>>> for
> > > > > >>>> > the
> > > > > >>>> > > >>> source and the readers, for the following reasons:
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> 1) This is more aligned with a potential future
> > > improvement
> > > > > >>>> where the
> > > > > >>>> > > >> split
> > > > > >>>> > > >>> discovery becomes a responsibility of the JobManager
> and
> > > > > >>>> readers are
> > > > > >>>> > > >>> pulling more work from the JM.
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> 2) The source is going to be the "single point of
> > truth".
> > > It
> > > > > >>>> will
> > > > > >>>> > know
> > > > > >>>> > > >> what
> > > > > >>>> > > >>> has been processed and what not. If the source and the
> > > readers
> > > > > >>>> are a
> > > > > >>>> > > >> single
> > > > > >>>> > > >>> operator with parallelism > 1, or in general, if the
> > split
> > > > > >>>> discovery
> > > > > >>>> > is
> > > > > >>>> > > >>> done by each task individually, then:
> > > > > >>>> > > >>>   i) we have to have a deterministic scheme for each
> > > reader to
> > > > > >>>> assign
> > > > > >>>> > > >>> splits to itself (e.g. mod subtaskId). This is not
> > > necessarily
> > > > > >>>> > trivial
> > > > > >>>> > > >> for
> > > > > >>>> > > >>> all sources.
> > > > > >>>> > > >>>   ii) each reader would have to keep a copy of all its
> > > > > processed
> > > > > >>>> > > >>> splits
> > > > > >>>> > > >>>   iii) the state has to be a union state with a
> > > non-trivial
> > > > > >>>> merging
> > > > > >>>> > > >> logic
> > > > > >>>> > > >>> in order to support rescaling.
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> Two additional points that you raised above:
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> i) The point that you raised that we need to keep all
> > > splits
> > > > > >>>> > (processed
> > > > > >>>> > > >> and
> > > > > >>>> > > >>> not-processed) I think is a bit of a strong
> requirement.
> > > This
> > > > > >>>> would
> > > > > >>>> > > imply
> > > > > >>>> > > >>> that for infinite sources the state will grow
> > > indefinitely.
> > > > > >>>> > > >>> This problem
> > > > > >>>> > > >>> is even more pronounced if we do not have a single
> > source
> > > that
> > > > > >>>> > assigns
> > > > > >>>> > > >>> splits to readers, as each reader will have its own
> copy
> > > of
> > > > > the
> > > > > >>>> > state.
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> ii) it is true that for finite sources we need to
> > somehow
> > > not
> > > > > >>>> close
> > > > > >>>> > the
> > > > > >>>> > > >>> readers when the source/split discoverer finishes. The
> > > > > >>>> > > >>> ContinuousFileReaderOperator has a work-around for
> that.
> > > It is
> > > > > >>>> not
> > > > > >>>> > > >> elegant,
> > > > > >>>> > > >>> and checkpoints are not emitted after closing the
> > source,
> > > but
> > > > > >>>> this, I
> > > > > >>>> > > >>> believe, is a bigger problem which requires more
> changes
> > > than
> > > > > >>>> just
> > > > > >>>> > > >>> refactoring the source interface.
> > > > > >>>> > > >>>
> > > > > >>>> > > >>> Cheers,
> > > > > >>>> > > >>> Kostas
> > > > > >>>> > > >>>
> > > > > >>>> > > >>
> > > > > >>>> > >
> > > > > >>>> > >
> > > > > >>>> >
> > > > > >>>>
> > > > > >>>
> > > > >
> > > >
> > >
> >
>
