I agree with Dawid's point that the boundedness information should come
from the source itself (e.g. the end timestamp), not through
env.boundedSouce()/continuousSource().
I think if we want to support something like `env.source()` that derive the
execution mode from source, `supportsBoundedness(Boundedness)`
method is not enough, because we don't know whether it is bounded or not.

Best,
Jark


On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> One more thing. In the current proposal, with the
> supportsBoundedness(Boundedness) method and the boundedness coming from
> either continuousSource or boundedSource I could not find how this
> information is fed back to the SplitEnumerator.
>
> Best,
>
> Dawid
>
> On 09/12/2019 13:52, Becket Qin wrote:
> > Hi Dawid,
> >
> > Thanks for the comments. This actually brings another relevant question
> > about what does a "bounded source" imply. I actually had the same
> > impression when I look at the Source API. Here is what I understand after
> > some discussion with Stephan. The bounded source has the following
> impacts.
> >
> > 1. API validity.
> > - A bounded source generates a bounded stream so some operations that
> only
> > works for bounded records would be performed, e.g. sort.
> > - To expose these bounded stream only APIs, there are two options:
> >      a. Add them to the DataStream API and throw exception if a method is
> > called on an unbounded stream.
> >      b. Create a BoundedDataStream class which is returned from
> > env.boundedSource(), while DataStream is returned from
> env.continousSource().
> > Note that this cannot be done by having single env.source(theSource) even
> > the Source has a getBoundedness() method.
> >
> > 2. Scheduling
> > - A bounded source could be computed stage by stage without bringing up
> all
> > the tasks at the same time.
> >
> > 3. Operator behaviors
> > - A bounded source indicates the records are finite so some operators can
> > wait until it receives all the records before it starts the processing.
> >
> > In the above impact, only 1 is relevant to the API design. And the
> current
> > proposal in FLIP-27 is following 1.b.
> >
> > // boundedness depends of source property, imo this should always be
> >> preferred
> >>
> >
> > DataStream<MyType> stream = env.source(theSource);
> >
> >
> > In your proposal, does DataStream have bounded stream only methods? It
> > looks it should have, otherwise passing a bounded Source to env.source()
> > would be confusing. In that case, we will essentially do 1.a if an
> > unbounded Source is created from env.source(unboundedSource).
> >
> > If we have the methods only supported for bounded streams in DataStream,
> it
> > seems a little weird to have a separate BoundedDataStream interface.
> >
> > Am I understand it correctly?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org>
> > wrote:
> >
> >> Hi all,
> >>
> >> Really well written proposal and very important one. I must admit I have
> >> not understood all the intricacies of it yet.
> >>
> >> One question I have though is about where does the information about
> >> boundedness come from. I think in most cases it is a property of the
> >> source. As you described it might be e.g. end offset, a flag should it
> >> monitor new splits etc. I think it would be a really nice use case to be
> >> able to say:
> >>
> >> new KafkaSource().readUntil(long timestamp),
> >>
> >> which could work as an "end offset". Moreover I think all Bounded
> sources
> >> support continuous mode, but no intrinsically continuous source support
> the
> >> Bounded mode. If I understood the proposal correctly it suggest the
> >> boundedness sort of "comes" from the outside of the source, from the
> >> invokation of either boundedStream or continousSource.
> >>
> >> I am wondering if it would make sense to actually change the method
> >>
> >> boolean Source#supportsBoundedness(Boundedness)
> >>
> >> to
> >>
> >> Boundedness Source#getBoundedness().
> >>
> >> As for the methods #boundedSource, #continousSource, assuming the
> >> boundedness is property of the source they do not affect how the
> enumerator
> >> works, but mostly how the dag is scheduled, right? I am not against
> those
> >> methods, but I think it is a very specific use case to actually override
> >> the property of the source. In general I would expect users to only call
> >> env.source(theSource), where the source tells if it is bounded or not. I
> >> would suggest considering following set of methods:
> >>
> >> // boundedness depends of source property, imo this should always be
> preferred
> >>
> >> DataStream<MyType> stream = env.source(theSource);
> >>
> >>
> >> // always continous execution, whether bounded or unbounded source
> >>
> >> DataStream<MyType> boundedStream = env.continousSource(theSource);
> >>
> >> // imo this would make sense if the BoundedDataStream provides
> additional features unavailable for continous mode
> >> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
> >>
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >>
> >> On 04/12/2019 11:25, Stephan Ewen wrote:
> >>
> >> Thanks, Becket, for updating this.
> >>
> >> I agree with moving the aspects you mentioned into separate FLIPs - this
> >> one way becoming unwieldy in size.
> >>
> >> +1 to the FLIP in its current state. Its a very detailed write-up,
> nicely
> >> done!
> >>
> >> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> <
> becket....@gmail.com> wrote:
> >>
> >>
> >> Hi all,
> >>
> >> Sorry for the long belated update. I have updated FLIP-27 wiki page with
> >> the latest proposals. Some noticeable changes include:
> >> 1. A new generic communication mechanism between SplitEnumerator and
> >> SourceReader.
> >> 2. Some detail API method signature changes.
> >>
> >> We left a few things out of this FLIP and will address them in separate
> >> FLIPs. Including:
> >> 1. Per split event time.
> >> 2. Event time alignment.
> >> 3. Fine grained failover for SplitEnumerator failure.
> >>
> >> Please let us know if you have any question.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> <
> se...@apache.org> wrote:
> >>
> >>
> >> Hi  Łukasz!
> >>
> >> Becket and me are working hard on figuring out the last details and
> >> implementing the first PoC. We would update the FLIP hopefully next
> week.
> >>
> >> There is a fair chance that a first version of this will be in 1.10, but
> >>
> >> I
> >>
> >> think it will take another release to battle test it and migrate the
> >> connectors.
> >>
> >> Best,
> >> Stephan
> >>
> >>
> >>
> >>
> >> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> <
> l...@touk.pl>
> >>
> >> wrote:
> >>
> >> Hi,
> >>
> >> This proposal looks very promising for us. Do you have any plans in
> >>
> >> which
> >>
> >> Flink release it is going to be released? We are thinking on using a
> >>
> >> Data
> >>
> >> Set API for our future use cases but on the other hand Data Set API is
> >> going to be deprecated so using proposed bounded data streams solution
> >> could be more viable in the long term.
> >>
> >> Thanks,
> >> Łukasz
> >>
> >> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> <
> thomas.we...@gmail.com> wrote:
> >>
> >> Thanks for putting together this proposal!
> >>
> >> I see that the "Per Split Event Time" and "Event Time Alignment"
> >>
> >> sections
> >>
> >> are still TBD.
> >>
> >> It would probably be good to flesh those out a bit before proceeding
> >>
> >> too
> >>
> >> far
> >>
> >> as the event time alignment will probably influence the interaction
> >>
> >> with
> >>
> >> the split reader, specifically ReaderStatus emitNext(SourceOutput<E>
> >> output).
> >>
> >> We currently have only one implementation for event time alignment in
> >>
> >> the
> >>
> >> Kinesis consumer. The synchronization in that case takes place as the
> >>
> >> last
> >>
> >> step before records are emitted downstream (RecordEmitter). With the
> >> currently proposed interfaces, the equivalent can be implemented in
> >>
> >> the
> >>
> >> reader loop, although note that in the Kinesis consumer the per shard
> >> threads push records.
> >>
> >> Synchronization has not been implemented for the Kafka consumer yet.
> >> https://issues.apache.org/jira/browse/FLINK-12675
> >>
> >> When I looked at it, I realized that the implementation will look
> >>
> >> quite
> >>
> >> different
> >> from Kinesis because it needs to take place in the pull part, where
> >>
> >> records
> >>
> >> are taken from the Kafka client. Due to the multiplexing it cannot be
> >>
> >> done
> >>
> >> by blocking the split thread like it currently works for Kinesis.
> >>
> >> Reading
> >>
> >> from individual Kafka partitions needs to be controlled via
> >>
> >> pause/resume
> >>
> >> on the Kafka client.
> >>
> >> To take on that responsibility the split thread would need to be
> >>
> >> aware
> >>
> >> of
> >>
> >> the
> >> watermarks or at least whether it should or should not continue to
> >>
> >> consume
> >>
> >> a given split and this may require a different SourceReader or
> >>
> >> SourceOutput
> >>
> >> interface.
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> <
> mmyy1...@gmail.com> wrote:
> >>
> >>
> >> Hi Stephan,
> >>
> >> Thank you for feedback!
> >> Will take a look at your branch before public discussing.
> >>
> >>
> >> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> <
> se...@apache.org>
> >>
> >> wrote:
> >>
> >> Hi Biao!
> >>
> >> Thanks for reviving this. I would like to join this discussion,
> >>
> >> but
> >>
> >> am
> >>
> >> quite occupied with the 1.9 release, so can we maybe pause this
> >>
> >> discussion
> >>
> >> for a week or so?
> >>
> >> In the meantime I can share some suggestion based on prior
> >>
> >> experiments:
> >>
> >> How to do watermarks / timestamp extractors in a simpler and more
> >>
> >> flexible
> >>
> >> way. I think that part is quite promising should be part of the
> >>
> >> new
> >>
> >> source
> >>
> >> interface.
> >>
> >>
> >>
> >>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
> >>
> >>
> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
> >>
> >> Some experiments on how to build the source reader and its
> >>
> >> library
> >>
> >> for
> >>
> >> common threading/split patterns:
> >>
> >>
> >>
> >>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
> >>
> >> Best,
> >> Stephan
> >>
> >>
> >> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> <
> mmyy1...@gmail.com>
> >>
> >> wrote:
> >>
> >> Hi devs,
> >>
> >> Since 1.9 is nearly released, I think we could get back to
> >>
> >> FLIP-27.
> >>
> >> I
> >>
> >> believe it should be included in 1.10.
> >>
> >> There are so many things mentioned in document of FLIP-27. [1] I
> >>
> >> think
> >>
> >> we'd better discuss them separately. However the wiki is not a
> >>
> >> good
> >>
> >> place
> >>
> >> to discuss. I wrote google doc about SplitReader API which
> >>
> >> misses
> >>
> >> some
> >>
> >> details in the document. [2]
> >>
> >> 1.
> >>
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> >>
> >> 2.
> >>
> >>
> >>
> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
> >>
> >> CC Stephan, Aljoscha, Piotrek, Becket
> >>
> >>
> >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> <
> mmyy1...@gmail.com>
> >>
> >> wrote:
> >>
> >> Hi Steven,
> >> Thank you for the feedback. Please take a look at the document
> >>
> >> FLIP-27
> >>
> >> <
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>
> >> which
> >>
> >> is updated recently. A lot of details of enumerator were added
> >>
> >> in
> >>
> >> this
> >>
> >> document. I think it would help.
> >>
> >> Steven Wu <stevenz...@gmail.com> <stevenz...@gmail.com> 于2019年3月28日周四
> 下午12:52写道:
> >>
> >>
> >> This proposal mentioned that SplitEnumerator might run on the
> >> JobManager or
> >> in a single task on a TaskManager.
> >>
> >> if enumerator is a single task on a taskmanager, then the job
> >>
> >> DAG
> >>
> >> can
> >>
> >> never
> >> been embarrassingly parallel anymore. That will nullify the
> >>
> >> leverage
> >>
> >> of
> >>
> >> fine-grained recovery for embarrassingly parallel jobs.
> >>
> >> It's not clear to me what's the implication of running
> >>
> >> enumerator
> >>
> >> on
> >>
> >> the
> >>
> >> jobmanager. So I will leave that out for now.
> >>
> >> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> <
> mmyy1...@gmail.com>
> >>
> >> wrote:
> >>
> >> Hi Stephan & Piotrek,
> >>
> >> Thank you for feedback.
> >>
> >> It seems that there are a lot of things to do in community.
> >>
> >> I
> >>
> >> am
> >>
> >> just
> >>
> >> afraid that this discussion may be forgotten since there so
> >>
> >> many
> >>
> >> proposals
> >>
> >> recently.
> >> Anyway, wish to see the split topics soon :)
> >>
> >> Piotr Nowojski <pi...@da-platform.com> <pi...@da-platform.com>
> 于2019年1月24日周四
> >>
> >> 下午8:21写道:
> >>
> >> Hi Biao!
> >>
> >> This discussion was stalled because of preparations for
> >>
> >> the
> >>
> >> open
> >>
> >> sourcing
> >>
> >> & merging Blink. I think before creating the tickets we
> >>
> >> should
> >>
> >> split this
> >>
> >> discussion into topics/areas outlined by Stephan and
> >>
> >> create
> >>
> >> Flips
> >>
> >> for
> >>
> >> that.
> >>
> >> I think there is no chance for this to be completed in
> >>
> >> couple
> >>
> >> of
> >>
> >> remaining
> >>
> >> weeks/1 month before 1.8 feature freeze, however it would
> >>
> >> be
> >>
> >> good
> >>
> >> to aim
> >>
> >> with those changes for 1.9.
> >>
> >> Piotrek
> >>
> >>
> >> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> <
> mmyy1...@gmail.com>
> >>
> >> wrote:
> >>
> >> Hi community,
> >> The summary of Stephan makes a lot sense to me. It is
> >>
> >> much
> >>
> >> clearer
> >>
> >> indeed
> >>
> >> after splitting the complex topic into small ones.
> >> I was wondering is there any detail plan for next step?
> >>
> >> If
> >>
> >> not,
> >>
> >> I
> >>
> >> would
> >>
> >> like to push this thing forward by creating some JIRA
> >>
> >> issues.
> >>
> >> Another question is that should version 1.8 include
> >>
> >> these
> >>
> >> features?
> >>
> >> Stephan Ewen <se...@apache.org> <se...@apache.org> 于2018年12月1日周六
> 上午4:20写道:
> >>
> >>
> >> Thanks everyone for the lively discussion. Let me try
> >>
> >> to
> >>
> >> summarize
> >>
> >> where I
> >>
> >> see convergence in the discussion and open issues.
> >> I'll try to group this by design aspect of the source.
> >>
> >> Please
> >>
> >> let me
> >>
> >> know
> >>
> >> if I got things wrong or missed something crucial here.
> >>
> >> For issues 1-3, if the below reflects the state of the
> >>
> >> discussion, I
> >>
> >> would
> >>
> >> try and update the FLIP in the next days.
> >> For the remaining ones we need more discussion.
> >>
> >> I would suggest to fork each of these aspects into a
> >>
> >> separate
> >>
> >> mail
> >>
> >> thread,
> >>
> >> or will loose sight of the individual aspects.
> >>
> >> *(1) Separation of Split Enumerator and Split Reader*
> >>
> >>  - All seem to agree this is a good thing
> >>  - Split Enumerator could in the end live on JobManager
> >>
> >> (and
> >>
> >> assign
> >>
> >> splits
> >>
> >> via RPC) or in a task (and assign splits via data
> >>
> >> streams)
> >>
> >>  - this discussion is orthogonal and should come later,
> >>
> >> when
> >>
> >> the
> >>
> >> interface
> >>
> >> is agreed upon.
> >>
> >> *(2) Split Readers for one or more splits*
> >>
> >>  - Discussion seems to agree that we need to support
> >>
> >> one
> >>
> >> reader
> >>
> >> that
> >>
> >> possibly handles multiple splits concurrently.
> >>  - The requirement comes from sources where one
> >>
> >> poll()-style
> >>
> >> call
> >>
> >> fetches
> >>
> >> data from different splits / partitions
> >>    --> example sources that require that would be for
> >>
> >> example
> >>
> >> Kafka,
> >>
> >> Pravega, Pulsar
> >>
> >>  - Could have one split reader per source, or multiple
> >>
> >> split
> >>
> >> readers
> >>
> >> that
> >>
> >> share the "poll()" function
> >>  - To not make it too complicated, we can start with
> >>
> >> thinking
> >>
> >> about
> >>
> >> one
> >>
> >> split reader for all splits initially and see if that
> >>
> >> covers
> >>
> >> all
> >>
> >> requirements
> >>
> >> *(3) Threading model of the Split Reader*
> >>
> >>  - Most active part of the discussion ;-)
> >>
> >>  - A non-blocking way for Flink's task code to interact
> >>
> >> with
> >>
> >> the
> >>
> >> source
> >>
> >> is
> >>
> >> needed in order to a task runtime code based on a
> >> single-threaded/actor-style task design
> >>    --> I personally am a big proponent of that, it will
> >>
> >> help
> >>
> >> with
> >>
> >> well-behaved checkpoints, efficiency, and simpler yet
> >>
> >> more
> >>
> >> robust
> >>
> >> runtime
> >>
> >> code
> >>
> >>  - Users care about simple abstraction, so as a
> >>
> >> subclass
> >>
> >> of
> >>
> >> SplitReader
> >>
> >> (non-blocking / async) we need to have a
> >>
> >> BlockingSplitReader
> >>
> >> which
> >>
> >> will
> >>
> >> form the basis of most source implementations.
> >>
> >> BlockingSplitReader
> >>
> >> lets
> >>
> >> users do blocking simple poll() calls.
> >>  - The BlockingSplitReader would spawn a thread (or
> >>
> >> more)
> >>
> >> and
> >>
> >> the
> >>
> >> thread(s) can make blocking calls and hand over data
> >>
> >> buffers
> >>
> >> via
> >>
> >> a
> >>
> >> blocking
> >>
> >> queue
> >>  - This should allow us to cover both, a fully async
> >>
> >> runtime,
> >>
> >> and a
> >>
> >> simple
> >>
> >> blocking interface for users.
> >>  - This is actually very similar to how the Kafka
> >>
> >> connectors
> >>
> >> work.
> >>
> >> Kafka
> >>
> >> 9+ with one thread, Kafka 8 with multiple threads
> >>
> >>  - On the base SplitReader (the async one), the
> >>
> >> non-blocking
> >>
> >> method
> >>
> >> that
> >>
> >> gets the next chunk of data would signal data
> >>
> >> availability
> >>
> >> via
> >>
> >> a
> >>
> >> CompletableFuture, because that gives the best
> >>
> >> flexibility
> >>
> >> (can
> >>
> >> await
> >>
> >> completion or register notification handlers).
> >>  - The source task would register a "thenHandle()" (or
> >>
> >> similar)
> >>
> >> on the
> >>
> >> future to put a "take next data" task into the
> >>
> >> actor-style
> >>
> >> mailbox
> >>
> >> *(4) Split Enumeration and Assignment*
> >>
> >>  - Splits may be generated lazily, both in cases where
> >>
> >> there
> >>
> >> is a
> >>
> >> limited
> >>
> >> number of splits (but very many), or splits are
> >>
> >> discovered
> >>
> >> over
> >>
> >> time
> >>
> >>  - Assignment should also be lazy, to get better load
> >>
> >> balancing
> >>
> >>  - Assignment needs support locality preferences
> >>
> >>  - Possible design based on discussion so far:
> >>
> >>    --> SplitReader has a method "addSplits(SplitT...)"
> >>
> >> to
> >>
> >> add
> >>
> >> one or
> >>
> >> more
> >>
> >> splits. Some split readers might assume they have only
> >>
> >> one
> >>
> >> split
> >>
> >> ever,
> >>
> >> concurrently, others assume multiple splits. (Note:
> >>
> >> idea
> >>
> >> behind
> >>
> >> being
> >>
> >> able
> >>
> >> to add multiple splits at the same time is to ease
> >>
> >> startup
> >>
> >> where
> >>
> >> multiple
> >>
> >> splits may be assigned instantly.)
> >>    --> SplitReader has a context object on which it can
> >>
> >> call
> >>
> >> indicate
> >>
> >> when
> >>
> >> splits are completed. The enumerator gets that
> >>
> >> notification and
> >>
> >> can
> >>
> >> use
> >>
> >> to
> >>
> >> decide when to assign new splits. This should help both
> >>
> >> in
> >>
> >> cases
> >>
> >> of
> >>
> >> sources
> >>
> >> that take splits lazily (file readers) and in case the
> >>
> >> source
> >>
> >> needs to
> >>
> >> preserve a partial order between splits (Kinesis,
> >>
> >> Pravega,
> >>
> >> Pulsar may
> >>
> >> need
> >>
> >> that).
> >>    --> SplitEnumerator gets notification when
> >>
> >> SplitReaders
> >>
> >> start
> >>
> >> and
> >>
> >> when
> >>
> >> they finish splits. They can decide at that moment to
> >>
> >> push
> >>
> >> more
> >>
> >> splits
> >>
> >> to
> >>
> >> that reader
> >>    --> The SplitEnumerator should probably be aware of
> >>
> >> the
> >>
> >> source
> >>
> >> parallelism, to build its initial distribution.
> >>
> >>  - Open question: Should the source expose something
> >>
> >> like
> >>
> >> "host
> >>
> >> preferences", so that yarn/mesos/k8s can take this into
> >>
> >> account
> >>
> >> when
> >>
> >> selecting a node to start a TM on?
> >>
> >> *(5) Watermarks and event time alignment*
> >>
> >>  - Watermark generation, as well as idleness, needs to
> >>
> >> be
> >>
> >> per
> >>
> >> split
> >>
> >> (like
> >>
> >> currently in the Kafka Source, per partition)
> >>  - It is desirable to support optional
> >>
> >> event-time-alignment,
> >>
> >> meaning
> >>
> >> that
> >>
> >> splits that are ahead are back-pressured or temporarily
> >>
> >> unsubscribed
> >>
> >>  - I think i would be desirable to encapsulate
> >>
> >> watermark
> >>
> >> generation
> >>
> >> logic
> >>
> >> in watermark generators, for a separation of concerns.
> >>
> >> The
> >>
> >> watermark
> >>
> >> generators should run per split.
> >>  - Using watermark generators would also help with
> >>
> >> another
> >>
> >> problem of
> >>
> >> the
> >>
> >> suggested interface, namely supporting non-periodic
> >>
> >> watermarks
> >>
> >> efficiently.
> >>
> >>  - Need a way to "dispatch" next record to different
> >>
> >> watermark
> >>
> >> generators
> >>
> >>  - Need a way to tell SplitReader to "suspend" a split
> >>
> >> until a
> >>
> >> certain
> >>
> >> watermark is reached (event time backpressure)
> >>  - This would in fact be not needed (and thus simpler)
> >>
> >> if
> >>
> >> we
> >>
> >> had
> >>
> >> a
> >>
> >> SplitReader per split and may be a reason to re-open
> >>
> >> that
> >>
> >> discussion
> >>
> >> *(6) Watermarks across splits and in the Split
> >>
> >> Enumerator*
> >>
> >>  - The split enumerator may need some watermark
> >>
> >> awareness,
> >>
> >> which
> >>
> >> should
> >>
> >> be
> >>
> >> purely based on split metadata (like create timestamp
> >>
> >> of
> >>
> >> file
> >>
> >> splits)
> >>
> >>  - If there are still more splits with overlapping
> >>
> >> event
> >>
> >> time
> >>
> >> range
> >>
> >> for
> >>
> >> a
> >>
> >> split reader, then that split reader should not advance
> >>
> >> the
> >>
> >> watermark
> >>
> >> within the split beyond the overlap boundary. Otherwise
> >>
> >> future
> >>
> >> splits
> >>
> >> will
> >>
> >> produce late data.
> >>
> >>  - One way to approach this could be that the split
> >>
> >> enumerator
> >>
> >> may
> >>
> >> send
> >>
> >> watermarks to the readers, and the readers cannot emit
> >>
> >> watermarks
> >>
> >> beyond
> >>
> >> that received watermark.
> >>  - Many split enumerators would simply immediately send
> >>
> >> Long.MAX
> >>
> >> out
> >>
> >> and
> >>
> >> leave the progress purely to the split readers.
> >>
> >>  - For event-time alignment / split back pressure, this
> >>
> >> begs
> >>
> >> the
> >>
> >> question
> >>
> >> how we can avoid deadlocks that may arise when splits
> >>
> >> are
> >>
> >> suspended
> >>
> >> for
> >>
> >> event time back pressure,
> >>
> >> *(7) Batch and streaming Unification*
> >>
> >>  - Functionality wise, the above design should support
> >>
> >> both
> >>
> >>  - Batch often (mostly) does not care about reading "in
> >>
> >> order"
> >>
> >> and
> >>
> >> generating watermarks
> >>    --> Might use different enumerator logic that is
> >>
> >> more
> >>
> >> locality
> >>
> >> aware
> >>
> >> and ignores event time order
> >>    --> Does not generate watermarks
> >>  - Would be great if bounded sources could be
> >>
> >> identified
> >>
> >> at
> >>
> >> compile
> >>
> >> time,
> >>
> >> so that "env.addBoundedSource(...)" is type safe and
> >>
> >> can
> >>
> >> return a
> >>
> >> "BoundedDataStream".
> >>  - Possible to defer this discussion until later
> >>
> >> *Miscellaneous Comments*
> >>
> >>  - Should the source have a TypeInformation for the
> >>
> >> produced
> >>
> >> type,
> >>
> >> instead
> >>
> >> of a serializer? We need a type information in the
> >>
> >> stream
> >>
> >> anyways, and
> >>
> >> can
> >>
> >> derive the serializer from that. Plus, creating the
> >>
> >> serializer
> >>
> >> should
> >>
> >> respect the ExecutionConfig.
> >>
> >>  - The TypeSerializer interface is very powerful but
> >>
> >> also
> >>
> >> not
> >>
> >> easy to
> >>
> >> implement. Its purpose is to handle data super
> >>
> >> efficiently,
> >>
> >> support
> >>
> >> flexible ways of evolution, etc.
> >>  For metadata I would suggest to look at the
> >>
> >> SimpleVersionedSerializer
> >>
> >> instead, which is used for example for checkpoint
> >>
> >> master
> >>
> >> hooks,
> >>
> >> or for
> >>
> >> the
> >>
> >> streaming file sink. I think that is is a good match
> >>
> >> for
> >>
> >> cases
> >>
> >> where
> >>
> >> we
> >>
> >> do
> >>
> >> not need more than ser/deser (no copy, etc.) and don't
> >>
> >> need to
> >>
> >> push
> >>
> >> versioning out of the serialization paths for best
> >>
> >> performance
> >>
> >> (as in
> >>
> >> the
> >>
> >> TypeSerializer)
> >>
> >>
> >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> k.klou...@data-artisans.com>
> >> wrote:
> >>
> >>
> >> Hi Biao,
> >>
> >> Thanks for the answer!
> >>
> >> So given the multi-threaded readers, now we have as
> >>
> >> open
> >>
> >> questions:
> >>
> >> 1) How do we let the checkpoints pass through our
> >>
> >> multi-threaded
> >>
> >> reader
> >>
> >> operator?
> >>
> >> 2) Do we have separate reader and source operators or
> >>
> >> not? In
> >>
> >> the
> >>
> >> strategy
> >>
> >> that has a separate source, the source operator has a
> >>
> >> parallelism of
> >>
> >> 1
> >>
> >> and
> >>
> >> is responsible for split recovery only.
> >>
> >> For the first one, given also the constraints
> >>
> >> (blocking,
> >>
> >> finite
> >>
> >> queues,
> >>
> >> etc), I do not have an answer yet.
> >>
> >> For the 2nd, I think that we should go with separate
> >>
> >> operators
> >>
> >> for
> >>
> >> the
> >>
> >> source and the readers, for the following reasons:
> >>
> >> 1) This is more aligned with a potential future
> >>
> >> improvement
> >>
> >> where the
> >>
> >> split
> >>
> >> discovery becomes a responsibility of the JobManager
> >>
> >> and
> >>
> >> readers are
> >>
> >> pooling more work from the JM.
> >>
> >> 2) The source is going to be the "single point of
> >>
> >> truth".
> >>
> >> It
> >>
> >> will
> >>
> >> know
> >>
> >> what
> >>
> >> has been processed and what not. If the source and the
> >>
> >> readers
> >>
> >> are a
> >>
> >> single
> >>
> >> operator with parallelism > 1, or in general, if the
> >>
> >> split
> >>
> >> discovery
> >>
> >> is
> >>
> >> done by each task individually, then:
> >>   i) we have to have a deterministic scheme for each
> >>
> >> reader to
> >>
> >> assign
> >>
> >> splits to itself (e.g. mod subtaskId). This is not
> >>
> >> necessarily
> >>
> >> trivial
> >>
> >> for
> >>
> >> all sources.
> >>   ii) each reader would have to keep a copy of all its
> >>
> >> processed
> >>
> >> slpits
> >>
> >>   iii) the state has to be a union state with a
> >>
> >> non-trivial
> >>
> >> merging
> >>
> >> logic
> >>
> >> in order to support rescaling.
> >>
> >> Two additional points that you raised above:
> >>
> >> i) The point that you raised that we need to keep all
> >>
> >> splits
> >>
> >> (processed
> >>
> >> and
> >>
> >> not-processed) I think is a bit of a strong
> >>
> >> requirement.
> >>
> >> This
> >>
> >> would
> >>
> >> imply
> >>
> >> that for infinite sources the state will grow
> >>
> >> indefinitely.
> >>
> >> This is
> >>
> >> problem
> >>
> >> is even more pronounced if we do not have a single
> >>
> >> source
> >>
> >> that
> >>
> >> assigns
> >>
> >> splits to readers, as each reader will have its own
> >>
> >> copy
> >>
> >> of
> >>
> >> the
> >>
> >> state.
> >>
> >> ii) it is true that for finite sources we need to
> >>
> >> somehow
> >>
> >> not
> >>
> >> close
> >>
> >> the
> >>
> >> readers when the source/split discoverer finishes. The
> >> ContinuousFileReaderOperator has a work-around for
> >>
> >> that.
> >>
> >> It is
> >>
> >> not
> >>
> >> elegant,
> >>
> >> and checkpoints are not emitted after closing the
> >>
> >> source,
> >>
> >> but
> >>
> >> this, I
> >>
> >> believe, is a bigger problem which requires more
> >>
> >> changes
> >>
> >> than
> >>
> >> just
> >>
> >> refactoring the source interface.
> >>
> >> Cheers,
> >> Kostas
> >>
> >>
> >>
>
>

Reply via email to