Hi Becket,

A quick clarification from my side, because I think you misunderstood my
question. I did not suggest letting the SourceReader return only a single
record at a time when calling getNextRecords. As the return type
(Collection<E>) indicates, the method can return an arbitrary number of
records.
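
To make the shape concrete, here is a minimal sketch of what I mean. Only
getNextRecords() comes from my earlier mail; RecordReader, BatchingReader and
drain are names made up for illustration and are not part of the FLIP.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class GetNextRecordsSketch {

    /** Hypothetical reader contract: return whatever is currently available. */
    interface RecordReader<E> {
        Collection<E> getNextRecords();
    }

    /** Toy reader that hands back its input in batches of at most batchSize. */
    static final class BatchingReader<E> implements RecordReader<E> {
        private final Queue<E> pending;
        private final int batchSize;

        BatchingReader(Collection<E> input, int batchSize) {
            this.pending = new ArrayDeque<>(input);
            this.batchSize = batchSize;
        }

        @Override
        public Collection<E> getNextRecords() {
            List<E> batch = new ArrayList<>();
            while (batch.size() < batchSize && !pending.isEmpty()) {
                batch.add(pending.poll());
            }
            return batch; // in this toy, an empty batch means the input is exhausted
        }
    }

    /** Caller-side loop: the caller emits, and regains control after every batch. */
    static <E> int drain(RecordReader<E> reader, Consumer<? super E> downstream) {
        int calls = 0;
        Collection<E> batch;
        while (!(batch = reader.getNextRecords()).isEmpty()) {
            batch.forEach(downstream);
            calls++;
        }
        return calls;
    }
}
```

The reader never sees the output, so it cannot run an emit loop that keeps
control away from the caller, and it is still free to return a whole batch
per call.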

Cheers,
Till

On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Becket,
>
> Issue #1 - Design of Source interface
>
> I mentioned the lack of a method like Source#createEnumerator(Boundedness
> boundedness, SplitEnumeratorContext context) because without it the current
> proposal is incomplete and does not work.
>
> If we say that boundedness is an intrinsic property of a source imo we
> don't need the Source#createEnumerator(Boundedness boundedness,
> SplitEnumeratorContext context) method.
>
> Assuming a source from my previous example:
>
> Source source = KafkaSource.builder()
>   ...
>   .untilTimestamp(...)
>   .build()
>
> Would the enumerator differ if created like
> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs
> source.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this
> is the part where my opinion differs the most from the current proposal. I
> really think it should always be the source that tells whether it is bounded
> or not. In the current proposal the methods continuousSource/boundedSource
> somewhat reconfigure the source, which I think is misleading.
>
> I think a call like:
>
> Source source = KafkaSource.builder()
>   ...
>   .readContinuously() // or readUntilLatestOffset() / readUntilTimestamp() / readUntilOffsets() / ...
>   .build()
>
> is way cleaner (and more expressive) than
>
> Source source = KafkaSource.builder()
>   ...
>   .build()
>
>
> env.continuousSource(source)
> // ^ underneath this would call createEnumerator(CONTINUOUS, ctx), which
> //   would be equivalent to source.readContinuously().createEnumerator(ctx)
> // or
> env.boundedSource(source)
> // ^ underneath this would call createEnumerator(BOUNDED, ctx), which
> //   would be equivalent to source.readUntilLatestOffset().createEnumerator(ctx)
>
>
> Sorry for the comparison, but to me it seems there is too much magic
> happening underneath those two calls.
>
> I really believe the Source interface should have getBoundedness method
> instead of (supportBoundedness) + createEnumerator(Boundedness, ...)
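
A minimal sketch of the alternative described here, where the builder call
decides the boundedness and the source merely reports it. ToyKafkaSource and
its builder methods are hypothetical stand-ins modelled on the examples in
this mail, not an existing connector API; the enum constants follow the names
used above.

```java
final class BoundednessSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /** Reduced to the one method under discussion. */
    interface Source {
        Boundedness getBoundedness();
    }

    /** Hypothetical Kafka-like source; the builder call fixes the boundedness. */
    static final class ToyKafkaSource implements Source {
        private final Long stopTimestamp; // null means "read forever"

        private ToyKafkaSource(Long stopTimestamp) {
            this.stopTimestamp = stopTimestamp;
        }

        static Builder builder() {
            return new Builder();
        }

        @Override
        public Boundedness getBoundedness() {
            return stopTimestamp == null
                    ? Boundedness.CONTINUOUS_UNBOUNDED
                    : Boundedness.BOUNDED;
        }

        static final class Builder {
            private Long stopTimestamp; // continuous unless told otherwise

            Builder readContinuously() {
                stopTimestamp = null;
                return this;
            }

            Builder readUntilTimestamp(long timestamp) {
                stopTimestamp = timestamp;
                return this;
            }

            ToyKafkaSource build() {
                return new ToyKafkaSource(stopTimestamp);
            }
        }
    }
}
```

With this shape an environment could inspect getBoundedness() and would not
need a Boundedness argument on createEnumerator.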
>
>
> Issue #2 - Design of
> ExecutionEnvironment#source()/continuousSource()/boundedSource()
>
> As you might have guessed, I am slightly in favor of the modified option 2.
> Yes, I am aware that every step of the DAG would have to be able to say
> whether it is bounded or not. I have a feeling it would be easier to express
> cross bounded/unbounded operations, but I must admit I have not thought it
> through thoroughly. In the spirit of "batch is just a special case of
> streaming", I thought BoundedStream would extend DataStream. Correct me if I
> am wrong. In such a setup the cross bounded/unbounded operation could be
> expressed quite easily, I think:
>
> DataStream {
>   // We cannot really tell whether the result is bounded or not, but because
>   // a bounded stream is a special case of an unbounded one, the API object is
>   // correct irrespective of whether the left or right side of the join is
>   // bounded.
>   DataStream join(DataStream, ...);
> }
>
> BoundedStream extends DataStream {
>   // Only if both sides are bounded can the result be bounded as well.
>   // However, DataStream#join is still accessible here, so you can still join
>   // with a DataStream.
>   BoundedStream join(BoundedStream, ...);
> }
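
The two join signatures above can be checked with a small compilable sketch.
The classes are empty stand-ins (no type parameters, no join logic), so this
only demonstrates how Java picks between the two overloads.

```java
final class JoinOverloadSketch {

    static class DataStream {
        /** Result boundedness is unknown, so the general type is returned. */
        DataStream join(DataStream other) {
            return new DataStream();
        }
    }

    static class BoundedStream extends DataStream {
        /** Overload, not override: chosen only when the argument's static type is BoundedStream. */
        BoundedStream join(BoundedStream other) {
            return new BoundedStream();
        }
    }
}
```

One consequence worth noting: overloads are resolved at compile time, so a
bounded stream that is held in a variable of type DataStream joins through
the general method and yields a plain DataStream.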
>
>
> On the other hand I also see benefits of two completely disjoint APIs,
> as we could prohibit some streaming calls in the bounded API. I can't think
> of any unbounded operators that could not be implemented for a bounded
> stream.
>
> Besides I think we both agree we don't like the method:
>
> DataStream boundedStream(Source)
>
> suggested in the current state of the FLIP. Do we? :)
>
> Best,
>
> Dawid
>
> On 10/12/2019 18:57, Becket Qin wrote:
>
> Hi folks,
>
> Thanks for the discussion, great feedback. Also thanks Dawid for the
> explanation, it is much clearer now.
>
> One thing that is indeed missing from the FLIP is how the boundedness is
> passed to the Source implementation. So the API should be
> Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
> context)
> And we can probably remove the Source#supportBoundedness(Boundedness
> boundedness) method.
>
> Assuming we have that, we are essentially choosing from one of the
> following two options:
>
> Option 1:
> // The source is a continuous source, and only unbounded operations can be
> // performed.
> DataStream<Type> datastream = env.continuousSource(someSource);
>
> // The source is a bounded source; both bounded and unbounded operations can
> // be performed.
> BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);
>
>   - Pros:
>        a) explicit boundary between bounded / unbounded streams, it is
> quite simple and clear to the users.
>   - Cons:
>        a) For applications that do not involve bounded operations, they
> still have to call different API to distinguish bounded / unbounded streams.
>        b) No support for bounded stream to run in a streaming runtime
> setting, i.e. scheduling and operators behaviors.
>
>
> Option 2:
> // The source is either bounded or unbounded, but only unbounded operations
> // can be performed on the returned DataStream.
> DataStream<Type> dataStream = env.source(someSource);
>
> // The source must be a bounded source, otherwise an exception is thrown.
> BoundedDataStream<Type> boundedDataStream = env.boundedSource(boundedSource);
>
> The pros and cons are exactly the opposite of option 1.
>   - Pros:
>        a) Applications that do not involve bounded operations do not have to
> call a different API to distinguish bounded / unbounded streams.
>        b) Support for bounded stream to run in a streaming runtime setting,
> i.e. scheduling and operators behaviors.
>   - Cons:
>        a) Bounded / unbounded streams are kind of mixed, i.e. given a
> DataStream, it is not clear whether it is bounded or not, unless you have
> the access to its source.
>
>
> If we only think from the Source API perspective, option 2 seems a better
> choice because functionality wise it is a superset of option 1, at the cost
> of some seemingly acceptable ambiguity in the DataStream API.
> But if we look at the DataStream API as a whole, option 1 seems a clearer
> choice. For example, sometimes a library may have to know whether a
> certain task will finish or not. And that would be difficult to tell if the
> input is a DataStream, unless additional information is provided all the
> way from the Source. One possible solution is to have a *modified option 2*
> which adds a method to the DataStream API to indicate boundedness, such as
> getBoundedness(). It would solve the problem, at the cost of potential
> confusion about the difference between a DataStream whose getBoundedness()
> reports bounded and a BoundedDataStream. But that does not seem super
> difficult to explain.
>
> So from the API's perspective, I don't have a strong opinion between
> *option 1* and *modified option 2*. I like the cleanness of option 1, but
> modified option 2 would be more attractive if we have a concrete use case
> for the "Bounded stream with unbounded streaming runtime settings".
>
> Re: Till
>
>
> Maybe this has already been asked before but I was wondering why the
> SourceReader interface has the method pollNext which hands the
> responsibility of outputting elements to the SourceReader implementation?
> Has this been done for backwards compatibility reasons with the old source
> interface? If not, then one could define a Collection<E> getNextRecords()
> method which returns the currently retrieved records and then the caller
> emits them outside of the SourceReader. That way the interface would not
> allow to implement an outputting loop where we never hand back control to
> the caller. At the moment, this contract can be easily broken and is only
> mentioned loosely in the JavaDocs.
>
>
> The primary reason we hand over the SourceOutput to the SourceReader is
> that sometimes it is difficult for a SourceReader to emit one record at a
> time. One example is batched messaging systems, which only have an offset
> for the entire batch instead of for individual messages in the batch. In
> that case, returning one record at a time would leave the SourceReader in
> an uncheckpointable state, because it can only checkpoint at batch
> boundaries.
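
A small sketch of the batch-offset situation described here. Batch,
BatchReader and snapshotState are illustrative names only, and the offsets
are assumed to be consecutive; SourceOutput is reduced to a single collect
method. The point is that the reader emits a whole batch before it moves its
restorable position.

```java
import java.util.Iterator;
import java.util.List;

final class BatchOffsetSketch {

    /** Stand-in for the SourceOutput handed to the reader. */
    interface SourceOutput<E> {
        void collect(E record);
    }

    /** A batch as delivered by the hypothetical system: one offset for all messages. */
    static final class Batch {
        final long offset;
        final List<String> messages;

        Batch(long offset, List<String> messages) {
            this.offset = offset;
            this.messages = messages;
        }
    }

    static final class BatchReader {
        private final Iterator<Batch> batches;
        private long nextBatchOffset = 0; // the only position that can be restored

        BatchReader(List<Batch> batches) {
            this.batches = batches.iterator();
        }

        /** Emits one whole batch, then advances the checkpointable position. */
        boolean pollNext(SourceOutput<String> output) {
            if (!batches.hasNext()) {
                return false;
            }
            Batch batch = batches.next();
            for (String message : batch.messages) {
                output.collect(message);
            }
            nextBatchOffset = batch.offset + 1;
            return true;
        }

        /** Always lands on a batch boundary because pollNext never stops mid-batch. */
        long snapshotState() {
            return nextBatchOffset;
        }
    }
}
```

A method that returns a collection per call could respect the same boundary
by returning the whole batch at once; the constraint only rules out handing
back a single record per call.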
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>
> Hi everyone,
>
> thanks for drafting this FLIP. It reads very well.
>
> Concerning Dawid's proposal, I tend to agree. The boundedness could come
> from the source and tell the system how to treat the operator (scheduling
> wise). From a user's perspective it should be fine to get back a DataStream
> when calling env.source(boundedSource) if he does not need special
> operations defined on a BoundedDataStream. If he needs this, then one could
> use the method BoundedDataStream env.boundedSource(boundedSource).
>
> If possible, we could enforce the proper usage of env.boundedSource() by
> introducing a BoundedSource type so that one cannot pass an
> unbounded source to it. That way users would not be able to shoot
> themselves in the foot.
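
The compile-time enforcement suggested here could look roughly as follows.
All types are empty stand-ins and Env is a hypothetical name for the
execution environment; only the method names source and boundedSource are
taken from the discussion.

```java
final class BoundedSourceTypeSketch {

    interface Source<T> {}

    /** Marker subtype: only sources known to end implement it. */
    interface BoundedSource<T> extends Source<T> {}

    static class DataStream<T> {}

    static final class BoundedDataStream<T> extends DataStream<T> {}

    static final class Env {
        /** Accepts any source; only the general stream API is exposed. */
        <T> DataStream<T> source(Source<T> source) {
            return new DataStream<>();
        }

        /** Passing a source that is not a BoundedSource does not compile. */
        <T> BoundedDataStream<T> boundedSource(BoundedSource<T> source) {
            return new BoundedDataStream<>();
        }
    }
}
```

A bounded source can still be passed to source(...), since BoundedSource is
a subtype of Source; the reverse is rejected by the compiler rather than by
an exception at runtime.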
>
> Maybe this has already been asked before but I was wondering why the
> SourceReader interface has the method pollNext which hands the
> responsibility of outputting elements to the SourceReader implementation?
> Has this been done for backwards compatibility reasons with the old source
> interface? If not, then one could define a Collection<E> getNextRecords()
> method which returns the currently retrieved records and then the caller
> emits them outside of the SourceReader. That way the interface would not
> allow to implement an outputting loop where we never hand back control to
> the caller. At the moment, this contract can be easily broken and is only
> mentioned loosely in the JavaDocs.
>
> Cheers,
> Till
>
> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>
>
> Hi all,
>
> I think the current design is good.
>
> My understanding is:
>
> For execution mode: bounded mode and continuous mode are totally different.
> I don't think we have the ability to integrate the two models at present.
> It's about scheduling, memory, algorithms, state, etc. We shouldn't confuse
> them.
>
> For source capabilities: only bounded, only continuous, or both bounded and
> continuous.
> I think Kafka is a source that can be run in both bounded and continuous
> execution mode.
> And Kafka with an end offset should also be runnable in both bounded and
> continuous execution mode. Using Apache Beam with the Flink runner, I used
> to run a "bounded" Kafka in streaming mode. For our previous DataStream, it
> is not necessarily required that the source cannot be bounded.
>
> So here are my thoughts on Dawid's question:
> 1. Pass a bounded source to continuousSource(): +1.
> 2. Pass a continuous source to boundedSource(): -1, it should throw an
>    exception.
>
> In StreamExecutionEnvironment, continuousSource and boundedSource define
> the execution mode. It defines a clear boundary of execution mode.
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:
>
>
> I agree with Dawid's point that the boundedness information should come
> from the source itself (e.g. the end timestamp), not through
> env.boundedSource()/continuousSource().
> I think if we want to support something like `env.source()` that derives
> the execution mode from the source, the `supportsBoundedness(Boundedness)`
> method is not enough, because we don't know whether it is bounded or not.
>
> Best,
> Jark
>
>
> On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>
> One more thing. In the current proposal, with the
> supportsBoundedness(Boundedness) method and the boundedness coming from
> either continuousSource or boundedSource, I could not find how this
> information is fed back to the SplitEnumerator.
>
> Best,
>
> Dawid
>
> On 09/12/2019 13:52, Becket Qin wrote:
>
> Hi Dawid,
>
> Thanks for the comments. This actually brings up another relevant question
> about what a "bounded source" implies. I actually had the same impression
> when I looked at the Source API. Here is what I understand after some
> discussion with Stephan. The bounded source has the following impacts.
>
> 1. API validity.
> - A bounded source generates a bounded stream, so some operations that only
>   work for bounded records can be performed, e.g. sort.
> - To expose these bounded-stream-only APIs, there are two options:
>      a. Add them to the DataStream API and throw an exception if a method is
>         called on an unbounded stream.
>      b. Create a BoundedDataStream class which is returned from
>         env.boundedSource(), while DataStream is returned from
>         env.continuousSource().
> Note that this cannot be done by having a single env.source(theSource),
> even if the Source has a getBoundedness() method.
>
> 2. Scheduling
> - A bounded source could be computed stage by stage without bringing up all
>   the tasks at the same time.
>
> 3. Operator behaviors
> - A bounded source indicates the records are finite, so some operators can
>   wait until they have received all the records before starting the
>   processing.
>
> Of the above impacts, only 1 is relevant to the API design. And the current
> proposal in FLIP-27 follows 1.b.
>
> // boundedness depends on a source property, imo this should always be
> // preferred
>
> DataStream<MyType> stream = env.source(theSource);
>
>
> In your proposal, does DataStream have bounded-stream-only methods? It
> looks like it should, otherwise passing a bounded Source to env.source()
> would be confusing. In that case, we will essentially do 1.a if an
> unbounded stream is created via env.source(unboundedSource).
>
> If we have the methods that are only supported for bounded streams in
> DataStream, it seems a little weird to have a separate BoundedDataStream
> interface.
>
> Am I understanding it correctly?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>
> Hi all,
>
> Really well written proposal and a very important one. I must admit I have
> not understood all the intricacies of it yet.
>
> One question I have, though, is about where the information about
> boundedness comes from. I think in most cases it is a property of the
> source. As you described, it might be e.g. an end offset, a flag whether it
> should monitor new splits, etc. I think it would be a really nice use case
> to be able to say:
>
> new KafkaSource().readUntil(long timestamp),
>
> which could work as an "end offset". Moreover, I think all bounded sources
> support continuous mode, but no intrinsically continuous source supports
> the bounded mode. If I understood the proposal correctly, it suggests the
> boundedness sort of "comes" from outside the source, from the invocation
> of either boundedStream or continuousSource.
>
> I am wondering if it would make sense to actually change the method
>
> boolean Source#supportsBoundedness(Boundedness)
>
> to
>
> Boundedness Source#getBoundedness().
>
> As for the methods #boundedSource and #continuousSource: assuming the
> boundedness is a property of the source, they do not affect how the
> enumerator works, but mostly how the DAG is scheduled, right? I am not
> against those methods, but I think it is a very specific use case to
> actually override the property of the source. In general I would expect
> users to only call env.source(theSource), where the source tells whether it
> is bounded or not. I would suggest considering the following set of methods:
>
> // boundedness depends on a source property, imo this should always be
> // preferred
> DataStream<MyType> stream = env.source(theSource);
>
> // always continuous execution, whether bounded or unbounded source
> DataStream<MyType> boundedStream = env.continuousSource(theSource);
>
> // imo this would make sense if the BoundedDataStream provides additional
> // features unavailable for continuous mode
> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
>
>
> Best,
>
> Dawid
>
>
> On 04/12/2019 11:25, Stephan Ewen wrote:
>
> Thanks, Becket, for updating this.
>
> I agree with moving the aspects you mentioned into separate FLIPs - this
> one was becoming unwieldy in size.
>
> +1 to the FLIP in its current state. It's a very detailed write-up, nicely
> done!
>
> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:
>
> Hi all,
>
> Sorry for the long belated update. I have updated the FLIP-27 wiki page with
> the latest proposals. Some noticeable changes include:
> 1. A new generic communication mechanism between SplitEnumerator and
>    SourceReader.
> 2. Some detailed API method signature changes.
>
> We left a few things out of this FLIP and will address them in separate
> FLIPs, including:
> 1. Per split event time.
> 2. Event time alignment.
> 3. Fine grained failover for SplitEnumerator failure.
>
> Please let us know if you have any questions.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi  Łukasz!
>
> Becket and I are working hard on figuring out the last details and
> implementing the first PoC. We will hopefully update the FLIP next week.
>
> There is a fair chance that a first version of this will be in 1.10, but I
> think it will take another release to battle test it and migrate the
> connectors.
>
> Best,
> Stephan
>
>
>
>
> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:
>
> Hi,
>
> This proposal looks very promising for us. Do you have any plans for which
> Flink release it is going to be included in? We are thinking of using the
> DataSet API for our future use cases, but on the other hand the DataSet API
> is going to be deprecated, so using the proposed bounded data streams
> solution could be more viable in the long term.
>
> Thanks,
> Łukasz
>
> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
>
> Thanks for putting together this proposal!
>
> I see that the "Per Split Event Time" and "Event Time Alignment" sections
> are still TBD.
>
> It would probably be good to flesh those out a bit before proceeding too
> far, as the event time alignment will probably influence the interaction
> with the split reader, specifically ReaderStatus emitNext(SourceOutput<E>
> output).
>
> We currently have only one implementation for event time alignment, in the
> Kinesis consumer. The synchronization in that case takes place as the last
> step before records are emitted downstream (RecordEmitter). With the
> currently proposed interfaces, the equivalent can be implemented in the
> reader loop, although note that in the Kinesis consumer the per-shard
> threads push records.
>
> Synchronization has not been implemented for the Kafka consumer yet.
>
> https://issues.apache.org/jira/browse/FLINK-12675
>
> When I looked at it, I realized that the implementation will look quite
> different from Kinesis because it needs to take place in the pull part,
> where records are taken from the Kafka client. Due to the multiplexing it
> cannot be done by blocking the split thread like it currently works for
> Kinesis. Reading from individual Kafka partitions needs to be controlled
> via pause/resume on the Kafka client.
>
> To take on that responsibility the split thread would need to be aware of
> the watermarks, or at least of whether it should or should not continue to
> consume a given split, and this may require a different SourceReader or
> SourceOutput interface.
>
> Thanks,
> Thomas
>
>
> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi Stephan,
>
> Thank you for the feedback!
> I will take a look at your branch before discussing publicly.
>
>
> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi Biao!
>
> Thanks for reviving this. I would like to join this discussion, but am
> quite occupied with the 1.9 release, so can we maybe pause this discussion
> for a week or so?
>
> In the meantime I can share some suggestions based on prior experiments:
>
> How to do watermarks / timestamp extractors in a simpler and more flexible
> way. I think that part is quite promising and should be part of the new
> source interface.
>
>
>
>
>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>
> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>
> Some experiments on how to build the source reader and its library for
> common threading/split patterns:
>
>
>
>
>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>
> Best,
> Stephan
>
>
> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi devs,
>
> Since 1.9 is nearly released, I think we could get back to FLIP-27. I
> believe it should be included in 1.10.
>
> There are so many things mentioned in the FLIP-27 document [1]. I think
> we'd better discuss them separately. However, the wiki is not a good place
> for discussion. I wrote a Google doc about the SplitReader API, some
> details of which are missing from the document [2].
>
> 1.
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>
> 2.
>
>
>
>
> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>
> CC Stephan, Aljoscha, Piotrek, Becket
>
>
> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi Steven,
> Thank you for the feedback. Please take a look at the FLIP-27 document
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
> which was updated recently. A lot of details about the enumerator were added
> to this document. I think it would help.
>
> Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:
>
> This proposal mentioned that the SplitEnumerator might run on the
> JobManager or in a single task on a TaskManager.
>
> If the enumerator is a single task on a TaskManager, then the job DAG can
> never be embarrassingly parallel anymore. That will nullify the benefit of
> fine-grained recovery for embarrassingly parallel jobs.
>
> It's not clear to me what the implication of running the enumerator on the
> JobManager is. So I will leave that out for now.
>
> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi Stephan & Piotrek,
>
> Thank you for the feedback.
>
> It seems that there are a lot of things to do in the community. I am just
> afraid that this discussion may be forgotten since there are so many
> proposals recently.
> Anyway, I wish to see the split topics soon :)
>
> Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:
>
> Hi Biao!
>
> This discussion was stalled because of preparations for the open sourcing
> & merging of Blink. I think before creating the tickets we should split
> this discussion into the topics/areas outlined by Stephan and create FLIPs
> for them.
>
> I think there is no chance for this to be completed in the couple of
> remaining weeks/1 month before the 1.8 feature freeze; however, it would be
> good to aim for 1.9 with those changes.
>
> Piotrek
>
>
> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
>
> Hi community,
> Stephan's summary makes a lot of sense to me. It is indeed much clearer
> after splitting the complex topic into small ones.
> I was wondering whether there is any detailed plan for the next step. If
> not, I would like to push this forward by creating some JIRA issues.
> Another question is whether version 1.8 should include these features.
>
> Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
>
> Thanks everyone for the lively discussion. Let me try to summarize where I
> see convergence in the discussion and open issues.
> I'll try to group this by design aspect of the source. Please let me know
> if I got things wrong or missed something crucial here.
>
> For issues 1-3, if the below reflects the state of the discussion, I would
> try and update the FLIP in the next days.
> For the remaining ones we need more discussion.
>
> I would suggest forking each of these aspects into a separate mail thread,
> or we will lose sight of the individual aspects.
>
> *(1) Separation of Split Enumerator and Split Reader*
>
>  - All seem to agree this is a good thing
>  - Split Enumerator could in the end live on JobManager (and assign splits
>    via RPC) or in a task (and assign splits via data streams)
>  - this discussion is orthogonal and should come later, when the interface
>    is agreed upon.
>
> *(2) Split Readers for one or more splits*
>
>  - Discussion seems to agree that we need to support one reader that
>    possibly handles multiple splits concurrently.
>  - The requirement comes from sources where one poll()-style call fetches
>    data from different splits / partitions
>    --> example sources that require that would be for example Kafka,
>        Pravega, Pulsar
>
>  - Could have one split reader per source, or multiple split readers that
>    share the "poll()" function
>  - To not make it too complicated, we can start with thinking about one
>    split reader for all splits initially and see if that covers all
>    requirements
>
> *(3) Threading model of the Split Reader*
>
>  - Most active part of the discussion ;-)
>
>  - A non-blocking way for Flink's task code to interact with the source is
>    needed in order to base the task runtime code on a
>    single-threaded/actor-style task design
>    --> I personally am a big proponent of that, it will help with
>        well-behaved checkpoints, efficiency, and simpler yet more robust
>        runtime code
>
>  - Users care about simple abstraction, so as a subclass of SplitReader
>    (non-blocking / async) we need to have a BlockingSplitReader which will
>    form the basis of most source implementations. BlockingSplitReader lets
>    users do simple blocking poll() calls.
>  - The BlockingSplitReader would spawn a thread (or more) and the
>    thread(s) can make blocking calls and hand over data buffers via a
>    blocking queue
>  - This should allow us to cover both a fully async runtime and a simple
>    blocking interface for users.
>  - This is actually very similar to how the Kafka connectors work: Kafka
>    9+ with one thread, Kafka 8 with multiple threads
>
>  - On the base SplitReader (the async one), the non-blocking method that
>    gets the next chunk of data would signal data availability via a
>    CompletableFuture, because that gives the best flexibility (can await
>    completion or register notification handlers).
>  - The source task would register a "thenHandle()" (or similar) on the
>    future to put a "take next data" task into the actor-style mailbox
>
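
The hand-over described in (3) can be sketched with plain JDK classes. This
is only an illustration under several assumptions: Handover, consume and
scheduleTake are made-up names, a synchronized deque stands in for the
blocking queue, a single-threaded Executor plays the role of the mailbox, and
the "thenHandle() (or similar)" registration is rendered with
CompletableFuture#thenRunAsync.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

final class AsyncHandoverSketch {

    /** Hand-over point between blocking fetcher thread(s) and the task thread. */
    static final class Handover<E> {
        private final Queue<E> queue = new ArrayDeque<>();
        private CompletableFuture<Void> available = new CompletableFuture<>();

        /** Called by a fetcher thread after a blocking read returned. */
        synchronized void put(E element) {
            queue.add(element);
            available.complete(null);
        }

        /** Non-blocking; returns null when nothing is buffered. */
        synchronized E poll() {
            E element = queue.poll();
            if (queue.isEmpty() && available.isDone()) {
                available = new CompletableFuture<>(); // re-arm the signal
            }
            return element;
        }

        /** Completed as soon as poll() would return data. */
        synchronized CompletableFuture<Void> isAvailable() {
            return available;
        }
    }

    /** Collects `expected` elements, running every step as a mailbox action. */
    static <E> List<E> consume(Handover<E> handover, int expected, Executor mailbox) {
        List<E> out = new ArrayList<>(); // written by the mailbox thread only
        CompletableFuture<Void> done = new CompletableFuture<>();
        scheduleTake(handover, out, expected, mailbox, done);
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return out;
    }

    /** Registers a "take next data" action that runs once data is available. */
    private static <E> void scheduleTake(
            Handover<E> handover, List<E> out, int expected,
            Executor mailbox, CompletableFuture<Void> done) {
        handover.isAvailable().thenRunAsync(() -> {
            E element;
            while ((element = handover.poll()) != null) {
                out.add(element);
            }
            if (out.size() >= expected) {
                done.complete(null);
            } else {
                scheduleTake(handover, out, expected, mailbox, done);
            }
        }, mailbox);
    }
}
```

The task thread never blocks on the source: it either finds the future
already completed or gets a mailbox action scheduled when a fetcher thread
completes it. The sketch assumes the mailbox executor is single-threaded.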
> *(4) Split Enumeration and Assignment*
>
>  - Splits may be generated lazily, both in cases where there is a limited
>    number of splits (but very many), or splits are discovered over time
>  - Assignment should also be lazy, to get better load balancing
>  - Assignment needs to support locality preferences
>
>  - Possible design based on discussion so far:
>
>    --> SplitReader has a method "addSplits(SplitT...)" to add one or more
>        splits. Some split readers might assume they have only one split
>        ever, concurrently, others assume multiple splits. (Note: idea
>        behind being able to add multiple splits at the same time is to
>        ease startup where multiple splits may be assigned instantly.)
>    --> SplitReader has a context object on which it can indicate when
>        splits are completed. The enumerator gets that notification and
>        can use it to decide when to assign new splits. This should help
>        both in cases of sources that take splits lazily (file readers) and
>        in case the source needs to preserve a partial order between
>        splits (Kinesis, Pravega, Pulsar may need that).
>    --> SplitEnumerator gets notification when SplitReaders start and when
>        they finish splits. They can decide at that moment to push more
>        splits to that reader
>    --> The SplitEnumerator should probably be aware of the source
>        parallelism, to build its initial distribution.
>
>  - Open question: Should the source expose something like "host
>    preferences", so that yarn/mesos/k8s can take this into account when
>    selecting a node to start a TM on?
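
The notification-driven, lazy assignment outlined in (4) might be sketched
like this. Enumerator, onReaderStarted and onSplitFinished are hypothetical
names; splits are plain strings, and locality and parallelism are ignored.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

final class LazyAssignmentSketch {

    /** Toy enumerator: hands out one split at a time, on demand. */
    static final class Enumerator {
        private final Deque<String> unassigned;

        Enumerator(List<String> splits) {
            this.unassigned = new ArrayDeque<>(splits);
        }

        /** Notification: a reader came up and has no work yet. */
        Optional<String> onReaderStarted(int readerId) {
            return nextSplit();
        }

        /** Notification: a reader reported a split as completed. */
        Optional<String> onSplitFinished(int readerId, String finishedSplit) {
            return nextSplit();
        }

        /** Empty once every split has been handed out. */
        private Optional<String> nextSplit() {
            return Optional.ofNullable(unassigned.poll());
        }
    }
}
```

Because a reader only receives a new split after reporting the previous one
as finished, faster readers naturally end up with more splits, which is the
load-balancing effect mentioned above.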
>
> *(5) Watermarks and event time alignment*
>
>  - Watermark generation, as well as idleness, needs to be per split (like
>    currently in the Kafka Source, per partition)
>  - It is desirable to support optional event-time-alignment, meaning that
>    splits that are ahead are back-pressured or temporarily unsubscribed
>
>  - I think it would be desirable to encapsulate watermark generation logic
>    in watermark generators, for a separation of concerns. The watermark
>    generators should run per split.
>  - Using watermark generators would also help with another problem of the
>    suggested interface, namely supporting non-periodic watermarks
>    efficiently.
>
>  - Need a way to "dispatch" next record to different watermark generators
>  - Need a way to tell SplitReader to "suspend" a split until a certain
>    watermark is reached (event time backpressure)
>  - This would in fact be not needed (and thus simpler) if we had a
>    SplitReader per split and may be a reason to re-open that discussion
>
> *(6) Watermarks across splits and in the Split
>
> Enumerator*
>
>  - The split enumerator may need some watermark awareness, which
>    should be purely based on split metadata (like create timestamp of
>    file splits)
>
>  - If there are still more splits with overlapping event time range
>    for a split reader, then that split reader should not advance the
>    watermark within the split beyond the overlap boundary. Otherwise
>    future splits will produce late data.
>
>  - One way to approach this could be that the split enumerator may
>    send watermarks to the readers, and the readers cannot emit
>    watermarks beyond that received watermark.
>  - Many split enumerators would simply immediately send Long.MAX out
>    and leave the progress purely to the split readers.
>
>  - For event-time alignment / split back pressure, this begs the
>    question how we can avoid deadlocks that may arise when splits are
>    suspended for event time back pressure,
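The "enumerator sends watermarks, readers cannot go beyond them" approach from (6) could look roughly like the sketch below. The class `CappedWatermarkEmitter` and its methods are hypothetical names, not part of any proposed interface. The reader keeps the latest bound received from the enumerator and clamps its own watermark to it, so an enumerator that immediately sends `Long.MAX_VALUE` leaves progress entirely to the reader.

```java
import java.util.OptionalLong;

/** Hypothetical reader-side guard: never emit a watermark beyond the enumerator's bound. */
class CappedWatermarkEmitter {
    // Upper bound received from the split enumerator; nothing passes until one arrives.
    private long enumeratorBound = Long.MIN_VALUE;
    // Last watermark actually emitted downstream.
    private long lastEmitted = Long.MIN_VALUE;

    /** Called when the enumerator sends a new bound; bounds only move forward. */
    void onEnumeratorWatermark(long bound) {
        enumeratorBound = Math.max(enumeratorBound, bound);
    }

    /**
     * Called with the reader's locally generated watermark. Returns the value
     * to emit, or empty if the clamped watermark made no progress.
     */
    OptionalLong tryAdvance(long localWatermark) {
        long capped = Math.min(localWatermark, enumeratorBound);
        if (capped > lastEmitted) {
            lastEmitted = capped;
            return OptionalLong.of(capped);
        }
        return OptionalLong.empty();
    }
}
```

This sketch does not address the deadlock question raised in the last bullet; it only illustrates the clamping rule.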
>
> *(7) Batch and streaming Unification*
>
>  - Functionality wise, the above design should support both
>
>  - Batch often (mostly) does not care about reading "in order" and
>    generating watermarks
>    --> Might use different enumerator logic that is more locality
>        aware and ignores event time order
>    --> Does not generate watermarks
>  - Would be great if bounded sources could be identified at compile
>    time, so that "env.addBoundedSource(...)" is type safe and can
>    return a "BoundedDataStream".
>  - Possible to defer this discussion until later
>
> *Miscellaneous Comments*
>
>  - Should the source have a TypeInformation for the produced type,
>    instead of a serializer? We need a type information in the stream
>    anyways, and can derive the serializer from that. Plus, creating
>    the serializer should respect the ExecutionConfig.
>
>  - The TypeSerializer interface is very powerful but also not easy to
>    implement. Its purpose is to handle data super efficiently, support
>    flexible ways of evolution, etc.
>    For metadata I would suggest to look at the
>    SimpleVersionedSerializer instead, which is used for example for
>    checkpoint master hooks, or for the streaming file sink. I think
>    that it is a good match for cases where we do not need more than
>    ser/deser (no copy, etc.) and don't need to push versioning out of
>    the serialization paths for best performance (as in the
>    TypeSerializer)
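As a rough illustration of why a versioned ser/deser interface is enough for split metadata, here is a self-contained sketch. To keep it runnable without Flink on the classpath, it declares a local stand-in interface, `VersionedSerializer`, with the same three methods as Flink's SimpleVersionedSerializer (`getVersion`, `serialize`, `deserialize`). The `FileSplitMeta` type and the version history (version 1 stored only the path, version 2 added the offset) are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Local stand-in mirroring the three methods of Flink's SimpleVersionedSerializer. */
interface VersionedSerializer<E> {
    int getVersion();
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

/** Hypothetical split metadata: a file path plus a start offset. */
final class FileSplitMeta {
    final String path;
    final long offset;

    FileSplitMeta(String path, long offset) {
        this.path = path;
        this.offset = offset;
    }
}

final class FileSplitMetaSerializer implements VersionedSerializer<FileSplitMeta> {
    @Override
    public int getVersion() {
        return 2; // current write format
    }

    @Override
    public byte[] serialize(FileSplitMeta split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(split.path);
            out.writeLong(split.offset);
        }
        return bytes.toByteArray();
    }

    @Override
    public FileSplitMeta deserialize(int version, byte[] serialized) throws IOException {
        if (version < 1 || version > 2) {
            throw new IOException("Unknown version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String path = in.readUTF();
            // Assumed history: version 1 had no offset field, so default to 0.
            long offset = version == 2 ? in.readLong() : 0L;
            return new FileSplitMeta(path, offset);
        }
    }
}
```

The version number is stored by the caller next to the bytes, so the serializer itself stays simple: no copy methods, no snapshot classes, just read and write.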
>
>
> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
> <k.klou...@data-artisans.com> wrote:
>
>
> Hi Biao,
>
> Thanks for the answer!
>
> So given the multi-threaded readers, now we have as open questions:
>
> 1) How do we let the checkpoints pass through our multi-threaded
> reader operator?
>
> 2) Do we have separate reader and source operators or not? In the
> strategy that has a separate source, the source operator has a
> parallelism of 1 and is responsible for split recovery only.
>
> For the first one, given also the constraints (blocking, finite
> queues, etc), I do not have an answer yet.
>
> For the 2nd, I think that we should go with separate operators for
> the source and the readers, for the following reasons:
>
> 1) This is more aligned with a potential future improvement where the
> split discovery becomes a responsibility of the JobManager and readers
> are pulling more work from the JM.
>
> 2) The source is going to be the "single point of truth". It will
> know what has been processed and what not. If the source and the
> readers are a single operator with parallelism > 1, or in general, if
> the split discovery is done by each task individually, then:
>   i) we have to have a deterministic scheme for each reader to assign
>      splits to itself (e.g. mod subtaskId). This is not necessarily
>      trivial for all sources.
>   ii) each reader would have to keep a copy of all its processed
>      splits
>   iii) the state has to be a union state with a non-trivial merging
>      logic in order to support rescaling.
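For reference, the "mod subtaskId" scheme mentioned in point i) can be sketched as below. This is only an illustration; `ModuloSplitAssigner` and its methods are hypothetical names, and hashing the split id is one possible deterministic choice among many.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical self-assignment: each reader picks splits by hash modulo parallelism. */
class ModuloSplitAssigner {
    /** Index of the subtask that owns the split. */
    static int ownerOf(String splitId, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes.
        return Math.floorMod(splitId.hashCode(), parallelism);
    }

    /** Splits that the given subtask assigns to itself out of all discovered splits. */
    static List<String> splitsFor(int subtaskId, int parallelism, List<String> discovered) {
        List<String> mine = new ArrayList<>();
        for (String splitId : discovered) {
            if (ownerOf(splitId, parallelism) == subtaskId) {
                mine.add(splitId);
            }
        }
        return mine;
    }
}
```

Note that the owner of a split changes whenever the parallelism changes, so after rescaling every reader needs to see the processed splits of all former readers. That is why points ii) and iii) follow from this scheme.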
>
> Two additional points that you raised above:
>
> i) The point that you raised that we need to keep all splits
> (processed and not-processed) I think is a bit of a strong
> requirement. This would imply that for infinite sources the state
> will grow indefinitely. This problem is even more pronounced if we do
> not have a single source that assigns splits to readers, as each
> reader will have its own copy of the state.
>
> ii) it is true that for finite sources we need to somehow not close
> the readers when the source/split discoverer finishes. The
> ContinuousFileReaderOperator has a work-around for that. It is not
> elegant, and checkpoints are not emitted after closing the source,
> but this, I believe, is a bigger problem which requires more changes
> than just refactoring the source interface.
>
> Cheers,
> Kostas
>
>
>
>
> --
> Best, Jingsong Lee
>
>
>
