Thanks Stephan,

I have to mention that most of the design work and FLIP wiki had actually
been done by Aljoscha, Biao and you, before I picked up this FLIP.

Given that this FLIP has gone through an extended discussion and release
1.10 code freeze is approaching, I'd like to start a vote thread in about
12 hours if there are no further objections.

Thanks,

Jiangjie (Becket) Qin

On Wed, Dec 4, 2019 at 7:04 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Jiayi,
>
> For now there is no communication between the coordinators. And I do see
> some use cases if we can open up that channel. But it won't be in this FLIP.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Dec 4, 2019 at 6:53 PM bupt_ljy <bupt_...@163.com> wrote:
>
>> Hi Becket,
>>
>>
>> Thanks for updating the progress!
>>
>>
>> I have a question about the #OperatorCoordinator. Will there be any
>> communication between different #OperatorCoordinators (or in the future
>> plan)? Because in that way it may be able to cover some cases in FLIP-17[1]
>> like initializing static data before main input processing. Of course it
>> requires more thinking; I just want to share some ideas I have in mind.
>>
>>
>> +1 to the FLIP and detailed design!
>>
>>
>>
>> [1].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>>
>>
>> Best,
>>
>> Jiayi Liao
>>
>>  Original Message
>> *Sender:* Stephan Ewen<se...@apache.org>
>> *Recipient:* dev<dev@flink.apache.org>
>> *Date:* Wednesday, Dec 4, 2019 18:25
>> *Subject:* Re: [DISCUSS] FLIP-27: Refactor Source Interface
>>
>> Thanks, Becket, for updating this.
>>
>> I agree with moving the aspects you mentioned into separate FLIPs - this
>> one was becoming unwieldy in size.
>>
>> +1 to the FLIP in its current state. It's a very detailed write-up, nicely
>> done!
>>
>> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:
>>
>> > Hi all,
>> >
>> > Sorry for the long-overdue update. I have updated the FLIP-27 wiki page with
>> > the latest proposals. Some notable changes include:
>> > 1. A new generic communication mechanism between SplitEnumerator and
>> > SourceReader.
>> > 2. Some detail API method signature changes.
>> >
>> > We left a few things out of this FLIP and will address them in separate
>> > FLIPs, including:
>> > 1. Per split event time.
>> > 2. Event time alignment.
>> > 3. Fine grained failover for SplitEnumerator failure.
>> >
>> > Please let us know if you have any questions.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:
>> >
>> > > Hi Łukasz!
>> > >
>> > > Becket and I are working hard on figuring out the last details and
>> > > implementing the first PoC. We will hopefully update the FLIP next week.
>> > >
>> > > There is a fair chance that a first version of this will be in 1.10, but
>> > I
>> > > think it will take another release to battle test it and migrate the
>> > > connectors.
>> > >
>> > > Best,
>> > > Stephan
>> > >
>> > >
>> > >
>> > >
>> > > On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl>
>> > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > This proposal looks very promising for us. Do you have any plans in
>> > which
>> > > > Flink release it is going to be released? We are thinking of using the
>> > Data
>> > > > Set API for our future use cases but on the other hand Data Set API is
>> > > > going to be deprecated so using proposed bounded data streams solution
>> > > > could be more viable in the long term.
>> > > >
>> > > > Thanks,
>> > > > Łukasz
>> > > >
>> > > > On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
>> > > > > Thanks for putting together this proposal!
>> > > > >
>> > > > > I see that the "Per Split Event Time" and "Event Time Alignment"
>> > > sections
>> > > > > are still TBD.
>> > > > >
>> > > > > It would probably be good to flesh those out a bit before proceeding
>> > > too
>> > > > far
>> > > > > as the event time alignment will probably influence the interaction
>> > > with
>> > > > > the split reader, specifically ReaderStatus emitNext(SourceOutput<E>
>> > > > > output).
>> > > > >
>> > > > > We currently have only one implementation for event time alignment in
>> > > the
>> > > > > Kinesis consumer. The synchronization in that case takes place as the
>> > > > last
>> > > > > step before records are emitted downstream (RecordEmitter). With the
>> > > > > currently proposed interfaces, the equivalent can be implemented in
>> > the
>> > > > > reader loop, although note that in the Kinesis consumer the per shard
>> > > > > threads push records.
>> > > > >
>> > > > > Synchronization has not been implemented for the Kafka consumer yet.
>> > > > >
>> > > > > https://issues.apache.org/jira/browse/FLINK-12675
>> > > > >
>> > > > > When I looked at it, I realized that the implementation will look
>> > quite
>> > > > > different
>> > > > > from Kinesis because it needs to take place in the pull part, where
>> > > > records
>> > > > > are taken from the Kafka client. Due to the multiplexing it cannot be
>> > > > done
>> > > > > by blocking the split thread like it currently works for Kinesis.
>> > > Reading
>> > > > > from individual Kafka partitions needs to be controlled via
>> > > pause/resume
>> > > > > on the Kafka client.
>> > > > >
>> > > > > To take on that responsibility the split thread would need to be
>> > aware
>> > > of
>> > > > > the
>> > > > > watermarks or at least whether it should or should not continue to
>> > > > consume
>> > > > > a given split and this may require a different SourceReader or
>> > > > SourceOutput
>> > > > > interface.
>> > > > >
>> > > > > Thanks,
>> > > > > Thomas
>> > > > >
>> > > > >
>> > > > > On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Stephan,
>> > > > > >
>> > > > > > Thank you for the feedback!
>> > > > > > I will take a look at your branch before discussing publicly.
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org>
>> > > > wrote:
>> > > > > >
>> > > > > > > Hi Biao!
>> > > > > > >
>> > > > > > > Thanks for reviving this. I would like to join this discussion,
>> > but
>> > > > am
>> > > > > > > quite occupied with the 1.9 release, so can we maybe pause this
>> > > > > > discussion
>> > > > > > > for a week or so?
>> > > > > > >
>> > > > > > > In the meantime I can share some suggestion based on prior
>> > > > experiments:
>> > > > > > >
>> > > > > > > How to do watermarks / timestamp extractors in a simpler and more
>> > > > > > flexible
>> > > > > > > way. I think that part is quite promising and should be part of the
>> > new
>> > > > > > source
>> > > > > > > interface.
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> > >
>> > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> > >
>> > https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > Some experiments on how to build the source reader and its
>> > library
>> > > > for
>> > > > > > > common threading/split patterns:
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> > >
>> > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>> > > > > > >
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Stephan
>> > > > > > >
>> > > > > > >
>> > > > > > > On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com>
>> > > > wrote:
>> > > > > > >
>> > > > > > >> Hi devs,
>> > > > > > >>
>> > > > > > >> Since 1.9 is nearly released, I think we could get back to
>> > > FLIP-27.
>> > > > I
>> > > > > > >> believe it should be included in 1.10.
>> > > > > > >>
>> > > > > > >> There are so many things mentioned in document of FLIP-27. [1] I
>> > > > think
>> > > > > > >> we'd better discuss them separately. However the wiki is not a
>> > > good
>> > > > > > place
>> > > > > > >> to discuss. I wrote a Google doc about the SplitReader API which
>> > misses
>> > > > some
>> > > > > > >> details in the document. [2]
>> > > > > > >>
>> > > > > > >> 1.
>> > > > > > >>
>> > > > > >
>> > > >
>> > >
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>> > > > > > >> 2.
>> > > > > > >>
>> > > > > >
>> > > >
>> > >
>> > https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>> > > > > > >>
>> > > > > > >> CC Stephan, Aljoscha, Piotrek, Becket
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com>
>> > > > wrote:
>> > > > > > >>
>> > > > > > >>> Hi Steven,
>> > > > > > >>> Thank you for the feedback. Please take a look at the document
>> > > > FLIP-27
>> > > > > > >>> <
>> > > > > >
>> > > >
>> > >
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>> > > > >
>> > > > > > which
>> > > > > > >>> was updated recently. A lot of details about the enumerator were added
>> > in
>> > > > this
>> > > > > > >>> document. I think it would help.
>> > > > > > >>>
>> > > > > > >>> On Thu, Mar 28, 2019 at 12:52 PM Steven Wu <stevenz...@gmail.com> wrote:
>> > > > > > >>>
>> > > > > > >>>> This proposal mentioned that SplitEnumerator might run on the
>> > > > > > >>>> JobManager or
>> > > > > > >>>> in a single task on a TaskManager.
>> > > > > > >>>>
>> > > > > > >>>> if enumerator is a single task on a taskmanager, then the job
>> > > DAG
>> > > > can
>> > > > > > >>>> never
>> > > > > > >>>> be embarrassingly parallel anymore. That will nullify the
>> > > > leverage
>> > > > > > of
>> > > > > > >>>> fine-grained recovery for embarrassingly parallel jobs.
>> > > > > > >>>>
>> > > > > > >>>> It's not clear to me what's the implication of running
>> > > enumerator
>> > > > on
>> > > > > > the
>> > > > > > >>>> jobmanager. So I will leave that out for now.
>> > > > > > >>>>
>> > > > > > >>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com>
>> > > > wrote:
>> > > > > > >>>>
>> > > > > > >>>> > Hi Stephan & Piotrek,
>> > > > > > >>>> >
>> > > > > > >>>> > Thank you for feedback.
>> > > > > > >>>> >
>> > > > > > >>>> > It seems that there are a lot of things to do in community.
>> > I
>> > > am
>> > > > > > just
>> > > > > > >>>> > afraid that this discussion may be forgotten since there are so
>> > > many
>> > > > > > >>>> proposals
>> > > > > > >>>> > recently.
>> > > > > > >>>> > Anyway, wish to see the split topics soon :)
>> > > > > > >>>> >
>> > > > > > >>>> > On Thu, Jan 24, 2019 at 8:21 PM Piotr Nowojski <pi...@da-platform.com> wrote:
>> > > > > > >>>> >
>> > > > > > >>>> > > Hi Biao!
>> > > > > > >>>> > >
>> > > > > > >>>> > > This discussion was stalled because of preparations for
>> > the
>> > > > open
>> > > > > > >>>> sourcing
>> > > > > > >>>> > > & merging Blink. I think before creating the tickets we
>> > > should
>> > > > > > >>>> split this
>> > > > > > >>>> > > discussion into topics/areas outlined by Stephan and
>> > create
>> > > > Flips
>> > > > > > >>>> for
>> > > > > > >>>> > that.
>> > > > > > >>>> > >
>> > > > > > >>>> > > I think there is no chance for this to be completed in
>> > > couple
>> > > > of
>> > > > > > >>>> > remaining
>> > > > > > >>>> > > weeks/1 month before 1.8 feature freeze, however it would
>> > be
>> > > > good
>> > > > > > >>>> to aim
>> > > > > > >>>> > > with those changes for 1.9.
>> > > > > > >>>> > >
>> > > > > > >>>> > > Piotrek
>> > > > > > >>>> > >
>> > > > > > >>>> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com>
>> > > > wrote:
>> > > > > > >>>> > > >
>> > > > > > >>>> > > > Hi community,
>> > > > > > >>>> > > > Stephan's summary makes a lot of sense to me. It is
>> > much
>> > > > > > clearer
>> > > > > > >>>> > indeed
>> > > > > > >>>> > > > after splitting the complex topic into small ones.
>> > > > > > >>>> > > > I was wondering whether there is a detailed plan for the next step.
>> > If
>> > > > not,
>> > > > > > I
>> > > > > > >>>> would
>> > > > > > >>>> > > > like to push this thing forward by creating some JIRA
>> > > > issues.
>> > > > > > >>>> > > > Another question: should version 1.8 include
>> > these
>> > > > > > >>>> features?
>> > > > > > >>>> > > >
>> > > > > > >>>> > > > On Sat, Dec 1, 2018 at 4:20 AM Stephan Ewen <se...@apache.org> wrote:
>> > > > > > >>>> > > >
>> > > > > > >>>> > > >> Thanks everyone for the lively discussion. Let me try
>> > to
>> > > > > > >>>> summarize
>> > > > > > >>>> > > where I
>> > > > > > >>>> > > >> see convergence in the discussion and open issues.
>> > > > > > >>>> > > >> I'll try to group this by design aspect of the source.
>> > > > Please
>> > > > > > >>>> let me
>> > > > > > >>>> > > know
>> > > > > > >>>> > > >> if I got things wrong or missed something crucial here.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >> For issues 1-3, if the below reflects the state of the
>> > > > > > >>>> discussion, I
>> > > > > > >>>> > > would
>> > > > > > >>>> > > >> try and update the FLIP in the next days.
>> > > > > > >>>> > > >> For the remaining ones we need more discussion.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >> I would suggest to fork each of these aspects into a
>> > > > separate
>> > > > > > >>>> mail
>> > > > > > >>>> > > thread,
>> > > > > > >>>> > > >> or we will lose sight of the individual aspects.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >> *(1) Separation of Split Enumerator and Split Reader*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - All seem to agree this is a good thing
>> > > > > > >>>> > > >>  - Split Enumerator could in the end live on JobManager
>> > > > (and
>> > > > > > >>>> assign
>> > > > > > >>>> > > splits
>> > > > > > >>>> > > >> via RPC) or in a task (and assign splits via data
>> > > streams)
>> > > > > > >>>> > > >>  - this discussion is orthogonal and should come later,
>> > > > when
>> > > > > > the
>> > > > > > >>>> > > interface
>> > > > > > >>>> > > >> is agreed upon.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >> *(2) Split Readers for one or more splits*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Discussion seems to agree that we need to support
>> > one
>> > > > reader
>> > > > > > >>>> that
>> > > > > > >>>> > > >> possibly handles multiple splits concurrently.
>> > > > > > >>>> > > >>  - The requirement comes from sources where one
>> > > > poll()-style
>> > > > > > call
>> > > > > > >>>> > > fetches
>> > > > > > >>>> > > >> data from different splits / partitions
>> > > > > > >>>> > > >>    --> example sources that require that would be for
>> > > > example
>> > > > > > >>>> Kafka,
>> > > > > > >>>> > > >> Pravega, Pulsar
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Could have one split reader per source, or multiple
>> > > > split
>> > > > > > >>>> readers
>> > > > > > >>>> > > that
>> > > > > > >>>> > > >> share the "poll()" function
>> > > > > > >>>> > > >>  - To not make it too complicated, we can start with
>> > > > thinking
>> > > > > > >>>> about
>> > > > > > >>>> > one
>> > > > > > >>>> > > >> split reader for all splits initially and see if that
>> > > > covers
>> > > > > > all
>> > > > > > >>>> > > >> requirements
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >> *(3) Threading model of the Split Reader*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Most active part of the discussion ;-)
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - A non-blocking way for Flink's task code to interact
>> > > > with
>> > > > > > the
>> > > > > > >>>> > source
>> > > > > > >>>> > > is
>> > > > > > >>>> > > >> needed in order to build the task runtime code based on a
>> > > > > > >>>> > > >> single-threaded/actor-style task design
>> > > > > > >>>> > > >>    --> I personally am a big proponent of that, it will
>> > > > help
>> > > > > > with
>> > > > > > >>>> > > >> well-behaved checkpoints, efficiency, and simpler yet
>> > > more
>> > > > > > robust
>> > > > > > >>>> > > runtime
>> > > > > > >>>> > > >> code
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Users care about simple abstraction, so as a
>> > subclass
>> > > of
>> > > > > > >>>> > SplitReader
>> > > > > > >>>> > > >> (non-blocking / async) we need to have a
>> > > > BlockingSplitReader
>> > > > > > >>>> which
>> > > > > > >>>> > will
>> > > > > > >>>> > > >> form the basis of most source implementations.
>> > > > > > >>>> BlockingSplitReader
>> > > > > > >>>> > lets
>> > > > > > >>>> > > >> users do blocking simple poll() calls.
>> > > > > > >>>> > > >>  - The BlockingSplitReader would spawn a thread (or
>> > more)
>> > > > and
>> > > > > > the
>> > > > > > >>>> > > >> thread(s) can make blocking calls and hand over data
>> > > > buffers
>> > > > > > via
>> > > > > > >>>> a
>> > > > > > >>>> > > blocking
>> > > > > > >>>> > > >> queue
>> > > > > > >>>> > > >>  - This should allow us to cover both, a fully async
>> > > > runtime,
>> > > > > > >>>> and a
>> > > > > > >>>> > > simple
>> > > > > > >>>> > > >> blocking interface for users.
>> > > > > > >>>> > > >>  - This is actually very similar to how the Kafka
>> > > > connectors
>> > > > > > >>>> work.
>> > > > > > >>>> > Kafka
>> > > > > > >>>> > > >> 9+ with one thread, Kafka 8 with multiple threads
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - On the base SplitReader (the async one), the
>> > > > non-blocking
>> > > > > > >>>> method
>> > > > > > >>>> > that
>> > > > > > >>>> > > >> gets the next chunk of data would signal data
>> > > availability
>> > > > via
>> > > > > > a
>> > > > > > >>>> > > >> CompletableFuture, because that gives the best
>> > > flexibility
>> > > > (can
>> > > > > > >>>> await
>> > > > > > >>>> > > >> completion or register notification handlers).
>> > > > > > >>>> > > >>  - The source task would register a "thenHandle()" (or
>> > > > similar)
>> > > > > > >>>> on the
>> > > > > > >>>> > > >> future to put a "take next data" task into the
>> > > actor-style
>> > > > > > >>>> mailbox
>> > > > > > >>>> > > >>
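To make the handover idea in (3) more concrete, here is a minimal sketch of a queue-backed reader: a fetcher thread makes the blocking calls and puts records into a bounded blocking queue, while the task thread only calls non-blocking methods and learns about new data through a CompletableFuture. The class name HandoverReaderSketch, its method names, and the queue capacity of 16 are all assumptions made for illustration; they are not taken from the FLIP or from Stephan's branch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: the class and method names are illustrative, not from the FLIP. */
final class HandoverReaderSketch<T> {
    // Bounded queue: the fetcher thread blocks here when the task falls behind.
    private final BlockingQueue<T> handover = new LinkedBlockingQueue<>(16);
    private final Object lock = new Object();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    /** Called from the fetcher thread after its blocking poll() on the external system. */
    void put(T record) {
        try {
            handover.put(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while handing over a record", e);
        }
        CompletableFuture<Void> toComplete;
        synchronized (lock) {
            toComplete = available;
        }
        toComplete.complete(null); // wake up whoever waits for data
    }

    /** Called from the task thread; never blocks and returns null if nothing is queued. */
    T pollNext() {
        return handover.poll();
    }

    /** Called from the task thread; the future completes once data may be available. */
    CompletableFuture<Void> isAvailable() {
        synchronized (lock) {
            if (!handover.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            if (available.isDone()) {
                available = new CompletableFuture<>(); // stale signal, start a fresh one
            }
            return available;
        }
    }
}
```

In this sketch the task thread could call isAvailable().thenRun(...) to enqueue a "take next data" action into its mailbox. A wake-up may occasionally be spurious, so the task has to tolerate pollNext() returning null.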
>> > > > > > >>>> > > >> *(4) Split Enumeration and Assignment*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Splits may be generated lazily, both in cases where
>> > > > there
>> > > > > > is a
>> > > > > > >>>> > > limited
>> > > > > > >>>> > > >> number of splits (but very many), or splits are
>> > > discovered
>> > > > over
>> > > > > > >>>> time
>> > > > > > >>>> > > >>  - Assignment should also be lazy, to get better load
>> > > > balancing
>> > > > > > >>>> > > >>  - Assignment needs to support locality preferences
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Possible design based on discussion so far:
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>    --> SplitReader has a method "addSplits(SplitT...)"
>> > to
>> > > > add
>> > > > > > >>>> one or
>> > > > > > >>>> > > more
>> > > > > > >>>> > > >> splits. Some split readers might assume they have only
>> > > one
>> > > > > > split
>> > > > > > >>>> ever,
>> > > > > > >>>> > > >> concurrently, others assume multiple splits. (Note:
>> > idea
>> > > > behind
>> > > > > > >>>> being
>> > > > > > >>>> > > able
>> > > > > > >>>> > > >> to add multiple splits at the same time is to ease
>> > > startup
>> > > > > > where
>> > > > > > >>>> > > multiple
>> > > > > > >>>> > > >> splits may be assigned instantly.)
>> > > > > > >>>> > > >>    --> SplitReader has a context object on which it can
>> > > > > > >>>> indicate
>> > > > > > >>>> > > when
>> > > > > > >>>> > > >> splits are completed. The enumerator gets that
>> > > > notification and
>> > > > > > >>>> can
>> > > > > > >>>> > use it
>> > > > > > >>>> > > to
>> > > > > > >>>> > > >> decide when to assign new splits. This should help both
>> > > in
>> > > > > > cases
>> > > > > > >>>> of
>> > > > > > >>>> > > sources
>> > > > > > >>>> > > >> that take splits lazily (file readers) and in case the
>> > > > source
>> > > > > > >>>> needs to
>> > > > > > >>>> > > >> preserve a partial order between splits (Kinesis,
>> > > Pravega,
>> > > > > > >>>> Pulsar may
>> > > > > > >>>> > > need
>> > > > > > >>>> > > >> that).
>> > > > > > >>>> > > >>    --> SplitEnumerator gets notification when
>> > > SplitReaders
>> > > > > > start
>> > > > > > >>>> and
>> > > > > > >>>> > > when
>> > > > > > >>>> > > >> they finish splits. They can decide at that moment to
>> > > push
>> > > > more
>> > > > > > >>>> splits
>> > > > > > >>>> > > to
>> > > > > > >>>> > > >> that reader
>> > > > > > >>>> > > >>    --> The SplitEnumerator should probably be aware of
>> > > the
>> > > > > > source
>> > > > > > >>>> > > >> parallelism, to build its initial distribution.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Open question: Should the source expose something
>> > like
>> > > > "host
>> > > > > > >>>> > > >> preferences", so that yarn/mesos/k8s can take this into
>> > > > account
>> > > > > > >>>> when
>> > > > > > >>>> > > >> selecting a node to start a TM on?
>> > > > > > >>>> > > >>
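The notification-driven assignment described in (4) can be sketched as follows: the enumerator keeps a queue of unassigned splits and hands out one split when a reader starts, and another each time a reader reports a finished split. The names LazyEnumeratorSketch, onReaderStarted, onSplitFinished and assignedTo are hypothetical, splits are represented as plain strings, and locality preferences are ignored to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of lazy split assignment; names are illustrative only. */
final class LazyEnumeratorSketch {
    private final Queue<String> unassigned = new ArrayDeque<>();
    // History of splits handed to each reader, keyed by reader id.
    private final Map<Integer, List<String>> assignments = new HashMap<>();

    LazyEnumeratorSketch(List<String> discoveredSplits) {
        unassigned.addAll(discoveredSplits);
    }

    /** Notification: a reader started, so give it its first split. */
    void onReaderStarted(int readerId) {
        assignNext(readerId);
    }

    /** Notification: a reader finished a split, so it has capacity for another one. */
    void onSplitFinished(int readerId, String finishedSplit) {
        assignNext(readerId);
    }

    private void assignNext(int readerId) {
        String next = unassigned.poll();
        if (next != null) {
            assignments.computeIfAbsent(readerId, id -> new ArrayList<>()).add(next);
        }
    }

    List<String> assignedTo(int readerId) {
        return assignments.getOrDefault(readerId, Collections.emptyList());
    }
}
```

Because splits are only handed out on demand, a fast reader naturally ends up with more splits than a slow one, which is the load-balancing effect mentioned above.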
>> > > > > > >>>> > > >> *(5) Watermarks and event time alignment*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Watermark generation, as well as idleness, needs to
>> > be
>> > > > per
>> > > > > > >>>> split
>> > > > > > >>>> > > (like
>> > > > > > >>>> > > >> currently in the Kafka Source, per partition)
>> > > > > > >>>> > > >>  - It is desirable to support optional
>> > > > event-time-alignment,
>> > > > > > >>>> meaning
>> > > > > > >>>> > > that
>> > > > > > >>>> > > >> splits that are ahead are back-pressured or temporarily
>> > > > > > >>>> unsubscribed
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - I think it would be desirable to encapsulate
>> > watermark
>> > > > > > >>>> generation
>> > > > > > >>>> > > logic
>> > > > > > >>>> > > >> in watermark generators, for a separation of concerns.
>> > > The
>> > > > > > >>>> watermark
>> > > > > > >>>> > > >> generators should run per split.
>> > > > > > >>>> > > >>  - Using watermark generators would also help with
>> > > another
>> > > > > > >>>> problem of
>> > > > > > >>>> > > the
>> > > > > > >>>> > > >> suggested interface, namely supporting non-periodic
>> > > > watermarks
>> > > > > > >>>> > > efficiently.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Need a way to "dispatch" next record to different
>> > > > watermark
>> > > > > > >>>> > > generators
>> > > > > > >>>> > > >>  - Need a way to tell SplitReader to "suspend" a split
>> > > > until a
>> > > > > > >>>> certain
>> > > > > > >>>> > > >> watermark is reached (event time backpressure)
>> > > > > > >>>> > > >>  - This would in fact be not needed (and thus simpler)
>> > if
>> > > > we
>> > > > > > had
>> > > > > > >>>> a
>> > > > > > >>>> > > >> SplitReader per split and may be a reason to re-open
>> > that
>> > > > > > >>>> discussion
>> > > > > > >>>> > > >>
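As a rough illustration of per-split watermark tracking in (5), the sketch below dispatches each record's timestamp to the state of its split and derives the reader's overall watermark as the minimum across splits. It assumes a very simple generator (the watermark of a split is the highest timestamp seen so far) and ignores idleness. PerSplitWatermarksSketch and its methods are hypothetical names, not part of any proposed interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one watermark per split, combined by taking the minimum. */
final class PerSplitWatermarksSketch {
    private final Map<String, Long> maxTimestampPerSplit = new HashMap<>();

    /** Dispatches a record's timestamp to the state of the split it came from. */
    void onRecord(String splitId, long timestamp) {
        maxTimestampPerSplit.merge(splitId, timestamp, Long::max);
    }

    /** The reader can only advance as far as its slowest split. */
    long combinedWatermark() {
        if (maxTimestampPerSplit.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerSplit.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }
}
```

A split whose watermark is far ahead of combinedWatermark() would be the natural candidate for the "suspend" mechanism discussed above.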
>> > > > > > >>>> > > >> *(6) Watermarks across splits and in the Split
>> > > Enumerator*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - The split enumerator may need some watermark
>> > > awareness,
>> > > > > > which
>> > > > > > >>>> > should
>> > > > > > >>>> > > be
>> > > > > > >>>> > > >> purely based on split metadata (like create timestamp
>> > of
>> > > > file
>> > > > > > >>>> splits)
>> > > > > > >>>> > > >>  - If there are still more splits with overlapping
>> > event
>> > > > time
>> > > > > > >>>> range
>> > > > > > >>>> > for
>> > > > > > >>>> > > a
>> > > > > > >>>> > > >> split reader, then that split reader should not advance
>> > > the
>> > > > > > >>>> watermark
>> > > > > > >>>> > > >> within the split beyond the overlap boundary. Otherwise
>> > > > future
>> > > > > > >>>> splits
>> > > > > > >>>> > > will
>> > > > > > >>>> > > >> produce late data.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - One way to approach this could be that the split
>> > > > enumerator
>> > > > > > >>>> may
>> > > > > > >>>> > send
>> > > > > > >>>> > > >> watermarks to the readers, and the readers cannot emit
>> > > > > > watermarks
>> > > > > > >>>> > beyond
>> > > > > > >>>> > > >> that received watermark.
>> > > > > > >>>> > > >>  - Many split enumerators would simply immediately send
>> > > > > > Long.MAX
>> > > > > > >>>> out
>> > > > > > >>>> > and
>> > > > > > >>>> > > >> leave the progress purely to the split readers.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - For event-time alignment / split back pressure, this
>> > > > begs
>> > > > > > the
>> > > > > > >>>> > > question
>> > > > > > >>>> > > >> how we can avoid deadlocks that may arise when splits
>> > are
>> > > > > > >>>> suspended
>> > > > > > >>>> > for
>> > > > > > >>>> > > >> event time back pressure.
>> > > > > > >>>> > > >>
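The enumerator-bounded watermark idea in (6) could look roughly like this on the reader side: the reader remembers the latest bound received from the enumerator and never emits a watermark beyond it. An enumerator that does not care would send Long.MAX_VALUE once, which removes the cap. CappedWatermarkSketch and its method names are assumptions made for this example only.

```java
/** Hypothetical sketch: a reader-side watermark capped by a bound from the enumerator. */
final class CappedWatermarkSketch {
    private long enumeratorBound = Long.MIN_VALUE; // nothing received yet
    private long lastEmitted = Long.MIN_VALUE;

    /** Called when the enumerator sends a new bound; bounds only move forward. */
    void onEnumeratorWatermark(long bound) {
        enumeratorBound = Math.max(enumeratorBound, bound);
    }

    /** Called with the reader's own candidate; returns the watermark actually emitted. */
    long onReaderWatermark(long candidate) {
        long capped = Math.min(candidate, enumeratorBound);
        lastEmitted = Math.max(lastEmitted, capped); // watermarks never go backwards
        return lastEmitted;
    }
}
```

With a bound of 100, a candidate of 150 is held back at 100 until the enumerator raises the bound, which is what protects splits that are assigned later from being treated as late data.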
>> > > > > > >>>> > > >> *(7) Batch and streaming Unification*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Functionality wise, the above design should support
>> > > both
>> > > > > > >>>> > > >>  - Batch often (mostly) does not care about reading "in
>> > > > order"
>> > > > > > >>>> and
>> > > > > > >>>> > > >> generating watermarks
>> > > > > > >>>> > > >>    --> Might use different enumerator logic that is
>> > more
>> > > > > > locality
>> > > > > > >>>> > aware
>> > > > > > >>>> > > >> and ignores event time order
>> > > > > > >>>> > > >>    --> Does not generate watermarks
>> > > > > > >>>> > > >>  - Would be great if bounded sources could be
>> > identified
>> > > at
>> > > > > > >>>> compile
>> > > > > > >>>> > > time,
>> > > > > > >>>> > > >> so that "env.addBoundedSource(...)" is type safe and
>> > can
>> > > > > > return a
>> > > > > > >>>> > > >> "BoundedDataStream".
>> > > > > > >>>> > > >>  - Possible to defer this discussion until later
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >> *Miscellaneous Comments*
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - Should the source have a TypeInformation for the
>> > > > produced
>> > > > > > >>>> type,
>> > > > > > >>>> > > instead
>> > > > > > >>>> > > >> of a serializer? We need a type information in the
>> > stream
>> > > > > > >>>> anyways, and
>> > > > > > >>>> > > can
>> > > > > > >>>> > > >> derive the serializer from that. Plus, creating the
>> > > > serializer
>> > > > > > >>>> should
>> > > > > > >>>> > > >> respect the ExecutionConfig.
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>  - The TypeSerializer interface is very powerful but
>> > also
>> > > > not
>> > > > > > >>>> easy to
>> > > > > > >>>> > > >> implement. Its purpose is to handle data super
>> > > efficiently,
>> > > > > > >>>> support
>> > > > > > >>>> > > >> flexible ways of evolution, etc.
>> > > > > > >>>> > > >>  For metadata I would suggest to look at the
>> > > > > > >>>> SimpleVersionedSerializer
>> > > > > > >>>> > > >> instead, which is used for example for checkpoint
>> > master
>> > > > hooks,
>> > > > > > >>>> or for
>> > > > > > >>>> > > the
>> > > > > > >>>> > > >> streaming file sink. I think that it is a good match
>> > for
>> > > > cases
>> > > > > > >>>> where
>> > > > > > >>>> > we
>> > > > > > >>>> > > do
>> > > > > > >>>> > > >> not need more than ser/deser (no copy, etc.) and don't
>> > > > need to
>> > > > > > >>>> push
>> > > > > > >>>> > > >> versioning out of the serialization paths for best
>> > > > performance
>> > > > > > >>>> (as in
>> > > > > > >>>> > > the
>> > > > > > >>>> > > >> TypeSerializer)
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
>> > > > > > >>>> > > >> k.klou...@data-artisans.com>
>> > > > > > >>>> > > >> wrote:
>> > > > > > >>>> > > >>
>> > > > > > >>>> > > >>> Hi Biao,
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> Thanks for the answer!
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> So given the multi-threaded readers, now we have as
>> > open
>> > > > > > >>>> questions:
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> 1) How do we let the checkpoints pass through our
>> > > > > > multi-threaded
>> > > > > > >>>> > reader
>> > > > > > >>>> > > >>> operator?
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> 2) Do we have separate reader and source operators or
>> > > > not? In
>> > > > > > >>>> the
>> > > > > > >>>> > > >> strategy
>> > > > > > >>>> > > >>> that has a separate source, the source operator has a
>> > > > > > >>>> parallelism of
>> > > > > > >>>> > 1
>> > > > > > >>>> > > >> and
>> > > > > > >>>> > > >>> is responsible for split recovery only.
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> For the first one, given also the constraints
>> > (blocking,
>> > > > > > finite
>> > > > > > >>>> > queues,
>> > > > > > >>>> > > >>> etc), I do not have an answer yet.
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> For the 2nd, I think that we should go with separate
>> > > > operators
>> > > > > > >>>> for
>> > > > > > >>>> > the
>> > > > > > >>>> > > >>> source and the readers, for the following reasons:
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> 1) This is more aligned with a potential future
>> > > > improvement
>> > > > > > >>>> where the
>> > > > > > >>>> > > >> split
>> > > > > > >>>> > > >>> discovery becomes a responsibility of the JobManager
>> > and
>> > > > > > >>>> readers are
>> > > > > > >>>> > > >>> pulling more work from the JM.
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> 2) The source is going to be the "single point of
>> > > truth".
>> > > > It
>> > > > > > >>>> will
>> > > > > > >>>> > know
>> > > > > > >>>> > > >> what
>> > > > > > >>>> > > >>> has been processed and what not. If the source and the
>> > > > readers
>> > > > > > >>>> are a
>> > > > > > >>>> > > >> single
>> > > > > > >>>> > > >>> operator with parallelism > 1, or in general, if the
>> > > split
>> > > > > > >>>> discovery
>> > > > > > >>>> > is
>> > > > > > >>>> > > >>> done by each task individually, then:
>> > > > > > >>>> > > >>>   i) we have to have a deterministic scheme for each
>> > > > reader to
>> > > > > > >>>> assign
>> > > > > > >>>> > > >>> splits to itself (e.g. mod subtaskId). This is not
>> > > > necessarily
>> > > > > > >>>> > trivial
>> > > > > > >>>> > > >> for
>> > > > > > >>>> > > >>> all sources.
>> > > > > > >>>> > > >>>   ii) each reader would have to keep a copy of all its
>> > > > > > processed
>> > > > > > >>>> > splits
>> > > > > > >>>> > > >>>   iii) the state has to be a union state with a
>> > > > non-trivial
>> > > > > > >>>> merging
>> > > > > > >>>> > > >> logic
>> > > > > > >>>> > > >>> in order to support rescaling.
>> > > > > > >>>> > > >>>
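For reference, the "mod subtaskId" style of self-assignment mentioned in i) could be sketched as below: every subtask sees the full list of splits and keeps only those whose hash maps to its own index. ModuloAssignmentSketch and splitsFor are hypothetical names, and identifying a split by a string is an assumption for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of deterministic self-assignment by hash modulo parallelism. */
final class ModuloAssignmentSketch {
    /** Returns the splits that the given subtask would claim for itself. */
    static List<String> splitsFor(List<String> allSplits, int subtaskId, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (String split : allSplits) {
            // floorMod keeps the result in [0, parallelism) even for negative hash codes.
            if (Math.floorMod(split.hashCode(), parallelism) == subtaskId) {
                mine.add(split);
            }
        }
        return mine;
    }
}
```

Every split ends up with exactly one owner, but the owner changes whenever the parallelism changes, which is one reason rescaling needs the non-trivial state merging described in iii).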
>> > > > > > >>>> > > >>> Two additional points that you raised above:
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> i) The point that you raised that we need to keep all
>> > > > splits
>> > > > > > >>>> > (processed
>> > > > > > >>>> > > >> and
>> > > > > > >>>> > > >>> not-processed) I think is a bit of a strong
>> > requirement.
>> > > > This
>> > > > > > >>>> would
>> > > > > > >>>> > > imply
>> > > > > > >>>> > > >>> that for infinite sources the state will grow
>> > > > indefinitely.
>> > > > > > >>>> This
>> > > > > > >>>> > > >> problem
>> > > > > > >>>> > > >>> is even more pronounced if we do not have a single
>> > > source
>> > > > that
>> > > > > > >>>> > assigns
>> > > > > > >>>> > > >>> splits to readers, as each reader will have its own
>> > copy
>> > > > of
>> > > > > > the
>> > > > > > >>>> > state.
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> ii) it is true that for finite sources we need to
>> > > somehow
>> > > > not
>> > > > > > >>>> close
>> > > > > > >>>> > the
>> > > > > > >>>> > > >>> readers when the source/split discoverer finishes. The
>> > > > > > >>>> > > >>> ContinuousFileReaderOperator has a work-around for
>> > that.
>> > > > It is
>> > > > > > >>>> not
>> > > > > > >>>> > > >> elegant,
>> > > > > > >>>> > > >>> and checkpoints are not emitted after closing the
>> > > source,
>> > > > but
>> > > > > > >>>> this, I
>> > > > > > >>>> > > >>> believe, is a bigger problem which requires more
>> > changes
>> > > > than
>> > > > > > >>>> just
>> > > > > > >>>> > > >>> refactoring the source interface.
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>> Cheers,
>> > > > > > >>>> > > >>> Kostas
>> > > > > > >>>> > > >>>
>> > > > > > >>>> > > >>
>> > > > > > >>>> > >
>> > > > > > >>>> > >
>> > > > > > >>>> >
>> > > > > > >>>>
>> > > > > > >>>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>>
