Hi Steven,

Unfortunately we were behind schedule and did not get this into 1.10, so
it will be in 1.11 instead.

Thanks,

Jiangjie (Becket) Qin

On Thu, Jan 16, 2020 at 10:39 AM Steven Wu <stevenz...@gmail.com> wrote:

> Becket, is FLIP-27 still on track to be released in 1.10?
>
> On Tue, Jan 7, 2020 at 7:04 PM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi folks,
> >
> > Happy new year!
> >
> > Stephan and I chatted offline yesterday. After reading the email thread
> > again, I found that I had misunderstood Dawid's original proposal
> > regarding the behavior of env.source(BoundedSource) and had an incorrect
> > impression of how Java covariant return types behave.
> > Anyway, I agree that what Dawid originally proposed makes sense, which is
> > the following API:
> >
> > // Return a BoundedDataStream instance if the source is bounded.
> > // Return a DataStream instance if the source is unbounded.
> > DataStream env.source(Source);
> >
> > // Throws an exception if the source is unbounded.
> > // Used when users know the source is bounded at programming time.
> > BoundedDataStream env.boundedSource(Source);
> >
> > A BoundedDataStream only runs in batch execution mode.
> > A DataStream only runs in streaming execution mode.
> >
> > To run a bounded source in streaming execution mode, one would do the
> > following:
> >
> > // Return a DataStream instance with a source that will stop at some point.
> > DataStream env.source(SourceUtils.asUnbounded(myBoundedSource));
> >
> > I'll update the FLIP wiki and resume the vote if there are no further
> > concerns.
> >
> > Apologies for the misunderstanding and thanks for all the patient
> > discussions.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Dec 23, 2019 at 8:00 AM Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Steven,
> > >
> > > I think the current proposal is what you mentioned - a Kafka source
> > > that can be constructed in either BOUNDED or UNBOUNDED mode. And Flink
> > > can get the boundedness by invoking getBoundedness().
> > >
> > > So one can create a Kafka source by doing something like the following:
> > >
> > > new KafkaSource().startOffset().endOffset(); // A bounded instance.
> > > new KafkaSource().startOffset(); // An unbounded instance.
> > >
> > > If users want to have an UNBOUNDED Kafka source that stops at some
> > > point, they can wrap the BOUNDED Kafka source like below:
> > >
> > > SourceUtils.asUnbounded(new KafkaSource().startOffset().endOffset());
> > >
> > > The wrapped source would be an unbounded Kafka source that stops at the
> > > end offset.
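> > >
> > > For illustration, a minimal sketch of what such a wrapper could look
> > > like (the delegation details here are my assumption, not part of the
> > > FLIP):
> > >
> > > public static <T> Source<T> asUnbounded(Source<T> bounded) {
> > >     // Delegates every Source method to "bounded", except that
> > >     // getBoundedness() reports CONTINUOUS_UNBOUNDED, so the streaming
> > >     // runtime is used while reading still ends where the wrapped
> > >     // source ends.
> > >     return new UnboundedSourceWrapper<>(bounded); // hypothetical class
> > > }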
> > >
> > > Does that make sense?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Dec 20, 2019 at 1:31 PM Jark Wu <imj...@gmail.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> First of all, I think it is not called "UNBOUNDED"; according to
> > >> FLIP-27, it is called "CONTINUOUS_UNBOUNDED".
> > >> The description of Boundedness in FLIP-27 [1] clearly states what
> > >> Becket and I think:
> > >>
> > >> public enum Boundedness {
> > >>
> > >>     /**
> > >>      * A bounded source processes the data that is currently available
> > >>      * and will end after that.
> > >>      *
> > >>      * <p>When a source produces a bounded stream, the runtime may
> > >>      * activate additional optimizations that are suitable only for
> > >>      * bounded input. Incorrectly producing unbounded data when the
> > >>      * source is set to produce a bounded stream will often result in
> > >>      * programs that do not output any results and may eventually fail
> > >>      * due to runtime errors (out of memory or storage).
> > >>      */
> > >>     BOUNDED,
> > >>
> > >>     /**
> > >>      * A continuous unbounded source continuously processes all data as
> > >>      * it comes.
> > >>      *
> > >>      * <p>The source may run forever (until the program is terminated)
> > >>      * or might actually end at some point, based on some
> > >>      * source-specific conditions. Because that is not transparent to
> > >>      * the runtime, the runtime will use an execution mode for
> > >>      * continuous unbounded streams whenever this mode is chosen.
> > >>      */
> > >>     CONTINUOUS_UNBOUNDED
> > >> }
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> [1]:
> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP-27:RefactorSourceInterface-Source
> > >>
> > >>
> > >>
> > >> On Fri, 20 Dec 2019 at 12:55, Steven Wu <stevenz...@gmail.com> wrote:
> > >>
> > >> > Becket,
> > >> >
> > >> > Regarding "UNBOUNDED source that stops at some point", I found it
> > >> > difficult to grasp what UNBOUNDED really means.
> > >> >
> > >> > If we want to use a Kafka source with an end/stop time, I guess you
> > >> > would call it an UNBOUNDED Kafka source that stops (aka
> > >> > BOUNDED-streaming). The terminology is a little confusing to me.
> > >> > Maybe BOUNDED/UNBOUNDED shouldn't be used to categorize sources.
> > >> > Just call it a Kafka source that can run in either BOUNDED or
> > >> > UNBOUNDED mode.
> > >> >
> > >> > Thanks,
> > >> > Steven
> > >> >
> > >> > On Thu, Dec 19, 2019 at 7:02 PM Becket Qin <becket....@gmail.com>
> > >> wrote:
> > >> >
> > >> > > I had an offline chat with Jark, and here are some more thoughts:
> > >> > >
> > >> > > 1. From the SQL perspective, a BOUNDED source leads to the batch
> > >> > > execution mode, and an UNBOUNDED source leads to the streaming
> > >> > > execution mode.
> > >> > > 2. The semantic of an UNBOUNDED source is "may or may not stop".
> > >> > > The semantic of a BOUNDED source is "will stop".
> > >> > > 3. The semantic of DataStream is "may or may not terminate". The
> > >> > > semantic of BoundedDataStream is "will terminate".
> > >> > >
> > >> > > Given that, option 3 seems the better option because:
> > >> > > 1. SQL already has a strict binding between boundedness and
> > >> > > execution mode. Keeping DataStream consistent with that would be
> > >> > > good.
> > >> > > 2. The semantic of an UNBOUNDED source is exactly the same as that
> > >> > > of DataStream, so we should avoid breaking it, i.e. turning some
> > >> > > DataStream from "may or may not terminate" into "will terminate".
> > >> > >
> > >> > > For the case where users want the BOUNDED-streaming combination,
> > >> > > they can simply use an UNBOUNDED source that stops at some point.
> > >> > > We can even provide a simple wrapper to wrap a BOUNDED source as an
> > >> > > UNBOUNDED source if that helps. But API-wise, option 3 seems to
> > >> > > tell a pretty coherent whole story.
> > >> > >
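> > >> > > As a quick sketch of how that would look to a user (the wrapper is
> > >> > > the SourceUtils.asUnbounded mentioned in the newer emails above;
> > >> > > the offsets o1/o2 are illustrative):
> > >> > >
> > >> > > // Run a bounded Kafka read under the streaming runtime.
> > >> > > DataStream stream = env.source(
> > >> > >     SourceUtils.asUnbounded(
> > >> > >         new KafkaSource().startOffset(o1).endOffset(o2)));
> > >> > >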
> > >> > > Thanks,
> > >> > >
> > >> > > Jiangjie (Becket) Qin
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Thu, Dec 19, 2019 at 10:30 PM Becket Qin <becket....@gmail.com
> >
> > >> > wrote:
> > >> > >
> > >> > > > Hi Timo,
> > >> > > >
> > >> > > >> Bounded is just a special case of unbounded and every bounded
> > >> > > >> source can also be treated as an unbounded source. This would
> > >> > > >> unify the API if people don't need a bounded operation.
> > >> > > >
> > >> > > >
> > >> > > > With option 3 users can still get a unified API with something
> > >> > > > like below:
> > >> > > >
> > >> > > > DataStream boundedStream = env.boundedSource(boundedSource);
> > >> > > > DataStream unboundedStream = env.source(unboundedSource);
> > >> > > >
> > >> > > > So in both cases, users can still use a unified DataStream
> > >> > > > without touching the bounded-stream-only methods.
> > >> > > > Regarding "unify the API if people don't need the bounded
> > >> > > > operation": do you expect a DataStream with a bounded source to
> > >> > > > have the batch operators and scheduler settings as well?
> > >> > > >
> > >> > > >
> > >> > > > If we allow a DataStream from a BOUNDED source, we will
> > >> > > > essentially pick "*modified option 2*".
> > >> > > >
> > >> > > >> // The source is either bounded or unbounded, but only
> > >> > > >> // unbounded operations could be performed on the returned
> > >> > > >> // DataStream.
> > >> > > >> DataStream<Type> dataStream = env.source(someSource);
> > >> > > >>
> > >> > > >> // The source must be a bounded source, otherwise an exception
> > >> > > >> // is thrown.
> > >> > > >> BoundedDataStream<Type> boundedDataStream =
> > >> > > >>     env.boundedSource(boundedSource);
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > // Add the following method to DataStream
> > >> > > >
> > >> > > > Boundedness DataStream#getBoundedness();
> > >> > > >
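> > >> > > > A library could then branch on it, e.g. (a sketch, assuming the
> > >> > > > Boundedness enum quoted earlier in this thread):
> > >> > > >
> > >> > > > if (dataStream.getBoundedness() == Boundedness.BOUNDED) {
> > >> > > >     // The input is guaranteed to terminate, so the library can
> > >> > > >     // wait for a final result.
> > >> > > > }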
> > >> > > >
> > >> > > > From a purely logical perspective, boundedness and runtime
> > >> > > > settings (stream/batch) are two orthogonal dimensions, specified
> > >> > > > in the following way:
> > >> > > >
> > >> > > > *Boundedness* - defined by the source: BOUNDED / UNBOUNDED.
> > >> > > > *Running mode* - defined by the API class: DataStream (streaming
> > >> > > > mode) / BoundedDataStream (batch mode).
> > >> > > >
> > >> > > > Excluding the UNBOUNDED-batch combination, "*modified option 2*"
> > >> > > > covers the remaining three combinations. Compared with "*modified
> > >> > > > option 2*", the main benefit of option 3 is its simplicity and
> > >> > > > clarity, achieved by tying boundedness to running mode and giving
> > >> > > > up the BOUNDED-streaming combination.
> > >> > > >
> > >> > > > Just to be clear, I am fine with either option. But I would like
> > >> > > > to understand a bit more about the bounded-streaming use case,
> > >> > > > when users would prefer it over the bounded-batch case, and
> > >> > > > whether the added value justifies the additional complexity in
> > >> > > > the API. Two cases I can think of are:
> > >> > > > 1. The records in a DataStream will be processed in order, while
> > >> > > > a BoundedDataStream processes records without an order guarantee.
> > >> > > > 2. A DataStream emits intermediate results when processing a
> > >> > > > finite dataset, while a BoundedDataStream only emits the final
> > >> > > > result. In either case, this could be supported by an UNBOUNDED
> > >> > > > source stopping at some point.
> > >> > > >
> > >> > > > Case 1 is actually misleading because DataStream in general
> > >> > > > doesn't really support in-order processing.
> > >> > > > Case 2 seems a rare use case because the instantaneous
> > >> > > > intermediate result seems difficult to reason about. In any case,
> > >> > > > this can be supported by an UNBOUNDED source that stops at some
> > >> > > > point.
> > >> > > >
> > >> > > > Are there other use cases for the bounded-streaming combination
> > >> > > > that I missed? I am a little hesitant to put the testing
> > >> > > > requirement here because ideally I'd avoid having public APIs for
> > >> > > > testing purposes only. And this could be resolved by having an
> > >> > > > UNBOUNDED source stopping at some point as well.
> > >> > > >
> > >> > > > Sorry for the long discussion, but I would really like to make
> > >> > > > an API decision after knowing all the pros and cons.
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Jiangjie (Becket) Qin
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Thu, Dec 19, 2019 at 6:19 PM Timo Walther <
> twal...@apache.org>
> > >> > wrote:
> > >> > > >
> > >> > > >> Hi Becket,
> > >> > > >>
> > >> > > >> regarding *Option 3* I think we can relax the constraints for
> > >> > > >> env.source():
> > >> > > >>
> > >> > > >> // MySource can be bounded or unbounded
> > >> > > >> DataStream<Type> dataStream = env.source(mySource);
> > >> > > >>
> > >> > > >> // MySource must be bounded, otherwise throws exception.
> > >> > > >> BoundedDataStream<Type> boundedDataStream =
> > >> > > >>     env.boundedSource(mySource);
> > >> > > >>
> > >> > > >> Bounded is just a special case of unbounded and every bounded
> > >> > > >> source can also be treated as an unbounded source. This would
> > >> > > >> unify the API if people don't need a bounded operation. It also
> > >> > > >> addresses Jark's concerns.
> > >> > > >>
> > >> > > >> Regards,
> > >> > > >> Timo
> > >> > > >>
> > >> > > >>
> > >> > > >> On 18.12.19 14:16, Becket Qin wrote:
> > >> > > >> > Hi Jark,
> > >> > > >> >
> > >> > > >> > Please see the reply below:
> > >> > > >> >
> > >> > > >> >> Regarding option #3, my concern is that if we don't support
> > >> > > >> >> streaming mode for bounded sources, how could we create a
> > >> > > >> >> testing source for streaming mode? Currently, all the testing
> > >> > > >> >> sources for streaming are bounded, so that the integration
> > >> > > >> >> tests will eventually finish.
> > >> > > >> >
> > >> > > >> >
> > >> > > >> > An UNBOUNDED source does not mean it will never stop. It
> > >> > > >> > simply indicates that the source *may* run forever, so the
> > >> > > >> > runtime needs to be prepared for that, but the task may still
> > >> > > >> > stop at some point when it hits some source-specific
> > >> > > >> > condition. So an UNBOUNDED testing source can still stop at
> > >> > > >> > some point if needed.
> > >> > > >> >
> > >> > > >> >> Regarding Source#getRecordOrder(), could we have an implicit
> > >> > > >> >> contract that an unbounded source should already read in
> > >> > > >> >> order (i.e. reading partitions in parallel), while for a
> > >> > > >> >> bounded source the order is not mandatory?
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >> This is also the behavior of the current sources.
> > >> > > >> >
> > >> > > >> >> 1) a source can't guarantee it reads in strict order, because
> > >> > > >> >> the producer may produce data out of order.
> > >> > > >> >> 2) *Bounded-StrictOrder* is not necessary, because batch can
> > >> > > >> >> reorder data.
> > >> > > >> >
> > >> > > >> >
> > >> > > >> > It is true that sometimes the source cannot guarantee the
> > >> > > >> > record order, but sometimes it can. Right now, even for stream
> > >> > > >> > processing, there is no processing order guarantee. For
> > >> > > >> > example, a join operator may emit a later record that found a
> > >> > > >> > join match earlier.
> > >> > > >> > Event order is one of the most important requirements for
> > >> > > >> > event processing, so a clear order guarantee would be
> > >> > > >> > necessary. That said, I agree that right now, even if the
> > >> > > >> > sources declare the record order requirement, the runtime is
> > >> > > >> > not able to guarantee it out of the box. So I am OK if we add
> > >> > > >> > the record order to the Source later. But we should avoid
> > >> > > >> > misleading users into thinking the processing order is
> > >> > > >> > guaranteed when using the unbounded runtime.
> > >> > > >> >
> > >> > > >> > Thanks,
> > >> > > >> >
> > >> > > >> > Jiangjie (Becket) Qin
> > >> > > >> >
> > >> > > >> > On Wed, Dec 18, 2019 at 10:29 AM Jark Wu <imj...@gmail.com>
> > >> wrote:
> > >> > > >> >
> > >> > > >> >> Hi Becket,
> > >> > > >> >>
> > >> > > >> >> That's great that we have reached a consensus on
> > >> > > >> >> Source#getBoundedness().
> > >> > > >> >>
> > >> > > >> >> Regarding option #3, my concern is that if we don't support
> > >> > > >> >> streaming mode for bounded sources, how could we create a
> > >> > > >> >> testing source for streaming mode? Currently, all the testing
> > >> > > >> >> sources for streaming are bounded, so that the integration
> > >> > > >> >> tests will eventually finish.
> > >> > > >> >>
> > >> > > >> >> Regarding Source#getRecordOrder(), could we have an implicit
> > >> > > >> >> contract that an unbounded source should already read in
> > >> > > >> >> order (i.e. reading partitions in parallel), while for a
> > >> > > >> >> bounded source the order is not mandatory?
> > >> > > >> >> This is also the behavior of the current sources.
> > >> > > >> >> 1) a source can't guarantee it reads in strict order, because
> > >> > > >> >> the producer may produce data out of order.
> > >> > > >> >> 2) *Bounded-StrictOrder* is not necessary, because batch can
> > >> > > >> >> reorder data.
> > >> > > >> >>
> > >> > > >> >> Best,
> > >> > > >> >> Jark
> > >> > > >> >>
> > >> > > >> >>
> > >> > > >> >>
> > >> > > >> >> On Tue, 17 Dec 2019 at 22:03, Becket Qin <
> > becket....@gmail.com>
> > >> > > wrote:
> > >> > > >> >>
> > >> > > >> >>> Hi folks,
> > >> > > >> >>>
> > >> > > >> >>> Thanks for the comments. I am convinced that the Source API
> > >> > > >> >>> should not take boundedness as a parameter after it is
> > >> > > >> >>> constructed. What Timo and Dawid suggested sounds like a
> > >> > > >> >>> reasonable solution to me. So the Source API would become:
> > >> > > >> >>>
> > >> > > >> >>> Source {
> > >> > > >> >>>      Boundedness getBoundedness();
> > >> > > >> >>> }
> > >> > > >> >>>
> > >> > > >> >>> Assuming the above Source API, in addition to the two
> > >> > > >> >>> options mentioned in earlier emails, I am thinking of
> > >> > > >> >>> another option:
> > >> > > >> >>>
> > >> > > >> >>> *Option 3:*
> > >> > > >> >>> // MySource must be unbounded, otherwise throws exception.
> > >> > > >> >>> DataStream<Type> dataStream = env.source(mySource);
> > >> > > >> >>>
> > >> > > >> >>> // MySource must be bounded, otherwise throws exception.
> > >> > > >> >>> BoundedDataStream<Type> boundedDataStream =
> > >> > > >> >>>     env.boundedSource(mySource);
> > >> > > >> >>>
> > >> > > >> >>> The pros of this API are:
> > >> > > >> >>>     a) It fits the requirements from Table / SQL well.
> > >> > > >> >>>     b) DataStream users still have type safety (option 2
> > >> > > >> >>> only has partial type safety).
> > >> > > >> >>>     c) Crystal-clear boundedness from the API, which makes
> > >> > > >> >>> DataStream join / connect easy to reason about.
> > >> > > >> >>> The caveats I see:
> > >> > > >> >>>     a) It is inconsistent with Table, since Table has one
> > >> > > >> >>> unified interface.
> > >> > > >> >>>     b) No streaming mode for bounded sources.
> > >> > > >> >>>
> > >> > > >> >>> @Stephan Ewen <ewenstep...@gmail.com> @Aljoscha Krettek
> > >> > > >> >>> <aljos...@ververica.com> what do you think of the approach?
> > >> > > >> >>>
> > >> > > >> >>>
> > >> > > >> >>> Orthogonal to the above API, I am wondering whether
> > >> > > >> >>> boundedness is the only dimension needed to describe the
> > >> > > >> >>> characteristics of the Source behavior. We may also need
> > >> > > >> >>> another dimension of *record order*.
> > >> > > >> >>>
> > >> > > >> >>> For example, when a file source is reading from a directory
> > >> > > >> >>> with bounded records, it may have two ways to read:
> > >> > > >> >>> 1. Read files in parallel.
> > >> > > >> >>> 2. Read files in chronological order.
> > >> > > >> >>> In both cases, the file source is a bounded source. However,
> > >> > > >> >>> the processing requirement for downstream may be different.
> > >> > > >> >>> In the first case, the record processing and result emitting
> > >> > > >> >>> order does not matter, e.g. word count. In the second case,
> > >> > > >> >>> the records may have to be processed in the order they were
> > >> > > >> >>> read, e.g. change log processing.
> > >> > > >> >>>
> > >> > > >> >>> If the Source only has a getBoundedness() method, the
> > >> > > >> >>> downstream processors would not know whether the records
> > >> > > >> >>> emitted from the Source should be processed in order or not.
> > >> > > >> >>> So combining boundedness and record order, we will have four
> > >> > > >> >>> scenarios:
> > >> > > >> >>>
> > >> > > >> >>> *Bounded-StrictOrder*:     A segment of change log.
> > >> > > >> >>> *Bounded-Random*:          Batch Word Count.
> > >> > > >> >>> *Unbounded-StrictOrder*: An infinite change log.
> > >> > > >> >>> *Unbounded-Random*:     Streaming Word Count.
> > >> > > >> >>>
> > >> > > >> >>> Option 2 mentioned in the previous email was kind of trying
> > >> > > >> >>> to handle the Bounded-StrictOrder case by creating a
> > >> > > >> >>> DataStream from a bounded source, which actually does not
> > >> > > >> >>> work.
> > >> > > >> >>> It looks like we do not have strict order support in some
> > >> > > >> >>> operators at this point, e.g. join. But we may still want to
> > >> > > >> >>> add the semantic to the Source first, so that later on we
> > >> > > >> >>> don't need to change all the source implementations,
> > >> > > >> >>> especially given that many of them will be implemented by
> > >> > > >> >>> 3rd parties.
> > >> > > >> >>>
> > >> > > >> >>> Given that, we need another dimension of *record order* in
> > >> > > >> >>> the Source. More specifically, the API would become:
> > >> > > >> >>>
> > >> > > >> >>> Source {
> > >> > > >> >>>      Boundedness getBoundedness();
> > >> > > >> >>>      RecordOrder getRecordOrder();
> > >> > > >> >>> }
> > >> > > >> >>>
> > >> > > >> >>> public enum RecordOrder {
> > >> > > >> >>>      /** The records in the DataStream must be processed in
> > >> > > >> >>>       * their strict order for correctness. */
> > >> > > >> >>>      STRICT,
> > >> > > >> >>>      /** The records in the DataStream can be processed in
> > >> > > >> >>>       * arbitrary order. */
> > >> > > >> >>>      RANDOM;
> > >> > > >> >>> }
> > >> > > >> >>>
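> > >> > > >> >>> To make the four scenarios above concrete, downstream code
> > >> > > >> >>> could check both dimensions (a sketch only, assuming the two
> > >> > > >> >>> enums above):
> > >> > > >> >>>
> > >> > > >> >>> if (source.getBoundedness() == Boundedness.BOUNDED
> > >> > > >> >>>         && source.getRecordOrder() == RecordOrder.STRICT) {
> > >> > > >> >>>     // A segment of a change log: records must be processed
> > >> > > >> >>>     // in the order they were read.
> > >> > > >> >>> }
> > >> > > >> >>>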
> > >> > > >> >>> Any thoughts?
> > >> > > >> >>>
> > >> > > >> >>> Thanks,
> > >> > > >> >>>
> > >> > > >> >>> Jiangjie (Becket) Qin
> > >> > > >> >>>
> > >> > > >> >>> On Tue, Dec 17, 2019 at 3:44 PM Timo Walther <
> > >> twal...@apache.org>
> > >> > > >> wrote:
> > >> > > >> >>>
> > >> > > >> >>>> Hi Becket,
> > >> > > >> >>>>
> > >> > > >> >>>> I completely agree with Dawid's suggestion. The information
> > >> > > >> >>>> about the boundedness should come out of the source, because
> > >> > > >> >>>> most of the streaming sources can be made bounded based on
> > >> > > >> >>>> some connector-specific criterion. In Kafka, it would be an
> > >> > > >> >>>> end offset or end timestamp, but in any case having just an
> > >> > > >> >>>> env.boundedSource() is not enough because the parameters for
> > >> > > >> >>>> making the source bounded are missing.
> > >> > > >> >>>>
> > >> > > >> >>>> I suggest having a simple `isBounded(): Boolean` flag in
> > >> > > >> >>>> every source that might be influenced by a connector
> > >> > > >> >>>> builder, as Dawid mentioned.
> > >> > > >> >>>>
> > >> > > >> >>>> For type safety during programming, we can still go with
> > >> > > >> >>>> *Final state 1* by having env.source() vs
> > >> > > >> >>>> env.boundedSource(). The latter would just enforce that the
> > >> > > >> >>>> boolean flag is set to `true` and could make bounded
> > >> > > >> >>>> operations available (if we actually need that).
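> > >> > > >> >>>>
> > >> > > >> >>>> A sketch of that enforcement (the names here are
> > >> > > >> >>>> illustrative, not a committed API):
> > >> > > >> >>>>
> > >> > > >> >>>> public <T> BoundedDataStream<T> boundedSource(Source<T> source) {
> > >> > > >> >>>>     if (!source.isBounded()) {
> > >> > > >> >>>>         throw new IllegalArgumentException(
> > >> > > >> >>>>             "The source passed to boundedSource() must be bounded.");
> > >> > > >> >>>>     }
> > >> > > >> >>>>     return new BoundedDataStream<>(this, source); // hypothetical ctor
> > >> > > >> >>>> }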
> > >> > > >> >>>>
> > >> > > >> >>>> However, I don't think that we should start making a
> > >> > > >> >>>> unified Table API ununified again. Boundedness is an
> > >> > > >> >>>> optimization property. Every bounded operation can also be
> > >> > > >> >>>> executed in an unbounded way using updates/retractions or
> > >> > > >> >>>> watermarks.
> > >> > > >> >>>>
> > >> > > >> >>>> Regards,
> > >> > > >> >>>> Timo
> > >> > > >> >>>>
> > >> > > >> >>>>
> > >> > > >> >>>> On 15.12.19 14:22, Becket Qin wrote:
> > >> > > >> >>>>> Hi Dawid and Jark,
> > >> > > >> >>>>>
> > >> > > >> >>>>> I think the discussion ultimately boils down to the
> > >> > > >> >>>>> question of which of the following two final states we
> > >> > > >> >>>>> want. Once we make this decision, everything else can be
> > >> > > >> >>>>> naturally derived.
> > >> > > >> >>>>>
> > >> > > >> >>>>> *Final state 1*: Separate APIs for bounded / unbounded
> > >> > > >> >>>>> DataStream & Table.
> > >> > > >> >>>>> That means any code users write will be valid at the point
> > >> > > >> >>>>> when they write it. This is similar to having a type-safety
> > >> > > >> >>>>> check at programming time. For example:
> > >> > > >> >>>>>
> > >> > > >> >>>>> BoundedDataStream extends DataStream {
> > >> > > >> >>>>>     // Operations only available for bounded data.
> > >> > > >> >>>>>     BoundedDataStream sort(...);
> > >> > > >> >>>>>
> > >> > > >> >>>>>     // Interaction with another bounded stream returns a
> > >> > > >> >>>>>     // bounded stream.
> > >> > > >> >>>>>     BoundedJoinedDataStream join(BoundedDataStream other)
> > >> > > >> >>>>>
> > >> > > >> >>>>>     // Interaction with another unbounded stream returns
> > >> > > >> >>>>>     // an unbounded stream.
> > >> > > >> >>>>>     JoinedDataStream join(DataStream other)
> > >> > > >> >>>>> }
> > >> > > >> >>>>>
> > >> > > >> >>>>> BoundedTable extends Table {
> > >> > > >> >>>>>     // Bounded-only operation.
> > >> > > >> >>>>>     BoundedTable sort(...);
> > >> > > >> >>>>>
> > >> > > >> >>>>>     // Interaction with another BoundedTable returns a
> > >> > > >> >>>>>     // BoundedTable.
> > >> > > >> >>>>>     BoundedTable join(BoundedTable other)
> > >> > > >> >>>>>
> > >> > > >> >>>>>     // Interaction with another unbounded table returns an
> > >> > > >> >>>>>     // unbounded table.
> > >> > > >> >>>>>     Table join(Table other)
> > >> > > >> >>>>> }
> > >> > > >> >>>>>
> > >> > > >> >>>>> *Final state 2*: One unified API for bounded / unbounded
> > >> > > >> >>>>> DataStream / Table.
> > >> > > >> >>>>> That unified API may throw an exception at DAG compilation
> > >> > > >> >>>>> time if an invalid operation is attempted. This is what the
> > >> > > >> >>>>> Table API currently follows.
> > >> > > >> >>>>>
> > >> > > >> >>>>> DataStream {
> > >> > > >> >>>>>     // Throws an exception if the DataStream is unbounded.
> > >> > > >> >>>>>     DataStream sort();
> > >> > > >> >>>>>     // Get boundedness.
> > >> > > >> >>>>>     Boundedness getBoundedness();
> > >> > > >> >>>>> }
> > >> > > >> >>>>>
> > >> > > >> >>>>> Table {
> > >> > > >> >>>>>     // Throws an exception if the table has infinite rows.
> > >> > > >> >>>>>     Table orderBy();
> > >> > > >> >>>>>
> > >> > > >> >>>>>     // Get boundedness.
> > >> > > >> >>>>>     Boundedness getBoundedness();
> > >> > > >> >>>>> }
> > >> > > >> >>>>>
> > >> > > >> >>>>> From what I understand, there is no consensus on this
> > >> > > >> >>>>> decision so far.
> > >> > > >> >>>>> Whichever final state we choose, we need to make it
> > >> > > >> >>>>> consistent across the entire project. We should avoid the
> > >> > > >> >>>>> case where Table follows one final state while DataStream
> > >> > > >> >>>>> follows another. Some arguments I am aware of from both
> > >> > > >> >>>>> sides so far are the following:
> > >> > > >> >>>>>
> > >> > > >> >>>>> Arguments for final state 1:
> > >> > > >> >>>>> 1a) Clean API with a method-safety check at programming
> > >> > > >> >>>>> time.
> > >> > > >> >>>>> 1b) (Counter to 2b) Although SQL does not have a
> > >> > > >> >>>>> programming-time error check, SQL is not really a
> > >> > > >> >>>>> "programming language" per se, so SQL can be different from
> > >> > > >> >>>>> Table and DataStream.
> > >> > > >> >>>>> 1c) Although final state 2 seems to make it easier for SQL
> > >> > > >> >>>>> to use, given it is more "config based" than "parameter
> > >> > > >> >>>>> based", final state 1 can probably also meet what SQL wants
> > >> > > >> >>>>> by wrapping the Source in the TableSource /
> > >> > > >> >>>>> TableSourceFactory API if needed.
> > >> > > >> >>>>>
> > >> > > >> >>>>> Arguments for final state 2:
> > >> > > >> >>>>> 2a) The Source API itself already seems to follow the
> > >> > > >> >>>>> unified API pattern.
> > >> > > >> >>>>> 2b) There is no "programming time" method error check in
> > >> > > >> >>>>> the SQL case, so we cannot really achieve final state 1
> > >> > > >> >>>>> across the board.
> > >> > > >> >>>>> 2c) It is an easier path given our current status, i.e.
> > >> > > >> >>>>> Table is already following final state 2.
> > >> > > >> >>>>> 2d) Users can always explicitly check the boundedness if
> > >> > > >> >>>>> they want to.
> > >> > > >> >>>>>
> > >> > > >> >>>>> As I mentioned earlier, my initial thought was also to
> > >> > > >> >>>>> have a "configuration based" Source rather than a
> > >> > > >> >>>>> "parameter based" Source. So it is entirely possible that I
> > >> > > >> >>>>> missed some important considerations or design principles
> > >> > > >> >>>>> that we want to enforce for the project. It would be good
> > >> > > >> >>>>> if @Stephan Ewen <step...@ververica.com> and @Aljoscha
> > >> > > >> >>>>> Krettek <aljos...@ververica.com> can also provide more
> > >> > > >> >>>>> thoughts on this.
> > >> > > >> >>>>>
> > >> > > >> >>>>>
> > >> > > >> >>>>> Re: Jingsong
> > >> > > >> >>>>>
> > >> > > >> >>>>>> As you said, there are some batched system sources, like
> > >> > > >> >>>>>> parquet/orc sources. Could we have a batch emit interface
> > >> > > >> >>>>>> to improve performance? The per-record queue may cause
> > >> > > >> >>>>>> performance degradation.
> > >> > > >> >>>>>
> > >> > > >> >>>>>
> > >> > > >> >>>>> The current interface does not necessarily cause a
> > >> > > >> >>>>> performance problem in the multi-threading case. In fact,
> > >> > > >> >>>>> the base implementation allows SplitReaders to add a batch
> > >> > > >> >>>>> <E> of records <T> to the records queue <E>, so each
> > >> > > >> >>>>> element in the records queue would be a batch <E>. In this
> > >> > > >> >>>>> case, when the main thread polls records, it will take a
> > >> > > >> >>>>> batch <E> of records <T> from the shared records queue and
> > >> > > >> >>>>> process the records <T> in a batch manner.
> > >> > > >> >>>>>
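> > >> > > >> >>>>> A minimal sketch of that pattern (the class and method
> > >> > > >> >>>>> names here are illustrative, not the actual base
> > >> > > >> >>>>> implementation):
> > >> > > >> >>>>>
> > >> > > >> >>>>> // Each queue element is a whole batch fetched by a
> > >> > > >> >>>>> // SplitReader on a fetcher thread.
> > >> > > >> >>>>> BlockingQueue<List<Record>> queue = new LinkedBlockingQueue<>();
> > >> > > >> >>>>>
> > >> > > >> >>>>> // Fetcher thread: enqueue one batch per fetch.
> > >> > > >> >>>>> queue.put(splitReader.fetchBatch());
> > >> > > >> >>>>>
> > >> > > >> >>>>> // Main thread: dequeue one batch, emit its records.
> > >> > > >> >>>>> for (Record record : queue.take()) {
> > >> > > >> >>>>>     sourceOutput.collect(record);
> > >> > > >> >>>>> }
> > >> > > >> >>>>>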
> > >> > > >> >>>>> Thanks,
> > >> > > >> >>>>>
> > >> > > >> >>>>> Jiangjie (Becket) Qin
> > >> > > >> >>>>>
> > >> > > >> >>>>> On Thu, Dec 12, 2019 at 1:29 PM Jingsong Li <
> > >> > > jingsongl...@gmail.com
> > >> > > >> >
> > >> > > >> >>>> wrote:
> > >> > > >> >>>>>
> > >> > > >> >>>>>> Hi Becket,
> > >> > > >> >>>>>>
> > >> > > >> >>>>>> I have some performance concerns too.
> > >> > > >> >>>>>>
> > >> > > >> >>>>>> If I understand correctly, SourceOutput will emit data
> > >> > > >> >>>>>> per record into the queue? I'm worried about the
> > >> > > >> >>>>>> multithreading performance of this queue.
> > >> > > >> >>>>>>
> > >> > > >> >>>>>>> One example is some batched messaging systems which only
> > >> > > >> >>>>>>> have an offset for the entire batch instead of individual
> > >> > > >> >>>>>>> messages in the batch.
> > >> > > >> >>>>>>
> > >> > > >> >>>>>> As you said, there are some batched system sources, like
> > >> > > >> >>>>>> parquet/orc sources. Could we have a batch emit interface
> > >> > > >> >>>>>> to improve performance? The per-record queue may cause
> > >> > > >> >>>>>> performance degradation.
> > >> > > >> >>>>>>
> > >> > > >> >>>>>> Best,
> > >> > > >> >>>>>> Jingsong Lee
> > >> > > >> >>>>>>
> > >> > > >> >>>>>> On Thu, Dec 12, 2019 at 9:15 AM Jark Wu <
> imj...@gmail.com
> > >
> > >> > > wrote:
> > >> > > >> >>>>>>
> > >> > > >> >>>>>>> Hi Becket,
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>> I think Dawid explained things clearly and it makes a
> > >> > > >> >>>>>>> lot of sense.
> > >> > > >> >>>>>>> I'm also in favor of #2, because #1 doesn't work for our
> > >> > > >> >>>>>>> future unified environment.
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>> You can see the vision in this documentation [1]. In the
> > >> > > >> >>>>>>> future, we would like to drop the global streaming/batch
> > >> > > >> >>>>>>> mode in SQL (i.e.
> > >> > > >> >>>>>>> EnvironmentSettings#inStreamingMode/inBatchMode).
> > >> > > >> >>>>>>> A source is bounded or unbounded once defined, so whether
> > >> > > >> >>>>>>> a query runs in streaming, batch, or hybrid mode can be
> > >> > > >> >>>>>>> inferred from its sources. However, in #1 we would lose
> > >> > > >> >>>>>>> this ability because the framework doesn't know whether
> > >> > > >> >>>>>>> the source is bounded or unbounded.
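> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>> In other words, the framework could infer the mode
> > >> > > >> >>>>>>> roughly like this (a sketch; the ExecutionMode type is
> > >> > > >> >>>>>>> illustrative):
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>> ExecutionMode mode =
> > >> > > >> >>>>>>>     source.getBoundedness() == Boundedness.BOUNDED
> > >> > > >> >>>>>>>         ? ExecutionMode.BATCH
> > >> > > >> >>>>>>>         : ExecutionMode.STREAMING;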
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>> Best,
> > >> > > >> >>>>>>> Jark
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>> [1]:
> > >> > > >> >>>>>>> https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>> On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski <
> > >> > > pi...@ververica.com
> > >> > > >> >
> > >> > > >> >>>>>> wrote:
> > >> > > >> >>>>>>>
> > >> > > >> >>>>>>>> Hi,
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>> Regarding the:
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>> Collection<E> getNextRecords()
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>> I'm pretty sure such a design would unfortunately
> > >> > > >> >>>>>>>> impact the performance (accessing and potentially
> > >> > > >> >>>>>>>> creating the collection on the hot path).
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>> Also the
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>> InputStatus emitNext(DataOutput<T> output) throws Exception;
> > >> > > >> >>>>>>>> or
> > >> > > >> >>>>>>>> Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>> give us some opportunities in the future to allow the
> > >> > > >> >>>>>>>> Source to hot-loop inside until it receives some signal
> > >> > > >> >>>>>>>> "please exit for some reason" (the output collector
> > >> > > >> >>>>>>>> could return such a hint upon collecting the result).
> > >> > > >> >>>>>>>> But that's another topic outside of this FLIP's scope.
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>> Piotrek
> > >> > > >> >>>>>>>>
> > >> > > >> >>>>>>>>> On 11 Dec 2019, at 10:41, Till Rohrmann <
> > >> > trohrm...@apache.org
> > >> > > >
> > >> > > >> >>>>>> wrote:
> > >> > > >> >>>>>>>>>
> > >> > > >> >>>>>>>>> Hi Becket,
> > >> > > >> >>>>>>>>>
> > >> > > >> >>>>>>>>> quick clarification from my side because I think you
> > >> > > >> >>>>>>>>> misunderstood my question. I did not suggest letting
> > >> > > >> >>>>>>>>> the SourceReader return only a single record at a time
> > >> > > >> >>>>>>>>> when calling getNextRecords. As the return type
> > >> > > >> >>>>>>>>> indicates, the method can return an arbitrary number of
> > >> > > >> >>>>>>>>> records.
> > >> > > >> >>>>>>>>>
> > >> > > >> >>>>>>>>> Cheers,
> > >> > > >> >>>>>>>>> Till
> > >> > > >> >>>>>>>>>
> > >> > > >> >>>>>>>> On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz
> > >> > > >> >>>>>>>> <dwysakow...@apache.org> wrote:
> > >> > > >> >>>>>>>>>
> > >> > > >> >>>>>>>>>> Hi Becket,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Issue #1 - Design of Source interface
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> I mentioned the lack of a method like
> > >> > > >> >>>>>>>>>> Source#createEnumerator(Boundedness boundedness,
> > >> > > >> >>>>>>>>>> SplitEnumeratorContext context) because without it the
> > >> > > >> >>>>>>>>>> current proposal is not complete / does not work.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> If we say that boundedness is an intrinsic property
> > >> > > >> >>>>>>>>>> of a source, imo we don't need the
> > >> > > >> >>>>>>>>>> Source#createEnumerator(Boundedness boundedness,
> > >> > > >> >>>>>>>>>> SplitEnumeratorContext context) method.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Assuming a source from my previous example:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Source source = KafkaSource.builder()
> > >> > > >> >>>>>>>>>>    ...
> > >> > > >> >>>>>>>>>>    .untilTimestamp(...)
> > >> > > >> >>>>>>>>>>    .build()
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Would the enumerator differ if created like
> > >> > > >> >>>>>>>>>> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs
> > >> > > >> >>>>>>>>>> source.createEnumerator(BOUNDED, ...)? I know I am
> > >> > > >> >>>>>>>>>> repeating myself, but this is the part where my
> > >> > > >> >>>>>>>>>> opinion differs the most from the current proposal. I
> > >> > > >> >>>>>>>>>> really think it should always be the source that tells
> > >> > > >> >>>>>>>>>> whether it is bounded or not. In the current proposal,
> > >> > > >> >>>>>>>>>> the methods continuousSource/boundedSource somewhat
> > >> > > >> >>>>>>>>>> reconfigure the source, which I think is misleading.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> I think a call like:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Source source = KafkaSource.builder()
> > >> > > >> >>>>>>>>>>    ...
> > >> > > >> >>>>>>>>>>    .readContinuously() / readUntilLatestOffset() /
> > >> > > >> >>>>>>>>>>    readUntilTimestamp / readUntilOffsets / ...
> > >> > > >> >>>>>>>>>>    .build()
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> is way cleaner (and more expressive) than
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Source source = KafkaSource.builder()
> > >> > > >> >>>>>>>>>>    ...
> > >> > > >> >>>>>>>>>>    .build()
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> env.continuousSource(source) // which underneath would
> > >> > > >> >>>>>>>>>> // call createEnumerator(CONTINUOUS, ctx), equivalent to
> > >> > > >> >>>>>>>>>> // source.readContinuously().createEnumerator(ctx)
> > >> > > >> >>>>>>>>>> // or
> > >> > > >> >>>>>>>>>> env.boundedSource(source) // which underneath would
> > >> > > >> >>>>>>>>>> // call createEnumerator(BOUNDED, ctx), equivalent to
> > >> > > >> >>>>>>>>>> // source.readUntilLatestOffset().createEnumerator(ctx)
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Sorry for the comparison, but to me it seems there is
> > >> > > >> >>>>>>>>>> too much magic happening underneath those two calls.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> I really believe the Source interface should have a
> > >> > > >> >>>>>>>>>> getBoundedness method instead of supportBoundedness +
> > >> > > >> >>>>>>>>>> createEnumerator(Boundedness, ...).
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Issue #2 - Design of
> > >> > > >> >>>>>>>>>> ExecutionEnvironment#source()/continuousSource()/boundedSource()
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> As you might have guessed, I am slightly in favor of
> > >> > > >> >>>>>>>>>> option #2 modified.
> > >> > > >> >>>>>>>>>> Yes, I am aware every step of the DAG would have to be
> > >> > > >> >>>>>>>>>> able to say if it is bounded or not. I have a feeling
> > >> > > >> >>>>>>>>>> it would be easier to express cross bounded/unbounded
> > >> > > >> >>>>>>>>>> operations, but I must admit I have not thought it
> > >> > > >> >>>>>>>>>> through thoroughly. In the spirit of "batch is just a
> > >> > > >> >>>>>>>>>> special case of streaming", I thought BoundedStream
> > >> > > >> >>>>>>>>>> would extend from DataStream. Correct me if I am
> > >> > > >> >>>>>>>>>> wrong. In such a setup the cross bounded/unbounded
> > >> > > >> >>>>>>>>>> operation could be expressed quite easily, I think:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> DataStream {
> > >> > > >> >>>>>>>>>>    // We could not really tell if the result is
> > >> > > >> >>>>>>>>>>    // bounded or not, but because a bounded stream is
> > >> > > >> >>>>>>>>>>    // a special case of an unbounded one, the API
> > >> > > >> >>>>>>>>>>    // object is correct, irrespective of whether the
> > >> > > >> >>>>>>>>>>    // left or right side of the join is bounded.
> > >> > > >> >>>>>>>>>>    DataStream join(DataStream, ...);
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> BoundedStream extends DataStream {
> > >> > > >> >>>>>>>>>>    // Only if both sides are bounded can the result
> > >> > > >> >>>>>>>>>>    // be bounded as well. However, we do have access
> > >> > > >> >>>>>>>>>>    // to DataStream#join here, so you can still join
> > >> > > >> >>>>>>>>>>    // with a DataStream.
> > >> > > >> >>>>>>>>>>    BoundedStream join(BoundedStream, ...);
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> On the other hand, I also see benefits of two
> > >> > > >> >>>>>>>>>> completely disjoint APIs, as we could prohibit some
> > >> > > >> >>>>>>>>>> streaming calls in the bounded API. I can't think of
> > >> > > >> >>>>>>>>>> any unbounded operators that could not be implemented
> > >> > > >> >>>>>>>>>> for a bounded stream.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Besides, I think we both agree we don't like the
> > >> > > >> >>>>>>>>>> method:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> DataStream boundedStream(Source)
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> suggested in the current state of the FLIP. Do we? :)
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Best,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Dawid
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> On 10/12/2019 18:57, Becket Qin wrote:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Hi folks,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Thanks for the discussion, great feedback. Also thanks
> > >> > > >> >>>>>>>>>> to Dawid for the explanation; it is much clearer now.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> One thing that is indeed missing from the FLIP is how
> > >> > > >> >>>>>>>>>> the boundedness is passed to the Source
> > >> > > >> >>>>>>>>>> implementation. So the API should be
> > >> > > >> >>>>>>>>>> Source#createEnumerator(Boundedness boundedness,
> > >> > > >> >>>>>>>>>> SplitEnumeratorContext context),
> > >> > > >> >>>>>>>>>> and we can probably remove the
> > >> > > >> >>>>>>>>>> Source#supportBoundedness(Boundedness boundedness)
> > >> > > >> >>>>>>>>>> method.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Assuming we have that, we are essentially choosing
> > >> > > >> >>>>>>>>>> from one of the following two options:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Option 1:
> > >> > > >> >>>>>>>>>> // The source is a continuous source, and only
> > >> > > >> >>>>>>>>>> // unbounded operations can be performed.
> > >> > > >> >>>>>>>>>> DataStream<Type> datastream =
> > >> > > >> >>>>>>>>>>     env.continuousSource(someSource);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // The source is a bounded source; both bounded and
> > >> > > >> >>>>>>>>>> // unbounded operations can be performed.
> > >> > > >> >>>>>>>>>> BoundedDataStream<Type> boundedDataStream =
> > >> > > >> >>>>>>>>>>     env.boundedSource(someSource);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>    - Pros:
> > >> > > >> >>>>>>>>>>         a) Explicit boundary between bounded /
> > >> > > >> >>>>>>>>>> unbounded streams; it is quite simple and clear to the
> > >> > > >> >>>>>>>>>> users.
> > >> > > >> >>>>>>>>>>    - Cons:
> > >> > > >> >>>>>>>>>>         a) For applications that do not involve bounded
> > >> > > >> >>>>>>>>>> operations, they still have to call different APIs to
> > >> > > >> >>>>>>>>>> distinguish bounded / unbounded streams.
> > >> > > >> >>>>>>>>>>         b) No support for bounded streams to run in a
> > >> > > >> >>>>>>>>>> streaming runtime setting, i.e. scheduling and operator
> > >> > > >> >>>>>>>>>> behaviors.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Option 2:
> > >> > > >> >>>>>>>>>> // The source is either bounded or unbounded, but only
> > >> > > >> >>>>>>>>>> // unbounded operations can be performed on the
> > >> > > >> >>>>>>>>>> // returned DataStream.
> > >> > > >> >>>>>>>>>> DataStream<Type> dataStream = env.source(someSource);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // The source must be a bounded source, otherwise an
> > >> > > >> >>>>>>>>>> // exception is thrown.
> > >> > > >> >>>>>>>>>> BoundedDataStream<Type> boundedDataStream =
> > >> > > >> >>>>>>>>>>     env.boundedSource(boundedSource);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> The pros and cons are exactly the opposite of option 1.
> > >> > > >> >>>>>>>>>>    - Pros:
> > >> > > >> >>>>>>>>>>         a) For applications that do not involve bounded
> > >> > > >> >>>>>>>>>> operations, they do not have to call different APIs to
> > >> > > >> >>>>>>>>>> distinguish bounded / unbounded streams.
> > >> > > >> >>>>>>>>>>         b) Support for bounded streams to run in a
> > >> > > >> >>>>>>>>>> streaming runtime setting, i.e. scheduling and operator
> > >> > > >> >>>>>>>>>> behaviors.
> > >> > > >> >>>>>>>>>>    - Cons:
> > >> > > >> >>>>>>>>>>         a) Bounded / unbounded streams are kind of
> > >> > > >> >>>>>>>>>> mixed, i.e. given a DataStream, it is not clear whether
> > >> > > >> >>>>>>>>>> it is bounded or not, unless you have access to its
> > >> > > >> >>>>>>>>>> source.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> If we only think from the Source API perspective,
> > >> > > >> >>>>>>>>>> option 2 seems the better choice because,
> > >> > > >> >>>>>>>>>> functionality-wise, it is a superset of option 1, at
> > >> > > >> >>>>>>>>>> the cost of some seemingly acceptable ambiguity in the
> > >> > > >> >>>>>>>>>> DataStream API.
> > >> > > >> >>>>>>>>>> But if we look at the DataStream API as a whole,
> > >> > > >> >>>>>>>>>> option 1 seems a clearer choice. For example, sometimes
> > >> > > >> >>>>>>>>>> a library may have to know whether a certain task will
> > >> > > >> >>>>>>>>>> finish or not, and it would be difficult to tell if the
> > >> > > >> >>>>>>>>>> input is a DataStream, unless additional information is
> > >> > > >> >>>>>>>>>> provided all the way from the Source. One possible
> > >> > > >> >>>>>>>>>> solution is to have a *modified option 2* which adds a
> > >> > > >> >>>>>>>>>> method to the DataStream API to indicate boundedness,
> > >> > > >> >>>>>>>>>> such as getBoundedness(). It would solve the problem,
> > >> > > >> >>>>>>>>>> with the potential confusion of what the difference is
> > >> > > >> >>>>>>>>>> between a DataStream with getBoundedness()=true and a
> > >> > > >> >>>>>>>>>> BoundedDataStream. But that seems not super difficult
> > >> > > >> >>>>>>>>>> to explain.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> So from the API's perspective, I don't have a strong
> > >> > > >> >>>>>>>>>> opinion between *option 1* and *modified option 2*. I
> > >> > > >> >>>>>>>>>> like the cleanness of option 1, but modified option 2
> > >> > > >> >>>>>>>>>> would be more attractive if we had a concrete use case
> > >> > > >> >>>>>>>>>> for the "bounded stream with unbounded streaming
> > >> > > >> >>>>>>>>>> runtime settings".
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Re: Till
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Maybe this has already been asked before, but I was
> > >> > > >> >>>>>>>>>> wondering why the SourceReader interface has the method
> > >> > > >> >>>>>>>>>> pollNext, which hands the responsibility of outputting
> > >> > > >> >>>>>>>>>> elements to the SourceReader implementation? Has this
> > >> > > >> >>>>>>>>>> been done for backwards-compatibility reasons with the
> > >> > > >> >>>>>>>>>> old source interface? If not, then one could define a
> > >> > > >> >>>>>>>>>> Collection<E> getNextRecords() method which returns the
> > >> > > >> >>>>>>>>>> currently retrieved records, and then the caller emits
> > >> > > >> >>>>>>>>>> them outside of the SourceReader. That way the
> > >> > > >> >>>>>>>>>> interface would not allow implementing an outputting
> > >> > > >> >>>>>>>>>> loop where we never hand back control to the caller. At
> > >> > > >> >>>>>>>>>> the moment, this contract can be easily broken and is
> > >> > > >> >>>>>>>>>> only mentioned loosely in the JavaDocs.
> > >> > > >> >>>>>>>>>>
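> > >> > > >> >>>>>>>>>> A sketch of the caller-driven alternative described
> > >> > > >> >>>>>>>>>> above (getNextRecords is the signature Till proposes;
> > >> > > >> >>>>>>>>>> the surrounding names are assumptions):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> interface SourceReader<E> {
> > >> > > >> >>>>>>>>>>     // Returns the currently retrieved records; the
> > >> > > >> >>>>>>>>>>     // caller, not the reader, emits them.
> > >> > > >> >>>>>>>>>>     Collection<E> getNextRecords() throws Exception;
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Runtime loop (caller side), which regains control
> > >> > > >> >>>>>>>>>> // after every call:
> > >> > > >> >>>>>>>>>> for (E record : reader.getNextRecords()) {
> > >> > > >> >>>>>>>>>>     output.collect(record);
> > >> > > >> >>>>>>>>>> }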
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> The primary reason we hand over the SourceOutput to
> > >> > > >> >>>>>>>>>> the SourceReader is that sometimes it is difficult for
> > >> > > >> >>>>>>>>>> a SourceReader to emit one record at a time. One
> > >> > > >> >>>>>>>>>> example is some batched messaging systems which only
> > >> > > >> >>>>>>>>>> have an offset for the entire batch instead of
> > >> > > >> >>>>>>>>>> individual messages in the batch. In that case,
> > >> > > >> >>>>>>>>>> returning one record at a time would leave the
> > >> > > >> >>>>>>>>>> SourceReader in an uncheckpointable state, because it
> > >> > > >> >>>>>>>>>> can only checkpoint at the batch boundaries.
> > >> > > >> >>>>>>>>>>
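> > >> > > >> >>>>>>>>>> A sketch of why the reader-driven pollNext helps there
> > >> > > >> >>>>>>>>>> (the client and offset names are hypothetical):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> InputStatus pollNext(SourceOutput<T> output) {
> > >> > > >> >>>>>>>>>>     Batch<T> batch = client.nextBatch();
> > >> > > >> >>>>>>>>>>     for (T record : batch) {
> > >> > > >> >>>>>>>>>>         // Emit the whole batch in one call...
> > >> > > >> >>>>>>>>>>         output.collect(record);
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>>     // ...so the reader is always at a batch boundary
> > >> > > >> >>>>>>>>>>     // and the batch offset is safe to checkpoint.
> > >> > > >> >>>>>>>>>>     lastCheckpointableOffset = batch.offset();
> > >> > > >> >>>>>>>>>>     return InputStatus.MORE_AVAILABLE;
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>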
> > >> > > >> >>>>>>>>>> Thanks,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Jiangjie (Becket) Qin
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann
> > >> > > >> >>>>>>>>>> <trohrm...@apache.org> wrote:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Hi everyone,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> thanks for drafting this FLIP. It reads very well.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Concerning Dawid's proposal, I tend to agree. The
> > >> > > >> >>>>>>>>>> boundedness could come from the source and tell the
> > >> > > >> >>>>>>>>>> system how to treat the operator (scheduling-wise).
> > >> > > >> >>>>>>>>>> From a user's perspective it should be fine to get back
> > >> > > >> >>>>>>>>>> a DataStream when calling env.source(boundedSource) if
> > >> > > >> >>>>>>>>>> he does not need special operations defined on a
> > >> > > >> >>>>>>>>>> BoundedDataStream. If he needs this, then one could use
> > >> > > >> >>>>>>>>>> the method BoundedDataStream
> > >> > > >> >>>>>>>>>> env.boundedSource(boundedSource).
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> If possible, we could enforce the proper usage of
> > >> > > >> >>>>>>>>>> env.boundedSource() by introducing a BoundedSource type
> > >> > > >> >>>>>>>>>> so that one cannot pass an unbounded source to it. That
> > >> > > >> >>>>>>>>>> way users would not be able to shoot themselves in the
> > >> > > >> >>>>>>>>>> foot.
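> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> For instance (a sketch of the idea, not a concrete
> > >> > > >> >>>>>>>>>> proposal):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Marker subtype for sources known to be bounded.
> > >> > > >> >>>>>>>>>> interface BoundedSource<T> extends Source<T> {}
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Only accepts a BoundedSource, so the check happens
> > >> > > >> >>>>>>>>>> // at compile time rather than at runtime.
> > >> > > >> >>>>>>>>>> BoundedDataStream<T> boundedSource(BoundedSource<T> source);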
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Maybe this has already been asked before, but I was
> > >> > > >> >>>>>>>>>> wondering why the SourceReader interface has the method
> > >> > > >> >>>>>>>>>> pollNext, which hands the responsibility of outputting
> > >> > > >> >>>>>>>>>> elements to the SourceReader implementation? Has this
> > >> > > >> >>>>>>>>>> been done for backwards-compatibility reasons with the
> > >> > > >> >>>>>>>>>> old source interface? If not, then one could define a
> > >> > > >> >>>>>>>>>> Collection<E> getNextRecords() method which returns the
> > >> > > >> >>>>>>>>>> currently retrieved records, and then the caller emits
> > >> > > >> >>>>>>>>>> them outside of the SourceReader. That way the
> > >> > > >> >>>>>>>>>> interface would not allow implementing an outputting
> > >> > > >> >>>>>>>>>> loop where we never hand back control to the caller. At
> > >> > > >> >>>>>>>>>> the moment, this contract can be easily broken and is
> > >> > > >> >>>>>>>>>> only mentioned loosely in the JavaDocs.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Cheers,
> > >> > > >> >>>>>>>>>> Till
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li
> > >> > > >> >>>>>>>>>> <jingsongl...@gmail.com> wrote:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
Hi all,

I think the current design is good.

My understanding is:

For execution mode, bounded mode and continuous mode are totally different. I don't think we have the ability to integrate the two models at present. It's about scheduling, memory, algorithms, states, etc. We shouldn't confuse them.

For source capabilities there are three cases: only bounded, only continuous, and both bounded and continuous.
I think Kafka is a source that can be run in both bounded and continuous execution mode, and Kafka with an end offset should be runnable in both bounded and continuous execution mode. Using Apache Beam with the Flink runner, I used to run a "bounded" Kafka in streaming mode. For our previous DataStream, it was not necessarily required that the source cannot be bounded.

So these are my thoughts on Dawid's question:
1. pass a bounded source to continuousSource(): +1
2. pass a continuous source to boundedSource(): -1, should throw an exception.

In StreamExecutionEnvironment, continuousSource and boundedSource define the execution mode. It defines a clear boundary of execution mode.

Best,
Jingsong Lee

On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:

> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> I agree with Dawid's point that the boundedness
> > >> information
> > >> > > >> >> should
> > >> > > >> >>>>>>> come
> > >> > > >> >>>>>>>>>> from the source itself (e.g. the end timestamp), not
> > >> > through
> > >> > > >> >>>>>>>>>> env.boundedSouce()/continuousSource().
> > >> > > >> >>>>>>>>>> I think if we want to support something like
> > >> `env.source()`
> > >> > > >> that
> > >> > > >> >>>>>>> derive
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> the
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> execution mode from source,
> > >> > > `supportsBoundedness(Boundedness)`
> > >> > > >> >>>>>>>>>> method is not enough, because we don't know whether
> it
> > >> is
> > >> > > >> >> bounded
> > >> > > >> >>> or
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> not.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Best,
> > >> > > >> >>>>>>>>>> Jark
> > >> > > >> >>>>>>>>>>
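
Sketched out, the difference being pointed at here (hypothetical code, anticipating the getBoundedness() idea raised further down the thread):

// With only a capability check, a source that supports both modes gives
// env.source() no way to pick one for this particular instance:
boolean supportsBoundedness(Boundedness boundedness);

// With a property getter, env.source() can derive the mode directly:
Boundedness getBoundedness();

// e.g., hypothetically, inside env.source(source):
if (source.getBoundedness() == Boundedness.BOUNDED) {
    // translate and schedule as a bounded (batch-style) pipeline
} else {
    // translate and schedule as a continuous streaming pipeline
}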

On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

One more thing. In the current proposal, with the supportsBoundedness(Boundedness) method and the boundedness coming from either continuousSource or boundedSource, I could not find how this information is fed back to the SplitEnumerator.

Best,

Dawid

On 09/12/2019 13:52, Becket Qin wrote:

> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Hi Dawid,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Thanks for the comments. This actually brings
> another
> > >> > > relevant
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> question
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> about what does a "bounded source" imply. I actually
> > had
> > >> > the
> > >> > > >> >> same
> > >> > > >> >>>>>>>>>> impression when I look at the Source API. Here is
> > what I
> > >> > > >> >>> understand
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> after
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> some discussion with Stephan. The bounded source has
> > the
> > >> > > >> >> following
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> impacts.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> 1. API validity.
> > >> > > >> >>>>>>>>>> - A bounded source generates a bounded stream so
> some
> > >> > > >> operations
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> that
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> only
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> works for bounded records would be performed, e.g.
> > sort.
> > >> > > >> >>>>>>>>>> - To expose these bounded stream only APIs, there
> are
> > >> two
> > >> > > >> >> options:
> > >> > > >> >>>>>>>>>>       a. Add them to the DataStream API and throw
> > >> exception
> > >> > > if
> > >> > > >> a
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> method
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> is
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> called on an unbounded stream.
> > >> > > >> >>>>>>>>>>       b. Create a BoundedDataStream class which is
> > >> returned
> > >> > > >> from
> > >> > > >> >>>>>>>>>> env.boundedSource(), while DataStream is returned
> from
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> env.continousSource().
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Note that this cannot be done by having single
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> env.source(theSource)
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> even
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> the Source has a getBoundedness() method.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> 2. Scheduling
> > >> > > >> >>>>>>>>>> - A bounded source could be computed stage by stage
> > >> without
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> bringing
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> up
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> all
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> the tasks at the same time.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> 3. Operator behaviors
> > >> > > >> >>>>>>>>>> - A bounded source indicates the records are finite
> so
> > >> some
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> operators
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> can
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> wait until it receives all the records before it
> > starts
> > >> the
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> processing.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> In the above impact, only 1 is relevant to the API
> > >> design.
> > >> > > And
> > >> > > >> >> the
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> current
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> proposal in FLIP-27 is following 1.b.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // boundedness depends of source property, imo this
> > >> should
> > >> > > >> >> always
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> be
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> preferred
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> DataStream<MyType> stream = env.source(theSource);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> In your proposal, does DataStream have bounded
> stream
> > >> only
> > >> > > >> >>> methods?
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> It
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> looks it should have, otherwise passing a bounded
> > >> Source to
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> env.source()
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> would be confusing. In that case, we will
> essentially
> > do
> > >> > 1.a
> > >> > > if
> > >> > > >> >> an
> > >> > > >> >>>>>>>>>> unbounded Source is created from
> > >> > env.source(unboundedSource).
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> If we have the methods only supported for bounded
> > >> streams
> > >> > in
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> DataStream,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> it
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> seems a little weird to have a separate
> > >> BoundedDataStream
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> interface.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Am I understand it correctly?
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Thanks,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Jiangjie (Becket) Qin
> > >> > > >> >>>>>>>>>>
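
A rough sketch of the two options above, with illustrative names and elided bodies (neither is the FLIP's final code):

import java.util.Comparator;

// Option 1.a: bounded-only methods live on DataStream and fail when the
// stream turns out to be unbounded.
public class DataStream<T> {
    protected Boundedness boundedness;

    public DataStream<T> sort(Comparator<T> comparator) {
        if (boundedness != Boundedness.BOUNDED) {
            throw new IllegalStateException("sort() requires a bounded stream");
        }
        // ... translate to a sorting operator ...
        return this;
    }
}

// Option 1.b (what FLIP-27 currently follows): a separate type turns the
// runtime check into a compile-time property.
public class BoundedDataStream<T> extends DataStream<T> {
    @Override
    public BoundedDataStream<T> sort(Comparator<T> comparator) {
        // always legal here; the type itself guarantees boundedness
        return this;
    }
}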

On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi all,

Really well written proposal and a very important one. I must admit I have not understood all the intricacies of it yet.

One question I have, though, is where the information about boundedness comes from. I think in most cases it is a property of the source. As you described, it might be e.g. an end offset, a flag whether it should monitor new splits, etc. I think it would be a really nice use case to be able to say:

new KafkaSource().readUntil(long timestamp),

which could work as an "end offset". Moreover, I think all bounded sources support continuous mode, but no intrinsically continuous source supports the bounded mode. If I understood the proposal correctly, it suggests the boundedness sort of "comes" from the outside of the source, from the invocation of either boundedStream or continousSource.

I am wondering if it would make sense to actually change the method

boolean Source#supportsBoundedness(Boundedness)

to

Boundedness Source#getBoundedness().

As for the methods #boundedSource and #continousSource: assuming the boundedness is a property of the source, they do not affect how the enumerator works, but mostly how the dag is scheduled, right? I am not against those methods, but I think it is a very specific use case to actually override the property of the source. In general I would expect users to only call env.source(theSource), where the source tells if it is bounded or not. I would suggest considering the following set of methods:

// boundedness depends on source property, imo this should always be preferred
DataStream<MyType> stream = env.source(theSource);

// always continuous execution, whether bounded or unbounded source
DataStream<MyType> boundedStream = env.continousSource(theSource);

// imo this would make sense if the BoundedDataStream provides additional features unavailable for continuous mode
BoundedDataStream<MyType> batch = env.boundedSource(theSource);

Best,

Dawid
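
For illustration, a sketch of how these suggestions could compose; readUntil and getBoundedness are proposals in this thread, not existing API, and the KafkaSource internals are assumed:

// Boundedness as a property of the configured source instance, rather
// than of the env method used to add it.
public class KafkaSource implements Source {

    private long endTimestamp = Long.MAX_VALUE; // unbounded by default

    // Works like an "end offset": reading stops at this event timestamp.
    public KafkaSource readUntil(long timestamp) {
        this.endTimestamp = timestamp;
        return this;
    }

    @Override
    public Boundedness getBoundedness() {
        return endTimestamp == Long.MAX_VALUE
                ? Boundedness.CONTINUOUS_UNBOUNDED
                : Boundedness.BOUNDED;
    }
}

With that in place, env.source(new KafkaSource().readUntil(ts)) could run in bounded mode without the user ever calling boundedSource() explicitly.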

On 04/12/2019 11:25, Stephan Ewen wrote:

Thanks, Becket, for updating this.

I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size.

+1 to the FLIP in its current state. It's a very detailed write-up, nicely done!

On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:

Hi all,

Sorry for the long-belated update. I have updated the FLIP-27 wiki page with the latest proposals. Some noticeable changes include:
1. A new generic communication mechanism between SplitEnumerator and SourceReader.
2. Some detailed API method signature changes.

We left a few things out of this FLIP and will address them in separate FLIPs, including:
1. Per split event time.
2. Event time alignment.
3. Fine grained failover for SplitEnumerator failure.

Please let us know if you have any questions.

Thanks,

Jiangjie (Becket) Qin
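
The wiki page has the actual details; very roughly, a generic two-way channel like the one mentioned in change 1 could be shaped as follows (all names and signatures here are assumptions for illustration, not the wiki's exact API):

import java.io.Serializable;

// Marker for custom messages exchanged between the enumerator and the
// readers, e.g. "split finished" or "please align to watermark X".
public interface SourceEvent extends Serializable {}

public interface SplitEnumerator<SplitT> {
    // Invoked when a reader subtask sends an event to the enumerator.
    void handleSourceEvent(int subtaskId, SourceEvent event);
}

public interface SourceReader<E, SplitT> {
    // Invoked when the enumerator sends an event to this reader.
    void handleSourceEvent(SourceEvent event);
}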

On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> wrote:

Hi Łukasz!

Becket and I are working hard on figuring out the last details and implementing the first PoC. We would hopefully update the FLIP next week.

There is a fair chance that a first version of this will be in 1.10, but I think it will take another release to battle-test it and migrate the connectors.

Best,
Stephan

On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:

Hi,

This proposal looks very promising for us. Do you have any plans for which Flink release it is going to be released in? We are thinking of using the DataSet API for our future use cases, but on the other hand the DataSet API is going to be deprecated, so using the proposed bounded data streams solution could be more viable in the long term.

Thanks,
Łukasz

On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:

Thanks for putting together this proposal!

I see that the "Per Split Event Time" and "Event Time Alignment" sections are still TBD.

It would probably be good to flesh those out a bit before proceeding too far, as the event time alignment will probably influence the interaction with the split reader, specifically ReaderStatus emitNext(SourceOutput<E> output).

We currently have only one implementation for event time alignment, in the Kinesis consumer. The synchronization in that case takes place as the last step before records are emitted downstream (RecordEmitter). With the currently proposed interfaces, the equivalent can be implemented in the reader loop, although note that in the Kinesis consumer the per-shard threads push records.

Synchronization has not been implemented for the Kafka consumer yet:

https://issues.apache.org/jira/browse/FLINK-12675

When I looked at it, I realized that the implementation will look quite different from Kinesis, because it needs to take place in the pull part, where records are taken from the Kafka client. Due to the multiplexing it cannot be done by blocking the split thread like it currently works for Kinesis. Reading from individual Kafka partitions needs to be controlled via pause/resume on the Kafka client.

To take on that responsibility the split thread would need to be aware of the watermarks, or at least of whether it should or should not continue to consume a given split, and this may require a different SourceReader or SourceOutput interface.

Thanks,
Thomas
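
As a rough sketch of the pause/resume approach described above (the alignment threshold and the watermark bookkeeping are simplified assumptions; pause, resume, and poll are the Kafka consumer's real API):

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Inside the fetch loop of a multiplexed Kafka reader: partitions whose
// local watermark runs too far ahead of the global minimum get paused,
// instead of blocking the shared fetch thread.
void alignAndPoll(KafkaConsumer<byte[], byte[]> consumer,
                  Map<TopicPartition, Long> perPartitionWatermark,
                  long maxAheadMillis) {
    long minWatermark = Collections.min(perPartitionWatermark.values());

    List<TopicPartition> toPause = new ArrayList<>();
    List<TopicPartition> toResume = new ArrayList<>();
    for (Map.Entry<TopicPartition, Long> e : perPartitionWatermark.entrySet()) {
        if (e.getValue() > minWatermark + maxAheadMillis) {
            toPause.add(e.getKey());
        } else {
            toResume.add(e.getKey());
        }
    }
    consumer.pause(toPause);    // stop fetching partitions that are ahead
    consumer.resume(toResume);  // keep fetching the rest

    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
    // ... emit records and update perPartitionWatermark ...
}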

On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan,

Thank you for the feedback!
I will take a look at your branch before discussing publicly.

On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

Hi Biao!

Thanks for reviving this. I would like to join this discussion, but am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so?

In the meantime I can share some suggestions based on prior experiments:

How to do watermarks / timestamp extractors in a simpler and more flexible way. I think that part is quite promising and should be part of the new source interface.

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime

https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java

Some experiments on how to build the source reader and its library for common threading/split patterns:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src

Best,
Stephan

On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi devs,

Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10.

There are so many things mentioned in the document of FLIP-27 [1] that I think we'd better discuss them separately. However, the wiki is not a good place to discuss, so I wrote a Google doc about the SplitReader API covering some details that are missing from the document. [2]

1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing

CC Stephan, Aljoscha, Piotrek, Becket

On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Steven,
Thank you for the feedback. Please take a look at the FLIP-27 document <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>, which was updated recently. A lot of details about the enumerator were added in this document. I think it would help.

Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:

This proposal mentioned that the SplitEnumerator might run on the JobManager or in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the leverage of fine-grained recovery for embarrassingly parallel jobs.

It's not clear to me what the implications of running the enumerator on the JobManager are, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan & Piotrek,

Thank you for the feedback.

It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten, since there are so many proposals recently.
Anyway, I wish to see the split topics soon :)

Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:

Hi Biao!

This discussion was stalled because of the preparations for the open sourcing & merging of Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for those.

I think there is no chance for this to be completed in the couple of remaining weeks/1 month before the 1.8 feature freeze; however, it would be good to aim with those changes for 1.9.

Piotrek

On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,
The summary from Stephan makes a lot of sense to me. It is much clearer indeed after splitting the complex topic into small ones.
I was wondering: is there any detailed plan for the next step? If not, I would like to push this forward by creating some JIRA issues.
Another question is whether version 1.8 should include these features.

Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:

Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion and open issues.
I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I would try and update the FLIP in the next days.
For the remaining ones we need more discussion.

I would suggest to fork each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams)
- This discussion is orthogonal and should come later, when the interface is agreed upon.

*(2) Split Readers for one or more splits*

- Discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions
    --> example sources that require that would be for example Kafka, Pravega, Pulsar
- Could have one split reader per source, or multiple split readers that share the "poll()" function
- To not make it too complicated, we can start with thinking about one split reader for all splits initially and see if that covers all requirements

*(3) Threading model of the Split Reader*

- Most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to base the task runtime code on a single-threaded/actor-style task design
    --> I personally am a big proponent of that, it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code
- Users care about simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader which will form the basis of most source implementations. BlockingSplitReader lets users do simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more) and the thread(s) can make blocking calls and hand over data buffers via a blocking queue (see the sketch after this list)
- This should allow us to cover both a fully async runtime and a simple blocking interface for users.
- This is actually very similar to how the Kafka connectors work. Kafka 9+ with one thread, Kafka 8 with multiple threads
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (can await completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox
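
A rough sketch of that layering; all names and signatures here are illustrative, not the final API, and the handover is simplified (a real implementation must handle the race when re-arming the availability future):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Base contract: non-blocking fetch plus a future that completes when
// data becomes available, so a single-threaded task can register a
// mailbox action instead of blocking.
abstract class SplitReader<E> {
    public abstract E pollNext();                        // null if nothing buffered
    public abstract CompletableFuture<Void> isAvailable();
}

// Convenience subclass: user code makes simple blocking poll() calls in
// a dedicated fetcher thread; buffers are handed over via a blocking queue.
abstract class BlockingSplitReader<E> extends SplitReader<E> {
    private final BlockingQueue<E> handover = new ArrayBlockingQueue<>(1024);
    private volatile CompletableFuture<Void> available = new CompletableFuture<>();

    protected abstract E pollBlocking() throws Exception; // implemented by users

    public void runFetcherLoop() throws Exception {        // runs in its own thread
        while (true) {
            handover.put(pollBlocking());                  // may block on I/O or queue
            available.complete(null);                      // wake up the task thread
        }
    }

    @Override
    public E pollNext() {
        E next = handover.poll();
        if (next == null) {
            available = new CompletableFuture<>();         // re-arm for the next round
        }
        return next;
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        return available;
    }
}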
> > >> > > >> >>>>>>>>>> *(4) Split Enumeration and Assignment*
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Splits may be generated lazily, both in cases
> where
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> there
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> is a
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> limited
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> number of splits (but very many), or splits are
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> discovered
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> over
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> time
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Assignment should also be lazy, to get better load
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> balancing
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Assignment needs support locality preferences
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Possible design based on discussion so far:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     --> SplitReader has a method "addSplits(SplitT...)" to add one or
> > >> > > >> >>>>>>>>>> more splits. Some split readers might assume they only ever have one
> > >> > > >> >>>>>>>>>> split concurrently, others assume multiple splits. (Note: the idea
> > >> > > >> >>>>>>>>>> behind being able to add multiple splits at the same time is to ease
> > >> > > >> >>>>>>>>>> startup, where multiple splits may be assigned instantly.)
> > >> > > >> >>>>>>>>>>     --> SplitReader has a context object on which it can call to
> > >> > > >> >>>>>>>>>> indicate when splits are completed. The enumerator gets that
> > >> > > >> >>>>>>>>>> notification and can use it to decide when to assign new splits. This
> > >> > > >> >>>>>>>>>> should help both in cases of sources that take splits lazily (file
> > >> > > >> >>>>>>>>>> readers) and in case the source needs to preserve a partial order
> > >> > > >> >>>>>>>>>> between splits (Kinesis, Pravega, Pulsar may need that).
> > >> > > >> >>>>>>>>>>     --> SplitEnumerator gets a notification when SplitReaders start
> > >> > > >> >>>>>>>>>> and when they finish splits. It can decide at that moment to push more
> > >> > > >> >>>>>>>>>> splits to that reader.
> > >> > > >> >>>>>>>>>>     --> The SplitEnumerator should probably be aware of the source
> > >> > > >> >>>>>>>>>> parallelism, to build its initial distribution. (A rough interface
> > >> > > >> >>>>>>>>>> sketch of these pieces follows after the open question below.)
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Open question: Should the source expose something like "host
> > >> > > >> >>>>>>>>>> preferences", so that yarn/mesos/k8s can take this into account when
> > >> > > >> >>>>>>>>>> selecting a node to start a TM on?
> > >> > > >> >>>>>>>>>>
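> > >> > > >> >>>>>>>>>> For illustration, a minimal Java sketch of these pieces (all names and
> > >> > > >> >>>>>>>>>> signatures here are placeholders for this discussion, not a final API):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> import java.util.Collections;
> > >> > > >> >>>>>>>>>> import java.util.List;
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: reader side of the design discussed above.
> > >> > > >> >>>>>>>>>> interface SplitReader<T, SplitT> {
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // Adds one or more splits; accepting several at once eases
> > >> > > >> >>>>>>>>>>     // startup, where multiple splits may be assigned instantly.
> > >> > > >> >>>>>>>>>>     void addSplits(SplitT... splits);
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: the context the reader uses to report finished splits.
> > >> > > >> >>>>>>>>>> interface SplitReaderContext<SplitT> {
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // The enumerator gets this notification and can use it to decide
> > >> > > >> >>>>>>>>>>     // when to assign new splits (lazy file readers, or sources that
> > >> > > >> >>>>>>>>>>     // must preserve a partial order between splits).
> > >> > > >> >>>>>>>>>>     void splitFinished(SplitT split);
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: enumerator side, aware of parallelism and locality.
> > >> > > >> >>>>>>>>>> interface SplitEnumerator<SplitT> {
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // Parallelism awareness, to build the initial split distribution.
> > >> > > >> >>>>>>>>>>     void start(int sourceParallelism);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // Lifecycle notifications on which more splits can be pushed.
> > >> > > >> >>>>>>>>>>     void readerStarted(int subtaskId);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     void splitFinished(int subtaskId, SplitT split);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // One possible answer to the open question: host preferences that
> > >> > > >> >>>>>>>>>>     // yarn/mesos/k8s could use when selecting a node to start a TM on.
> > >> > > >> >>>>>>>>>>     default List<String> hostPreferences(SplitT split) {
> > >> > > >> >>>>>>>>>>         return Collections.emptyList();
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>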
> > >> > > >> >>>>>>>>>> *(5) Watermarks and event time alignment*
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Watermark generation, as well as idleness, needs to be per split
> > >> > > >> >>>>>>>>>> (like currently in the Kafka Source, per partition).
> > >> > > >> >>>>>>>>>> - It is desirable to support optional event-time-alignment, meaning
> > >> > > >> >>>>>>>>>> that splits that are ahead are back-pressured or temporarily
> > >> > > >> >>>>>>>>>> unsubscribed.
> > >> > > >> >>>>>>>>>> - I think it would be desirable to encapsulate watermark generation
> > >> > > >> >>>>>>>>>> logic in watermark generators, for a separation of concerns. The
> > >> > > >> >>>>>>>>>> watermark generators should run per split.
> > >> > > >> >>>>>>>>>> - Using watermark generators would also help with another problem of
> > >> > > >> >>>>>>>>>> the suggested interface, namely supporting non-periodic watermarks
> > >> > > >> >>>>>>>>>> efficiently.
> > >> > > >> >>>>>>>>>> - Need a way to "dispatch" the next record to different watermark
> > >> > > >> >>>>>>>>>> generators. (A small sketch of such a generator follows below.)
> > >> > > >> >>>>>>>>>> - Need a way to tell the SplitReader to "suspend" a split until a
> > >> > > >> >>>>>>>>>> certain watermark is reached (event time backpressure).
> > >> > > >> >>>>>>>>>> - This would in fact not be needed (and thus simpler) if we had a
> > >> > > >> >>>>>>>>>> SplitReader per split, and may be a reason to re-open that discussion.
> > >> > > >> >>>>>>>>>>
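> > >> > > >> >>>>>>>>>> A minimal sketch of a per-split generator (hypothetical names; the
> > >> > > >> >>>>>>>>>> point is one instance per split, covering both periodic and
> > >> > > >> >>>>>>>>>> non-periodic watermarks):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: one instance would exist per split, so idleness and
> > >> > > >> >>>>>>>>>> // alignment can be tracked per split.
> > >> > > >> >>>>>>>>>> interface SplitWatermarkGenerator<T> {
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // Every record is dispatched to the generator of its split.
> > >> > > >> >>>>>>>>>>     // Returning Long.MIN_VALUE means "no new watermark"; any other
> > >> > > >> >>>>>>>>>>     // value is a punctuated (non-periodic) watermark.
> > >> > > >> >>>>>>>>>>     long onRecord(T record, long eventTimestamp);
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // Called on a timer, covering the purely periodic case.
> > >> > > >> >>>>>>>>>>     long onPeriodicEmit();
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>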
> > >> > > >> >>>>>>>>>> *(6) Watermarks across splits and in the Split Enumerator*
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - The split enumerator may need some watermark awareness, which
> > >> > > >> >>>>>>>>>> should be purely based on split metadata (like the create timestamp
> > >> > > >> >>>>>>>>>> of file splits).
> > >> > > >> >>>>>>>>>> - If there are still more splits with an overlapping event time range
> > >> > > >> >>>>>>>>>> for a split reader, then that split reader should not advance the
> > >> > > >> >>>>>>>>>> watermark within the split beyond the overlap boundary. Otherwise
> > >> > > >> >>>>>>>>>> future splits will produce late data.
> > >> > > >> >>>>>>>>>> - One way to approach this could be that the split enumerator sends
> > >> > > >> >>>>>>>>>> watermarks to the readers, and the readers cannot emit watermarks
> > >> > > >> >>>>>>>>>> beyond that received watermark. (A clamping sketch follows below.)
> > >> > > >> >>>>>>>>>> - Many split enumerators would simply immediately send Long.MAX_VALUE
> > >> > > >> >>>>>>>>>> out and leave the progress purely to the split readers.
> > >> > > >> >>>>>>>>>> - For event-time alignment / split back pressure, this begs the
> > >> > > >> >>>>>>>>>> question how we can avoid deadlocks that may arise when splits are
> > >> > > >> >>>>>>>>>> suspended for event time back pressure.
> > >> > > >> >>>>>>>>>>
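> > >> > > >> >>>>>>>>>> A minimal sketch of that reader-side clamping (a hypothetical helper,
> > >> > > >> >>>>>>>>>> not a proposed class):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: the reader never emits beyond the cap last received
> > >> > > >> >>>>>>>>>> // from the enumerator, so overlapping splits cannot cause late data.
> > >> > > >> >>>>>>>>>> final class WatermarkClamp {
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // Enumerators without metadata-based awareness would immediately
> > >> > > >> >>>>>>>>>>     // send Long.MAX_VALUE, leaving progress purely to the readers.
> > >> > > >> >>>>>>>>>>     private volatile long enumeratorCap = Long.MIN_VALUE;
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     void onEnumeratorWatermark(long cap) {
> > >> > > >> >>>>>>>>>>         enumeratorCap = cap;
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     long clamp(long localWatermark) {
> > >> > > >> >>>>>>>>>>         return Math.min(localWatermark, enumeratorCap);
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>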
> > >> > > >> >>>>>>>>>> *(7) Batch and streaming Unification*
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Functionality wise, the above design should support both.
> > >> > > >> >>>>>>>>>> - Batch often (mostly) does not care about reading "in order" and
> > >> > > >> >>>>>>>>>> generating watermarks.
> > >> > > >> >>>>>>>>>>     --> Might use different enumerator logic that is more locality
> > >> > > >> >>>>>>>>>> aware and ignores event time order.
> > >> > > >> >>>>>>>>>>     --> Does not generate watermarks.
> > >> > > >> >>>>>>>>>> - Would be great if bounded sources could be identified at compile
> > >> > > >> >>>>>>>>>> time, so that "env.addBoundedSource(...)" is type safe and can return
> > >> > > >> >>>>>>>>>> a "BoundedDataStream". (See the typing sketch below.)
> > >> > > >> >>>>>>>>>> - Possible to defer this discussion until later.
> > >> > > >> >>>>>>>>>>
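> > >> > > >> >>>>>>>>>> A rough typing sketch (all class names here are placeholders) of how
> > >> > > >> >>>>>>>>>> the compiler could distinguish bounded sources:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: a bounded source is a subtype, so the bounded overload
> > >> > > >> >>>>>>>>>> // can return the more specific stream type at compile time.
> > >> > > >> >>>>>>>>>> interface SourceSketch<T> {}
> > >> > > >> >>>>>>>>>> interface BoundedSourceSketch<T> extends SourceSketch<T> {}
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> class DataStreamSketch<T> {}
> > >> > > >> >>>>>>>>>> class BoundedDataStreamSketch<T> extends DataStreamSketch<T> {}
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> class EnvSketch {
> > >> > > >> >>>>>>>>>>     <T> DataStreamSketch<T> addSource(SourceSketch<T> source) {
> > >> > > >> >>>>>>>>>>         return new DataStreamSketch<>();
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     // Only accepts statically-bounded sources, hence type safe.
> > >> > > >> >>>>>>>>>>     <T> BoundedDataStreamSketch<T> addBoundedSource(
> > >> > > >> >>>>>>>>>>             BoundedSourceSketch<T> source) {
> > >> > > >> >>>>>>>>>>         return new BoundedDataStreamSketch<>();
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>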
> > >> > > >> >>>>>>>>>> *Miscellaneous Comments*
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> - Should the source have a TypeInformation for the produced type,
> > >> > > >> >>>>>>>>>> instead of a serializer? We need a type information in the stream
> > >> > > >> >>>>>>>>>> anyways, and can derive the serializer from that. Plus, creating the
> > >> > > >> >>>>>>>>>> serializer should respect the ExecutionConfig.
> > >> > > >> >>>>>>>>>> - The TypeSerializer interface is very powerful but also not easy to
> > >> > > >> >>>>>>>>>> implement. Its purpose is to handle data super efficiently, support
> > >> > > >> >>>>>>>>>> flexible ways of evolution, etc.
> > >> > > >> >>>>>>>>>> For metadata I would suggest to look at the SimpleVersionedSerializer
> > >> > > >> >>>>>>>>>> instead, which is used for example for checkpoint master hooks, or
> > >> > > >> >>>>>>>>>> for the streaming file sink. I think that this is a good match for
> > >> > > >> >>>>>>>>>> cases where we do not need more than ser/deser (no copy, etc.) and
> > >> > > >> >>>>>>>>>> don't need to push versioning out of the serialization paths for best
> > >> > > >> >>>>>>>>>> performance (as in the TypeSerializer). (Two small sketches follow
> > >> > > >> >>>>>>>>>> below.)
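> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Deriving the serializer from the TypeInformation could look like this
> > >> > > >> >>>>>>>>>> (which TypeInformation the source exposes, and how, is an assumption):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> import org.apache.flink.api.common.ExecutionConfig;
> > >> > > >> >>>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >> > > >> >>>>>>>>>> import org.apache.flink.api.common.typeinfo.Types;
> > >> > > >> >>>>>>>>>> import org.apache.flink.api.common.typeutils.TypeSerializer;
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: the runtime derives the serializer from the source's
> > >> > > >> >>>>>>>>>> // TypeInformation, so the ExecutionConfig (registered types, etc.)
> > >> > > >> >>>>>>>>>> // is respected instead of the source shipping its own serializer.
> > >> > > >> >>>>>>>>>> final class SerializerFromTypeInfo {
> > >> > > >> >>>>>>>>>>     public static void main(String[] args) {
> > >> > > >> >>>>>>>>>>         TypeInformation<String> typeInfo = Types.STRING; // from the source
> > >> > > >> >>>>>>>>>>         TypeSerializer<String> serializer =
> > >> > > >> >>>>>>>>>>                 typeInfo.createSerializer(new ExecutionConfig());
> > >> > > >> >>>>>>>>>>         System.out.println(serializer.getClass().getSimpleName());
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>> }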
> > >> > > >> >>>>>>>>>>
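> > >> > > >> >>>>>>>>>> And a SimpleVersionedSerializer for a hypothetical split type that is
> > >> > > >> >>>>>>>>>> just a path string:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> import java.io.IOException;
> > >> > > >> >>>>>>>>>> import java.nio.charset.StandardCharsets;
> > >> > > >> >>>>>>>>>> import org.apache.flink.core.io.SimpleVersionedSerializer;
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: plain ser/deser with the version handled explicitly,
> > >> > > >> >>>>>>>>>> // which is all that split/checkpoint metadata usually needs.
> > >> > > >> >>>>>>>>>> final class PathSplitSerializer implements SimpleVersionedSerializer<String> {
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     private static final int VERSION = 1;
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     @Override
> > >> > > >> >>>>>>>>>>     public int getVersion() {
> > >> > > >> >>>>>>>>>>         return VERSION;
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     @Override
> > >> > > >> >>>>>>>>>>     public byte[] serialize(String split) throws IOException {
> > >> > > >> >>>>>>>>>>         return split.getBytes(StandardCharsets.UTF_8);
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     @Override
> > >> > > >> >>>>>>>>>>     public String deserialize(int version, byte[] serialized) throws IOException {
> > >> > > >> >>>>>>>>>>         if (version != VERSION) {
> > >> > > >> >>>>>>>>>>             throw new IOException("Unknown version: " + version);
> > >> > > >> >>>>>>>>>>         }
> > >> > > >> >>>>>>>>>>         return new String(serialized, StandardCharsets.UTF_8);
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>> }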
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> > >> > > >> >>>>>>>>>> k.klou...@data-artisans.com> wrote:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Hi Biao,
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Thanks for the answer!
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> So given the multi-threaded readers, we now have the following open
> > >> > > >> >>>>>>>>>> questions:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> 1) How do we let the checkpoints pass through our multi-threaded
> > >> > > >> >>>>>>>>>> reader operator?
> > >> > > >> >>>>>>>>>> 2) Do we have separate reader and source operators or not? In the
> > >> > > >> >>>>>>>>>> strategy that has a separate source, the source operator has a
> > >> > > >> >>>>>>>>>> parallelism of 1 and is responsible for split recovery only.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> For the first one, given also the constraints (blocking, finite
> > >> > > >> >>>>>>>>>> queues, etc.), I do not have an answer yet.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> For the 2nd, I think that we should go with separate operators for
> > >> > > >> >>>>>>>>>> the source and the readers, for the following reasons:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> 1) This is more aligned with a potential future improvement where the
> > >> > > >> >>>>>>>>>> split discovery becomes a responsibility of the JobManager and
> > >> > > >> >>>>>>>>>> readers are pulling more work from the JM.
> > >> > > >> >>>>>>>>>> 2) The source is going to be the "single point of truth". It will
> > >> > > >> >>>>>>>>>> know what has been processed and what not. If the source and the
> > >> > > >> >>>>>>>>>> readers are a single operator with parallelism > 1, or in general, if
> > >> > > >> >>>>>>>>>> the split discovery is done by each task individually, then:
> > >> > > >> >>>>>>>>>>    i) we have to have a deterministic scheme for each reader to
> > >> > > >> >>>>>>>>>> assign splits to itself (e.g. mod subtaskId). This is not necessarily
> > >> > > >> >>>>>>>>>> trivial for all sources. (A small sketch follows below.)
> > >> > > >> >>>>>>>>>>    ii) each reader would have to keep a copy of all its processed
> > >> > > >> >>>>>>>>>> splits
> > >> > > >> >>>>>>>>>>    iii) the state has to be a union state with a non-trivial merging
> > >> > > >> >>>>>>>>>> logic in order to support rescaling.
> > >> > > >> >>>>>>>>>>
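> > >> > > >> >>>>>>>>>> For point i), the kind of deterministic self-assignment each reader
> > >> > > >> >>>>>>>>>> would need (a sketch for illustration, not a proposal):
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> // Sketch only: every task sees every discovered split and keeps only
> > >> > > >> >>>>>>>>>> // the ones that hash to its own subtask index.
> > >> > > >> >>>>>>>>>> final class ModuloSplitFilter {
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>     static boolean isMine(String splitId, int subtaskId, int parallelism) {
> > >> > > >> >>>>>>>>>>         return Math.floorMod(splitId.hashCode(), parallelism) == subtaskId;
> > >> > > >> >>>>>>>>>>     }
> > >> > > >> >>>>>>>>>> }
> > >> > > >> >>>>>>>>>>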
> > >> > > >> >>>>>>>>>> Two additional points that you raised above:
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> i) The point that you raised that we need to keep all splits
> > >> > > >> >>>>>>>>>> (processed and not-processed) I think is a bit of a strong
> > >> > > >> >>>>>>>>>> requirement. This would imply that for infinite sources the state
> > >> > > >> >>>>>>>>>> will grow indefinitely. This problem is even more pronounced if we do
> > >> > > >> >>>>>>>>>> not have a single source that assigns splits to readers, as each
> > >> > > >> >>>>>>>>>> reader will have its own copy of the state.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> ii) it is true that for finite sources we need to somehow not close
> > >> > > >> >>>>>>>>>> the readers when the source/split discoverer finishes. The
> > >> > > >> >>>>>>>>>> ContinuousFileReaderOperator has a work-around for that. It is not
> > >> > > >> >>>>>>>>>> elegant, and checkpoints are not emitted after closing the source,
> > >> > > >> >>>>>>>>>> but this, I believe, is a bigger problem which requires more changes
> > >> > > >> >>>>>>>>>> than just refactoring the source interface.
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> Cheers,
> > >> > > >> >>>>>>>>>> Kostas
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>>
> > >> > > >> >>>>>>>>>> --
> > >> > > >> >>>>>>>>>> Best, Jingsong Lee