Becket,

Regarding "UNBOUNDED source that stops at some point", I found it difficult
to grasp what UNBOUNDED really mean.

If we want to use a Kafka source with an end/stop time, I guess you would
call it an UNBOUNDED Kafka source that stops (aka BOUNDED-streaming). The
terminology is a little confusing to me. Maybe BOUNDED/UNBOUNDED shouldn't
be used to categorize sources. Just call it a Kafka source that can run in
either BOUNDED or UNBOUNDED mode.
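
To make the suggestion concrete, here is a minimal, hypothetical sketch
(class and method names are illustrative only, not the actual FLIP-27 API)
of a single source type whose boundedness falls out of its configuration:

// Hypothetical sketch: one Kafka-style source class; boundedness is
// derived from whether a stop condition was configured.
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

final class KafkaLikeSource {
    private final Long endTimestamp; // null means "read forever"

    private KafkaLikeSource(Long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    static KafkaLikeSource readForever() {
        return new KafkaLikeSource(null);
    }

    static KafkaLikeSource readUntil(long endTimestamp) {
        return new KafkaLikeSource(endTimestamp);
    }

    Boundedness getBoundedness() {
        return endTimestamp == null
                ? Boundedness.CONTINUOUS_UNBOUNDED
                : Boundedness.BOUNDED;
    }
}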

Thanks,
Steven

On Thu, Dec 19, 2019 at 7:02 PM Becket Qin <becket....@gmail.com> wrote:

> I had an offline chat with Jark, and here are some more thoughts:
>
> 1. From the SQL perspective, a BOUNDED source leads to the batch execution
> mode, and an UNBOUNDED source leads to the streaming execution mode.
> 2. The semantics of an UNBOUNDED source is "may or may not stop"; the
> semantics of a BOUNDED source is "will stop".
> 3. The semantics of DataStream is "may or may not terminate"; the
> semantics of BoundedDataStream is "will terminate".
>
> Given that, option 3 seems the better option because:
> 1. SQL already has a strict binding between boundedness and execution
> mode. Keeping DataStream consistent with that would be good.
> 2. The semantics of an UNBOUNDED source is exactly the same as that of
> DataStream, so we should avoid breaking that semantic, i.e. turning some
> DataStream from "may or may not terminate" into "will terminate".
>
> For cases where users want the BOUNDED-streaming combination, they can
> simply use an UNBOUNDED source that stops at some point. We can even
> provide a simple wrapper that wraps a BOUNDED source as an UNBOUNDED
> source if that helps; see the sketch below. But API-wise, option 3 tells a
> pretty coherent whole story.
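>
> A minimal sketch of such a wrapper (the class name is illustrative, not
> from the FLIP; the Source/Boundedness shapes follow the getBoundedness()
> interface discussed earlier in this thread):
>
> // Sketch: report a BOUNDED source as CONTINUOUS_UNBOUNDED so it runs
> // with streaming scheduling and simply terminates once the wrapped
> // source runs out of records.
> enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
>
> interface Source<T> {
>     Boundedness getBoundedness();
> }
>
> final class AsUnboundedSource<T> implements Source<T> {
>     private final Source<T> bounded;
>
>     AsUnboundedSource(Source<T> bounded) {
>         if (bounded.getBoundedness() != Boundedness.BOUNDED) {
>             throw new IllegalArgumentException("Expected a BOUNDED source");
>         }
>         this.bounded = bounded; // split/reader creation would delegate here
>     }
>
>     @Override
>     public Boundedness getBoundedness() {
>         return Boundedness.CONTINUOUS_UNBOUNDED;
>     }
> }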
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Thu, Dec 19, 2019 at 10:30 PM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Timo,
> >
> > Bounded is just a special case of unbounded and every bounded source can
> >> also be treated as an unbounded source. This would unify the API if
> >> people don't need a bounded operation.
> >
> >
> > With option 3 users can still get a unified API with something like
> > below:
> >
> > DataStream boundedStream = env.boundedSource(boundedSource);
> > DataStream unboundedStream = env.source(unboundedSource);
> >
> > So in both cases, users can still use a unified DataStream without
> > touching the bounded-stream-only methods.
> > Regarding "unify the API if people don't need the bounded operation": do
> > you expect a DataStream with a bounded source to have the batch
> > operators and scheduler settings as well?
> >
> >
> > If we allow DataStream from a BOUNDED source, we will essentially pick
> > "*modified option 2*".
> >
> >> // The source is either bounded or unbounded, but only unbounded
> >> // operations could be performed on the returned DataStream.
> >> DataStream<Type> dataStream = env.source(someSource);
> >
> >
> >> // The source must be a bounded source, otherwise an exception is thrown.
> >> BoundedDataStream<Type> boundedDataStream =
> >> env.boundedSource(boundedSource);
> >
> >
> >
> > // Add the following method to DataStream
> >
> > Boundedness DataStream#getBoundedness();
> >
> >
> > From a purely logical perspective, boundedness and runtime settings
> > (stream / batch) are two orthogonal dimensions, specified in the
> > following way:
> >
> > *Boundedness* - defined by the source: BOUNDED / UNBOUNDED.
> > *Running mode* - defined by the API class: DataStream (streaming mode) /
> > BoundedDataStream (batch mode).
> >
> > Excluding the UNBOUNDED-batch combination, the "*modified option 2*"
> > covers the remaining three combinations. Compared with "*modified
> > option 2*", the main benefit of option 3 is its simplicity and clarity,
> > achieved by tying boundedness to the running mode and giving up the
> > BOUNDED-streaming combination.
> >
> > Just to be clear, I am fine with either option. But I would like to
> > understand a bit more about the bounded-streaming use case, when users
> > would prefer it over the bounded-batch case, and whether the added value
> > justifies the additional complexity in the API. Two cases I can think of
> > are:
> > 1. The records in a DataStream will be processed in order, while a
> > BoundedDataStream processes records without an order guarantee.
> > 2. A DataStream emits intermediate results when processing a finite
> > dataset, while a BoundedDataStream only emits the final result.
> >
> > Case 1 is actually misleading because DataStream in general doesn't
> > really support in-order processing.
> > Case 2 seems a rare use case because the instantaneous intermediate
> > result seems difficult to reason about. In any case, this can be
> > supported by an UNBOUNDED source that stops at some point.
> >
> > Are there other use cases for the bounded-streaming combination that I
> > missed? I am a little hesitant to put the testing requirement here
> > because ideally I'd avoid having public APIs for testing purposes only.
> > And this could be resolved by having an UNBOUNDED source that stops at
> > some point as well.
> >
> > Sorry for the long discussion, but I would really like to make an API
> > decision after knowing all the pros and cons.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Dec 19, 2019 at 6:19 PM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi Becket,
> >>
> >> regarding *Option 3* I think we can relax the constraints for
> >> env.source():
> >>
> >> // MySource can be bounded or unbounded
> >> DataStream<Type> dataStream = env.source(mySource);
> >>
> >> // MySource must be bounded, otherwise throws exception.
> >> BoundedDataStream<Type> boundedDataStream = env.boundedSource(mySource);
> >>
> >> Bounded is just a special case of unbounded and every bounded source can
> >> also be treated as an unbounded source. This would unify the API if
> >> people don't need a bounded operation. It also addresses Jark's
> >> concerns.
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 18.12.19 14:16, Becket Qin wrote:
> >> > Hi Jark,
> >> >
> >> > Please see the reply below:
> >> >
> >> >> Regarding option #3, my concern is that if we don't support
> >> >> streaming mode for bounded sources, how could we create a testing
> >> >> source for streaming mode? Currently, all the testing sources for
> >> >> streaming are bounded, so that the integration tests finish
> >> >> eventually.
> >> >
> >> >
> >> > An UNBOUNDED source does not mean it will never stop. It simply
> >> > indicates that the source *may* run forever, so the runtime needs to
> >> > be prepared for that, but the task may still stop at some point when
> >> > it hits some source-specific condition. So an UNBOUNDED testing
> >> > source can still stop at some point if needed.
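> >> >
> >> > As an illustration (a sketch only, not part of the FLIP), such a
> >> > testing source could declare itself CONTINUOUS_UNBOUNDED yet stop
> >> > after a fixed number of records, so integration tests still finish:
> >> >
> >> > // Sketch: a test source that declares itself unbounded but stops
> >> > // after `limit` records. Boundedness and the reader hook are
> >> > // simplified stand-ins for the FLIP-27 interfaces.
> >> > final class FiniteTestSource {
> >> >     enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
> >> >
> >> >     private final long limit;
> >> >     private long emitted;
> >> >
> >> >     FiniteTestSource(long limit) {
> >> >         this.limit = limit;
> >> >     }
> >> >
> >> >     Boundedness getBoundedness() {
> >> >         // the runtime must still be prepared to run forever
> >> >         return Boundedness.CONTINUOUS_UNBOUNDED;
> >> >     }
> >> >
> >> >     // Returns null once the limit is hit, signalling "no more
> >> >     // records", so the task finishes despite being "unbounded".
> >> >     Long pollNext() {
> >> >         return emitted < limit ? emitted++ : null;
> >> >     }
> >> > }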
> >> >
> >> >> Regarding Source#getRecordOrder(), could we have an implicit
> >> >> contract that an unbounded source should already read in order (i.e.
> >> >> reading partitions in parallel), while for a bounded source the
> >> >> order is not mandatory?
> >> >> This is also the behavior of the current sources.
> >> >> 1) a source can't guarantee it reads in strict order, because the
> >> >> producer may produce data out of order.
> >> >> 2) *Bounded-StrictOrder* is not necessary, because batch can reorder
> >> >> data.
> >> >
> >> >
> >> > It is true that sometimes the source cannot guarantee the record
> >> > order, but sometimes it can. Right now, even for stream processing,
> >> > there is no processing order guarantee. For example, a join operator
> >> > may emit a later record which successfully found a join match
> >> > earlier. Event order is one of the most important requirements for
> >> > event processing, so a clear order guarantee would be necessary. That
> >> > said, I agree that right now, even if the sources provide the record
> >> > order requirement, the runtime is not able to guarantee it out of the
> >> > box. So I am OK if we add the record order to the Source later. But
> >> > we should avoid misleading users into thinking the processing order
> >> > is guaranteed when using the unbounded runtime.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Wed, Dec 18, 2019 at 10:29 AM Jark Wu <imj...@gmail.com> wrote:
> >> >
> >> >> Hi Becket,
> >> >>
> >> >> It's great that we have reached a consensus on
> >> >> Source#getBoundedness().
> >> >>
> >> >> Regarding option #3, my concern is that if we don't support
> >> >> streaming mode for bounded sources, how could we create a testing
> >> >> source for streaming mode? Currently, all the testing sources for
> >> >> streaming are bounded, so that the integration tests finish
> >> >> eventually.
> >> >>
> >> >> Regarding Source#getRecordOrder(), could we have an implicit
> >> >> contract that an unbounded source should already read in order (i.e.
> >> >> reading partitions in parallel), while for a bounded source the
> >> >> order is not mandatory?
> >> >> This is also the behavior of the current sources.
> >> >> 1) a source can't guarantee it reads in strict order, because the
> >> >> producer may produce data out of order.
> >> >> 2) *Bounded-StrictOrder* is not necessary, because batch can reorder
> >> >> data.
> >> >>
> >> >> Best,
> >> >> Jark
> >> >>
> >> >>
> >> >>
> >> >> On Tue, 17 Dec 2019 at 22:03, Becket Qin <becket....@gmail.com>
> >> >> wrote:
> >> >>
> >> >>> Hi folks,
> >> >>>
> >> >>> Thanks for the comments. I am convinced that the Source API should
> >> >>> not take boundedness as a parameter after it is constructed. What
> >> >>> Timo and Dawid suggested sounds like a reasonable solution to me.
> >> >>> So the Source API would become:
> >> >>>
> >> >>> Source {
> >> >>>      Boundedness getBoundedness();
> >> >>> }
> >> >>>
> >> >>> Assuming the above Source API, in addition to the two options
> >> >>> mentioned in earlier emails, I am thinking of another option:
> >> >>>
> >> >>> *Option 3:*
> >> >>> // MySource must be unbounded, otherwise throws exception.
> >> >>> DataStream<Type> dataStream = env.source(mySource);
> >> >>>
> >> >>> // MySource must be bounded, otherwise throws exception.
> >> >>> BoundedDataStream<Type> boundedDataStream =
> >> env.boundedSource(mySource);
> >> >>>
> >> >>> The pros of this API are:
> >> >>>     a) It fits the requirements from Table / SQL well.
> >> >>>     b) DataStream users still have type safety (option 2 only has
> >> >>> partial type safety).
> >> >>>     c) Crystal-clear boundedness from the API, which makes
> >> >>> DataStream join / connect easy to reason about.
> >> >>> The caveats I see:
> >> >>>     a) It is inconsistent with Table since Table has one unified
> >> >>> interface.
> >> >>>     b) No streaming mode for bounded sources.
> >> >>>
> >> >>> @Stephan Ewen <ewenstep...@gmail.com> @Aljoscha Krettek
> >> >>> <aljos...@ververica.com> what do you think of the approach?
> >> >>>
> >> >>>
> >> >>> Orthogonal to the above API, I am wondering whether boundedness is
> >> >>> the only dimension needed to describe the characteristics of the
> >> >>> Source behavior. We may also need another dimension of *record
> >> >>> order*.
> >> >>>
> >> >>> For example, when a file source is reading from a directory with
> >> >>> bounded records, it may have two ways to read:
> >> >>> 1. Read files in parallel.
> >> >>> 2. Read files in chronological order.
> >> >>> In both cases, the file source is a bounded source. However, the
> >> >>> processing requirements for downstream may be different. In the
> >> >>> first case, the record processing and result emitting order does
> >> >>> not matter, e.g. word count. In the second case, the records may
> >> >>> have to be processed in the order they were read, e.g. change log
> >> >>> processing.
> >> >>>
> >> >>> If the Source only has a getBoundedness() method, the downstream
> >> >>> processors would not know whether the records emitted from the
> >> >>> Source should be processed in order or not. So combining
> >> >>> boundedness and record order, we will have four scenarios:
> >> >>>
> >> >>> *Bounded-StrictOrder*:     A segment of a change log.
> >> >>> *Bounded-Random*:          Batch word count.
> >> >>> *Unbounded-StrictOrder*:   An infinite change log.
> >> >>> *Unbounded-Random*:        Streaming word count.
> >> >>>
> >> >>> Option 2 mentioned in the previous email was kind of trying to
> >> >>> handle the Bounded-StrictOrder case by creating a DataStream from a
> >> >>> bounded source, which actually does not work.
> >> >>> It looks like we do not have strict order support in some operators
> >> >>> at this point, e.g. join. But we may still want to add the semantic
> >> >>> to the Source first so that later on we don't need to change all
> >> >>> the source implementations, especially given that many of them will
> >> >>> be implemented by 3rd parties.
> >> >>>
> >> >>> Given that, we need another dimension of *Record Order* in the
> >> >>> Source. More specifically, the API would become:
> >> >>>
> >> >>> Source {
> >> >>>      Boundedness getBoundedness();
> >> >>>      RecordOrder getRecordOrder();
> >> >>> }
> >> >>>
> >> >>> public enum RecordOrder {
> >> >>>      /** The records in the DataStream must be processed in their
> >> >>>       * strict order for correctness. */
> >> >>>      STRICT,
> >> >>>      /** The records in the DataStream can be processed in
> >> >>>       * arbitrary order. */
> >> >>>      RANDOM;
> >> >>> }
> >> >>>
> >> >>> Any thoughts?
> >> >>>
> >> >>> Thanks,
> >> >>>
> >> >>> Jiangjie (Becket) Qin
> >> >>>
> >> >>> On Tue, Dec 17, 2019 at 3:44 PM Timo Walther <twal...@apache.org>
> >> >>> wrote:
> >> >>>
> >> >>>> Hi Becket,
> >> >>>>
> >> >>>> I completely agree with Dawid's suggestion. The information about
> >> >>>> the boundedness should come out of the source, because most of the
> >> >>>> streaming sources can be made bounded based on some connector
> >> >>>> specific criterion. In Kafka, it would be an end offset or an end
> >> >>>> timestamp, but in any case having just an env.boundedSource() is
> >> >>>> not enough, because the parameters for making the source bounded
> >> >>>> are missing.
> >> >>>>
> >> >>>> I suggest having a simple `isBounded(): Boolean` flag in every
> >> >>>> source that might be influenced by a connector builder, as Dawid
> >> >>>> mentioned.
> >> >>>>
> >> >>>> For type safety during programming, we can still go with *Final
> >> >>>> state 1*, by having an env.source() vs. env.boundedSource(). The
> >> >>>> latter would just enforce that the boolean flag is set to `true`
> >> >>>> and could make bounded operations available (if we actually need
> >> >>>> that).
> >> >>>>
> >> >>>> However, I don't think that we should start making a unified Table
> >> >>>> API ununified again. Boundedness is an optimization property.
> >> >>>> Every bounded operation can also be executed in an unbounded way
> >> >>>> using updates/retractions or watermarks.
> >> >>>>
> >> >>>> Regards,
> >> >>>> Timo
> >> >>>>
> >> >>>>
> >> >>>> On 15.12.19 14:22, Becket Qin wrote:
> >> >>>>> Hi Dawid and Jark,
> >> >>>>>
> >> >>>>> I think the discussion ultimately boils down to the question of
> >> >>>>> which one of the following two final states we want. Once we make
> >> >>>>> this decision, everything else can be naturally derived.
> >> >>>>>
> >> >>>>> *Final state 1*: Separate APIs for bounded / unbounded DataStream
> >> >>>>> & Table.
> >> >>>>> That means any code users write will be valid at the point when
> >> >>>>> they write the code. This is similar to having a type safety
> >> >>>>> check at programming time. For example,
> >> >>>>>
> >> >>>>> BoundedDataStream extends DataStream {
> >> >>>>>     // Operations only available for bounded data.
> >> >>>>>     BoundedDataStream sort(...);
> >> >>>>>
> >> >>>>>     // Interaction with another BoundedDataStream returns a
> >> >>>>>     // bounded stream.
> >> >>>>>     BoundedJoinedDataStream join(BoundedDataStream other)
> >> >>>>>
> >> >>>>>     // Interaction with another unbounded stream returns an
> >> >>>>>     // unbounded stream.
> >> >>>>>     JoinedDataStream join(DataStream other)
> >> >>>>> }
> >> >>>>>
> >> >>>>> BoundedTable extends Table {
> >> >>>>>     // Bounded-only operation.
> >> >>>>>     BoundedTable sort(...);
> >> >>>>>
> >> >>>>>     // Interaction with another BoundedTable returns a
> >> >>>>>     // BoundedTable.
> >> >>>>>     BoundedTable join(BoundedTable other)
> >> >>>>>
> >> >>>>>     // Interaction with another unbounded table returns an
> >> >>>>>     // unbounded table.
> >> >>>>>     Table join(Table other)
> >> >>>>> }
> >> >>>>>
> >> >>>>> *Final state 2*: One unified API for bounded / unbounded
> >> >>>>> DataStream / Table.
> >> >>>>> That unified API may throw an exception at DAG compilation time
> >> >>>>> if an invalid operation is attempted. This is what the Table API
> >> >>>>> currently follows.
> >> >>>>>
> >> >>>>> DataStream {
> >> >>>>>     // Throws an exception if the DataStream is unbounded.
> >> >>>>>     DataStream sort();
> >> >>>>>     // Get boundedness.
> >> >>>>>     Boundedness getBoundedness();
> >> >>>>> }
> >> >>>>>
> >> >>>>> Table {
> >> >>>>>     // Throws an exception if the table has infinite rows.
> >> >>>>>     Table orderBy();
> >> >>>>>
> >> >>>>>     // Get boundedness.
> >> >>>>>     Boundedness getBoundedness();
> >> >>>>> }
> >> >>>>>
> >> >>>>> From what I understand, there is no consensus on this decision so
> >> >>>>> far. Whichever final state we choose, we need to make it
> >> >>>>> consistent across the entire project. We should avoid the case
> >> >>>>> where Table follows one final state while DataStream follows
> >> >>>>> another. Some arguments I am aware of from both sides so far are
> >> >>>>> the following:
> >> >>>>>
> >> >>>>> Arguments for final state 1:
> >> >>>>> 1a) Clean API with method safety checks at programming time.
> >> >>>>> 1b) (Counter to 2b) Although SQL does not have programming-time
> >> >>>>> error checks, SQL is not really a "programming language" per se,
> >> >>>>> so SQL can be different from Table and DataStream.
> >> >>>>> 1c) Although final state 2 seems to make it easier for SQL to
> >> >>>>> use, given it is more "config based" than "parameter based",
> >> >>>>> final state 1 can probably also meet what SQL wants by wrapping
> >> >>>>> the Source in the TableSource / TableSourceFactory API if needed.
> >> >>>>>
> >> >>>>> Arguments for final state 2:
> >> >>>>> 2a) The Source API itself already seems to follow the unified API
> >> >>>>> pattern.
> >> >>>>> 2b) There is no "programming time" method error check in the SQL
> >> >>>>> case, so we cannot really achieve final state 1 across the board.
> >> >>>>> 2c) It is an easier path given our current status, i.e. Table is
> >> >>>>> already following final state 2.
> >> >>>>> 2d) Users can always explicitly check the boundedness if they
> >> >>>>> want to.
> >> >>>>>
> >> >>>>> As I mentioned earlier, my initial thought was also to have a
> >> >>>>> "configuration based" Source rather than a "parameter based"
> >> >>>>> Source. So it is completely possible that I missed some important
> >> >>>>> consideration or design principle that we want to enforce for the
> >> >>>>> project. It would be good if @Stephan Ewen <step...@ververica.com>
> >> >>>>> and @Aljoscha Krettek <aljos...@ververica.com> can also provide
> >> >>>>> more thoughts on this.
> >> >>>>>
> >> >>>>>
> >> >>>>> Re: Jingsong
> >> >>>>>
> >> >>>>>> As you said, there are some batch-style system sources, like
> >> >>>>>> Parquet/ORC sources. Could we have a batch emit interface to
> >> >>>>>> improve performance? The per-record queue may cause performance
> >> >>>>>> degradation.
> >> >>>>>
> >> >>>>>
> >> >>>>> The current interface does not necessarily cause a performance
> >> >>>>> problem in the multi-threading case. In fact, the base
> >> >>>>> implementation allows SplitReaders to add a batch <E> of records
> >> >>>>> <T> to the records queue <E>, so each element in the records
> >> >>>>> queue would be a batch <E>. In this case, when the main thread
> >> >>>>> polls records, it takes a batch <E> of records <T> from the
> >> >>>>> shared records queue and processes the records <T> in a batch
> >> >>>>> manner.
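> >> >>>>>
> >> >>>>> A rough sketch of that hand-over (an illustrative fragment;
> >> >>>>> MyRecord, fetchBatch() and process() are placeholders, and the
> >> >>>>> types come from java.util and java.util.concurrent):
> >> >>>>>
> >> >>>>> // Each queue element is a whole batch (List<T>), so the
> >> >>>>> // SplitReader enqueues batches and the main thread drains one
> >> >>>>> // batch per poll.
> >> >>>>> BlockingQueue<List<MyRecord>> recordsQueue =
> >> >>>>>         new LinkedBlockingQueue<>();
> >> >>>>>
> >> >>>>> // SplitReader thread: one put per fetched batch.
> >> >>>>> recordsQueue.put(fetchBatch());
> >> >>>>>
> >> >>>>> // Main thread: take one batch and process its records together.
> >> >>>>> List<MyRecord> batch = recordsQueue.take();
> >> >>>>> for (MyRecord record : batch) {
> >> >>>>>     process(record);
> >> >>>>> }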
> >> >>>>>
> >> >>>>> Thanks,
> >> >>>>>
> >> >>>>> Jiangjie (Becket) Qin
> >> >>>>>
> >> >>>>> On Thu, Dec 12, 2019 at 1:29 PM Jingsong Li
> >> >>>>> <jingsongl...@gmail.com> wrote:
> >> >>>>>
> >> >>>>>> Hi Becket,
> >> >>>>>>
> >> >>>>>> I also have some performance concerns.
> >> >>>>>>
> >> >>>>>> If I understand correctly, SourceOutput will emit data per
> >> >>>>>> record into the queue? I'm worried about the multithreading
> >> >>>>>> performance of this queue.
> >> >>>>>>
> >> >>>>>>> One example is some batched messaging systems which only have
> >> >>>>>>> an offset for the entire batch instead of individual messages
> >> >>>>>>> in the batch.
> >> >>>>>>
> >> >>>>>> As you said, there are some batch-style system sources, like
> >> >>>>>> Parquet/ORC sources. Could we have a batch emit interface to
> >> >>>>>> improve performance? The per-record queue may cause performance
> >> >>>>>> degradation.
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Jingsong Lee
> >> >>>>>>
> >> >>>>>> On Thu, Dec 12, 2019 at 9:15 AM Jark Wu <imj...@gmail.com>
> >> >>>>>> wrote:
> >> >>>>>>
> >> >>>>>>> Hi Becket,
> >> >>>>>>>
> >> >>>>>>> I think Dawid explained things clearly and it makes a lot of
> >> >>>>>>> sense.
> >> >>>>>>> I'm also in favor of #2, because #1 doesn't work for our future
> >> >>>>>>> unified environment.
> >> >>>>>>>
> >> >>>>>>> You can see the vision in this documentation [1]. In the
> >> >>>>>>> future, we would like to drop the global streaming/batch mode
> >> >>>>>>> in SQL (i.e. EnvironmentSettings#inStreamingMode/inBatchMode).
> >> >>>>>>> A source is bounded or unbounded once defined, so queries can
> >> >>>>>>> be inferred from the source to run in streaming or batch or
> >> >>>>>>> hybrid mode. However, in #1, we will lose this ability because
> >> >>>>>>> the framework doesn't know whether the source is bounded or
> >> >>>>>>> unbounded.
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Jark
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> [1]:
> >> >>>>>>> https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p
> >> >>>>>>>
> >> >>>>>>> On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski
> >> >>>>>>> <pi...@ververica.com> wrote:
> >> >>>>>>>
> >> >>>>>>>> Hi,
> >> >>>>>>>>
> >> >>>>>>>> Regarding the:
> >> >>>>>>>>
> >> >>>>>>>> Collection<E> getNextRecords()
> >> >>>>>>>>
> >> >>>>>>>> I'm pretty sure such a design would unfortunately impact the
> >> >>>>>>>> performance (accessing and potentially creating the collection
> >> >>>>>>>> on the hot path).
> >> >>>>>>>>
> >> >>>>>>>> Also the
> >> >>>>>>>>
> >> >>>>>>>> InputStatus emitNext(DataOutput<T> output) throws Exception;
> >> >>>>>>>> or
> >> >>>>>>>> Status pollNext(SourceOutput<T> sourceOutput) throws Exception;
> >> >>>>>>>>
> >> >>>>>>>> gives us some opportunities in the future to allow the Source
> >> >>>>>>>> to hot-loop inside, until it receives some signal "please exit
> >> >>>>>>>> because of some reason" (the output collector could return
> >> >>>>>>>> such a hint upon collecting the result). But that's another
> >> >>>>>>>> topic outside of this FLIP's scope.
> >> >>>>>>>>
> >> >>>>>>>> Piotrek
> >> >>>>>>>>
> >> >>>>>>>>> On 11 Dec 2019, at 10:41, Till Rohrmann
> >> >>>>>>>>> <trohrm...@apache.org> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>> Hi Becket,
> >> >>>>>>>>>
> >> >>>>>>>>> quick clarification from my side because I think you
> >> >>>>>>>>> misunderstood my question. I did not suggest letting the
> >> >>>>>>>>> SourceReader return only a single record at a time when
> >> >>>>>>>>> calling getNextRecords. As the return type indicates, the
> >> >>>>>>>>> method can return an arbitrary number of records.
> >> >>>>>>>>>
> >> >>>>>>>>> Cheers,
> >> >>>>>>>>> Till
> >> >>>>>>>>>
> >> >>>>>>>> On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz
> >> >>>>>>>> <dwysakow...@apache.org> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi Becket,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Issue #1 - Design of Source interface
> >> >>>>>>>>>>
> >> >>>>>>>>>> I mentioned the lack of a method like
> >> >>>>>>>>>> Source#createEnumerator(Boundedness boundedness,
> >> >>>>>>>>>> SplitEnumeratorContext context) because without it the
> >> >>>>>>>>>> current proposal is not complete / does not work.
> >> >>>>>>>>>>
> >> >>>>>>>>>> If we say that boundedness is an intrinsic property of a
> >> >>>>>>>>>> source, imo we don't need the
> >> >>>>>>>>>> Source#createEnumerator(Boundedness boundedness,
> >> >>>>>>>>>> SplitEnumeratorContext context) method.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Assuming a source from my previous example:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Source source = KafkaSource.builder()
> >> >>>>>>>>>>    ...
> >> >>>>>>>>>>    .untilTimestamp(...)
> >> >>>>>>>>>>    .build()
> >> >>>>>>>>>>
> >> >>>>>>>>>> Would the enumerator differ if created like
> >> >>>>>>>>>> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs
> >> >>>>>>>>>> source.createEnumerator(BOUNDED, ...)? I know I am repeating
> >> >>>>>>>>>> myself, but this is the part where my opinion differs the
> >> >>>>>>>>>> most from the current proposal. I really think it should
> >> >>>>>>>>>> always be the source that tells if it is bounded or not. In
> >> >>>>>>>>>> the current proposal, the methods
> >> >>>>>>>>>> continousSource/boundedSource somewhat reconfigure the
> >> >>>>>>>>>> source, which I think is misleading.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I think a call like:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Source source = KafkaSource.builder()
> >> >>>>>>>>>>    ...
> >> >>>>>>>>>>    .readContinously() / readUntilLatestOffset() /
> >> >>>>>>>>>>    readUntilTimestamp / readUntilOffsets / ...
> >> >>>>>>>>>>    .build()
> >> >>>>>>>>>>
> >> >>>>>>>>>> is way cleaner (and more expressive) than
> >> >>>>>>>>>>
> >> >>>>>>>>>> Source source = KafkaSource.builder()
> >> >>>>>>>>>>    ...
> >> >>>>>>>>>>    .build()
> >> >>>>>>>>>>
> >> >>>>>>>>>> env.continousSource(source) // which actually underneath
> >> >>>>>>>>>> // would call createEnumerator(CONTINUOUS, ctx), equivalent
> >> >>>>>>>>>> // to source.readContinously().createEnumerator(ctx)
> >> >>>>>>>>>> // or
> >> >>>>>>>>>> env.boundedSource(source) // which actually underneath would
> >> >>>>>>>>>> // call createEnumerator(BOUNDED, ctx), equivalent to
> >> >>>>>>>>>> // source.readUntilLatestOffset().createEnumerator(ctx)
> >> >>>>>>>>>>
> >> >>>>>>>>>> Sorry for the comparison, but to me it seems there is too
> >> >>>>>>>>>> much magic happening underneath those two calls.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I really believe the Source interface should have a
> >> >>>>>>>>>> getBoundedness() method instead of supportsBoundedness(...)
> >> >>>>>>>>>> + createEnumerator(Boundedness, ...).
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Issue #2 - Design of
> >> >>>>>>>>>> ExecutionEnvironment#source()/continuousSource()/boundedSource()
> >> >>>>>>>>>>
> >> >>>>>>>>>> As you might have guessed, I am slightly in favor of option
> >> >>>>>>>>>> #2 modified. Yes, I am aware every step of the dag would
> >> >>>>>>>>>> have to be able to say if it is bounded or not. I have a
> >> >>>>>>>>>> feeling it would be easier to express cross
> >> >>>>>>>>>> bounded/unbounded operations, but I must admit I have not
> >> >>>>>>>>>> thought it through thoroughly. In the spirit of batch being
> >> >>>>>>>>>> just a special case of streaming, I thought BoundedStream
> >> >>>>>>>>>> would extend from DataStream. Correct me if I am wrong. In
> >> >>>>>>>>>> such a setup, the cross bounded/unbounded operation could be
> >> >>>>>>>>>> expressed quite easily I think:
> >> >>>>>>>>>>
> >> >>>>>>>>>> DataStream {
> >> >>>>>>>>>>     DataStream join(DataStream, ...); // we could not really
> >> >>>>>>>>>>     // tell if the result is bounded or not, but because a
> >> >>>>>>>>>>     // bounded stream is a special case of unbounded, the
> >> >>>>>>>>>>     // API object is correct, irrespective of whether the
> >> >>>>>>>>>>     // left or right side of the join is bounded
> >> >>>>>>>>>> }
> >> >>>>>>>>>>
> >> >>>>>>>>>> BoundedStream extends DataStream {
> >> >>>>>>>>>>     BoundedStream join(BoundedStream, ...); // only if both
> >> >>>>>>>>>>     // sides are bounded can the result be bounded as well.
> >> >>>>>>>>>>     // However, we do have access to DataStream#join here,
> >> >>>>>>>>>>     // so you can still join with a DataStream
> >> >>>>>>>>>> }
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> On the other hand, I also see benefits of two completely
> >> >>>>>>>>>> disjoint APIs, as we could prohibit some streaming calls in
> >> >>>>>>>>>> the bounded API. I can't think of any unbounded operators
> >> >>>>>>>>>> that could not be implemented for a bounded stream.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Besides, I think we both agree we don't like the method:
> >> >>>>>>>>>>
> >> >>>>>>>>>> DataStream boundedStream(Source)
> >> >>>>>>>>>>
> >> >>>>>>>>>> suggested in the current state of the FLIP. Do we? :)
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Dawid
> >> >>>>>>>>>>
> >> >>>>>>>>>> On 10/12/2019 18:57, Becket Qin wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi folks,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks for the discussion, great feedback. Also thanks Dawid
> >> >>>>>>>>>> for the explanation, it is much clearer now.
> >> >>>>>>>>>>
> >> >>>>>>>>>> One thing that is indeed missing from the FLIP is how the
> >> >>>>>>>>>> boundedness is passed to the Source implementation. So the
> >> >>>>>>>>>> API should be
> >> >>>>>>>>>> Source#createEnumerator(Boundedness boundedness,
> >> >>>>>>>>>> SplitEnumeratorContext context)
> >> >>>>>>>>>> and we can probably remove the
> >> >>>>>>>>>> Source#supportBoundedness(Boundedness boundedness) method.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Assuming we have that, we are essentially choosing from one
> >> >>>>>>>>>> of the following two options:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Option 1:
> >> >>>>>>>>>> // The source is a continuous source, and only unbounded
> >> >>>>>>>>>> // operations can be performed.
> >> >>>>>>>>>> DataStream<Type> datastream = env.continuousSource(someSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>> // The source is a bounded source; both bounded and
> >> >>>>>>>>>> // unbounded operations can be performed.
> >> >>>>>>>>>> BoundedDataStream<Type> boundedDataStream =
> >> >>>>>>>>>>     env.boundedSource(someSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>>    - Pros:
> >> >>>>>>>>>>         a) explicit boundary between bounded / unbounded
> >> >>>>>>>>>> streams; it is quite simple and clear to the users.
> >> >>>>>>>>>>    - Cons:
> >> >>>>>>>>>>         a) For applications that do not involve bounded
> >> >>>>>>>>>> operations, they still have to call a different API to
> >> >>>>>>>>>> distinguish bounded / unbounded streams.
> >> >>>>>>>>>>         b) No support for bounded streams to run in a
> >> >>>>>>>>>> streaming runtime setting, i.e. scheduling and operator
> >> >>>>>>>>>> behaviors.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Option 2:
> >> >>>>>>>>>> // The source is either bounded or unbounded, but only
> >> >>>>>>>>>> // unbounded operations could be performed on the returned
> >> >>>>>>>>>> // DataStream.
> >> >>>>>>>>>> DataStream<Type> dataStream = env.source(someSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>> // The source must be a bounded source, otherwise an
> >> >>>>>>>>>> // exception is thrown.
> >> >>>>>>>>>> BoundedDataStream<Type> boundedDataStream =
> >> >>>>>>>>>>     env.boundedSource(boundedSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>> The pros and cons are exactly the opposite of option 1.
> >> >>>>>>>>>>    - Pros:
> >> >>>>>>>>>>         a) For applications that do not involve bounded
> >> >>>>>>>>>> operations, they do not need different APIs for bounded /
> >> >>>>>>>>>> unbounded streams.
> >> >>>>>>>>>>         b) Support for bounded streams to run in a streaming
> >> >>>>>>>>>> runtime setting, i.e. scheduling and operator behaviors.
> >> >>>>>>>>>>    - Cons:
> >> >>>>>>>>>>         a) Bounded / unbounded streams are kind of mixed,
> >> >>>>>>>>>> i.e. given a DataStream, it is not clear whether it is
> >> >>>>>>>>>> bounded or not unless you have access to its source.
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> If we only think from the Source API perspective, option 2
> >> >>>>>>>>>> seems a better choice, because functionality-wise it is a
> >> >>>>>>>>>> superset of option 1, at the cost of some seemingly
> >> >>>>>>>>>> acceptable ambiguity in the DataStream API.
> >> >>>>>>>>>> But if we look at the DataStream API as a whole, option 1
> >> >>>>>>>>>> seems a clearer choice. For example, sometimes a library may
> >> >>>>>>>>>> have to know whether a certain task will finish or not, and
> >> >>>>>>>>>> it would be difficult to tell if the input is a DataStream,
> >> >>>>>>>>>> unless additional information is provided all the way from
> >> >>>>>>>>>> the Source. One possible solution is a *modified option 2*
> >> >>>>>>>>>> which adds a method to the DataStream API to indicate
> >> >>>>>>>>>> boundedness, such as getBoundedness(). It would solve the
> >> >>>>>>>>>> problem, with a potential confusion about the difference
> >> >>>>>>>>>> between a DataStream with getBoundedness()=true and a
> >> >>>>>>>>>> BoundedDataStream. But that seems not super difficult to
> >> >>>>>>>>>> explain.
> >> >>>>>>>>>>
> >> >>>>>>>>>> So from the API's perspective, I don't have a strong opinion
> >> >>>>>>>>>> between *option 1* and *modified option 2*. I like the
> >> >>>>>>>>>> cleanness of option 1, but modified option 2 would be more
> >> >>>>>>>>>> attractive if we have a concrete use case for the "bounded
> >> >>>>>>>>>> stream with unbounded streaming runtime settings".
> >> >>>>>>>>>>
> >> >>>>>>>>>> Re: Till
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Maybe this has already been asked before but I was wondering
> >> >>>>>>>>>> why the SourceReader interface has the method pollNext which
> >> >>>>>>>>>> hands the responsibility of outputting elements to the
> >> >>>>>>>>>> SourceReader implementation? Has this been done for
> >> >>>>>>>>>> backwards compatibility reasons with the old source
> >> >>>>>>>>>> interface? If not, then one could define a Collection<E>
> >> >>>>>>>>>> getNextRecords() method which returns the currently
> >> >>>>>>>>>> retrieved records and then the caller emits them outside of
> >> >>>>>>>>>> the SourceReader. That way the interface would not allow to
> >> >>>>>>>>>> implement an outputting loop where we never hand back
> >> >>>>>>>>>> control to the caller. At the moment, this contract can be
> >> >>>>>>>>>> easily broken and is only mentioned loosely in the JavaDocs.
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> The primary reason we hand over the SourceOutput to the
> >> >>>>>>>>>> SourceReader is that sometimes it is difficult for a
> >> >>>>>>>>>> SourceReader to emit one record at a time. One example is
> >> >>>>>>>>>> some batched messaging systems which only have an offset for
> >> >>>>>>>>>> the entire batch instead of individual messages in the
> >> >>>>>>>>>> batch. In that case, returning one record at a time would
> >> >>>>>>>>>> leave the SourceReader in an uncheckpointable state, because
> >> >>>>>>>>>> it can only checkpoint at the batch boundaries.
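> >> >>>>>>>>>>
> >> >>>>>>>>>> A sketch of what that looks like (illustrative only; Batch,
> >> >>>>>>>>>> fetchNextBatch() and the offset field are placeholders, and
> >> >>>>>>>>>> the signatures are simplified from the FLIP):
> >> >>>>>>>>>>
> >> >>>>>>>>>> // Emitting the whole batch inside a single pollNext() call
> >> >>>>>>>>>> // keeps the reader checkpointable: the batch offset only
> >> >>>>>>>>>> // advances after every record in the batch was emitted.
> >> >>>>>>>>>> public InputStatus pollNext(SourceOutput<MyRecord> output) {
> >> >>>>>>>>>>     Batch<MyRecord> batch = fetchNextBatch();
> >> >>>>>>>>>>     for (MyRecord record : batch.records()) {
> >> >>>>>>>>>>         output.collect(record);
> >> >>>>>>>>>>     }
> >> >>>>>>>>>>     checkpointedOffset = batch.offset(); // safe: fully emitted
> >> >>>>>>>>>>     return InputStatus.MORE_AVAILABLE;
> >> >>>>>>>>>> }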
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Jiangjie (Becket) Qin
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann
> >> >>>>>>>>>> <trohrm...@apache.org> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi everyone,
> >> >>>>>>>>>>
> >> >>>>>>>>>> thanks for drafting this FLIP. It reads very well.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Concerning Dawid's proposal, I tend to agree. The
> >> >>>>>>>>>> boundedness could come from the source and tell the system
> >> >>>>>>>>>> how to treat the operator (scheduling wise). From a user's
> >> >>>>>>>>>> perspective it should be fine to get back a DataStream when
> >> >>>>>>>>>> calling env.source(boundedSource) if he does not need
> >> >>>>>>>>>> special operations defined on a BoundedDataStream. If he
> >> >>>>>>>>>> needs this, then one could use the method
> >> >>>>>>>>>> BoundedDataStream env.boundedSource(boundedSource).
> >> >>>>>>>>>>
> >> >>>>>>>>>> If possible, we could enforce the proper usage of
> >> >>>>>>>>>> env.boundedSource() by introducing a BoundedSource type so
> >> >>>>>>>>>> that one cannot pass an unbounded source to it. That way
> >> >>>>>>>>>> users would not be able to shoot themselves in the foot.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Maybe this has already been asked before but I was wondering
> >> >>>>>>>>>> why the SourceReader interface has the method pollNext which
> >> >>>>>>>>>> hands the responsibility of outputting elements to the
> >> >>>>>>>>>> SourceReader implementation? Has this been done for
> >> >>>>>>>>>> backwards compatibility reasons with the old source
> >> >>>>>>>>>> interface? If not, then one could define a Collection<E>
> >> >>>>>>>>>> getNextRecords() method which returns the currently
> >> >>>>>>>>>> retrieved records and then the caller emits them outside of
> >> >>>>>>>>>> the SourceReader. That way the interface would not allow to
> >> >>>>>>>>>> implement an outputting loop where we never hand back
> >> >>>>>>>>>> control to the caller. At the moment, this contract can be
> >> >>>>>>>>>> easily broken and is only mentioned loosely in the JavaDocs.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Cheers,
> >> >>>>>>>>>> Till
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li
> >> >>>>>>>>>> <jingsongl...@gmail.com> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi all,
> >> >>>>>>>>>>
> >> >>>>>>>>>> I think the current design is good.
> >> >>>>>>>>>>
> >> >>>>>>>>>> My understanding is:
> >> >>>>>>>>>>
> >> >>>>>>>>>> For execution mode: bounded mode and continuous mode are
> >> >>>>>>>>>> totally different. I don't think we have the ability to
> >> >>>>>>>>>> integrate the two models at present. It's about scheduling,
> >> >>>>>>>>>> memory, algorithms, states, etc. We shouldn't confuse them.
> >> >>>>>>>>>>
> >> >>>>>>>>>> For source capabilities: only bounded, only continuous, or
> >> >>>>>>>>>> both bounded and continuous.
> >> >>>>>>>>>> I think Kafka is a source that can be run in both bounded
> >> >>>>>>>>>> and continuous execution mode.
> >> >>>>>>>>>> And Kafka with an end offset should be runnable in both
> >> >>>>>>>>>> bounded and continuous execution mode. Using Apache Beam
> >> >>>>>>>>>> with the Flink runner, I used to run a "bounded" Kafka in
> >> >>>>>>>>>> streaming mode. For our previous DataStream, it was not
> >> >>>>>>>>>> necessarily required that the source cannot be bounded.
> >> >>>>>>>>>>
> >> >>>>>>>>>> So here is my thought on Dawid's question:
> >> >>>>>>>>>> 1. pass a bounded source to continuousSource(): +1
> >> >>>>>>>>>> 2. pass a continuous source to boundedSource(): -1, should
> >> >>>>>>>>>> throw an exception.
> >> >>>>>>>>>>
> >> >>>>>>>>>> In StreamExecutionEnvironment, continuousSource and
> >> >>>>>>>>>> boundedSource define the execution mode. It defines a clear
> >> >>>>>>>>>> boundary of execution mode.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Jingsong Lee
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com>
> >> >>>>>>>>>> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> I agree with Dawid's point that the boundedness information
> >> >>>>>>>>>> should come from the source itself (e.g. the end timestamp),
> >> >>>>>>>>>> not through env.boundedSource()/continuousSource().
> >> >>>>>>>>>> I think if we want to support something like `env.source()`
> >> >>>>>>>>>> that derives the execution mode from the source, the
> >> >>>>>>>>>> `supportsBoundedness(Boundedness)` method is not enough,
> >> >>>>>>>>>> because we don't know whether it is bounded or not.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Jark
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz
> >> >>>>>>>>>> <dwysakow...@apache.org> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> One more thing. In the current proposal, with the
> >> >>>>>>>>>> supportsBoundedness(Boundedness) method and the boundedness
> >> >>>>>>>>>> coming from either continuousSource or boundedSource, I
> >> >>>>>>>>>> could not find how this information is fed back to the
> >> >>>>>>>>>> SplitEnumerator.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Dawid
> >> >>>>>>>>>>
> >> >>>>>>>>>> On 09/12/2019 13:52, Becket Qin wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi Dawid,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks for the comments. This actually brings up another
> >> >>>>>>>>>> relevant question about what a "bounded source" implies. I
> >> >>>>>>>>>> actually had the same impression when I looked at the Source
> >> >>>>>>>>>> API. Here is what I understand after some discussion with
> >> >>>>>>>>>> Stephan. A bounded source has the following impacts.
> >> >>>>>>>>>>
> >> >>>>>>>>>> 1. API validity.
> >> >>>>>>>>>> - A bounded source generates a bounded stream, so some
> >> >>>>>>>>>> operations that only work for bounded records would be
> >> >>>>>>>>>> performed, e.g. sort.
> >> >>>>>>>>>> - To expose these bounded-stream-only APIs, there are two
> >> >>>>>>>>>> options:
> >> >>>>>>>>>>      a. Add them to the DataStream API and throw an
> >> >>>>>>>>>> exception if a method is called on an unbounded stream.
> >> >>>>>>>>>>      b. Create a BoundedDataStream class which is returned
> >> >>>>>>>>>> from env.boundedSource(), while DataStream is returned from
> >> >>>>>>>>>> env.continousSource().
> >> >>>>>>>>>> Note that this cannot be done by having a single
> >> >>>>>>>>>> env.source(theSource), even if the Source has a
> >> >>>>>>>>>> getBoundedness() method.
> >> >>>>>>>>>>
> >> >>>>>>>>>> 2. Scheduling
> >> >>>>>>>>>> - A bounded source could be computed stage by stage without
> >> >>>>>>>>>> bringing up all the tasks at the same time.
> >> >>>>>>>>>>
> >> >>>>>>>>>> 3. Operator behaviors
> >> >>>>>>>>>> - A bounded source indicates the records are finite, so some
> >> >>>>>>>>>> operators can wait until they receive all the records before
> >> >>>>>>>>>> starting the processing.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Of the above impacts, only 1 is relevant to the API design,
> >> >>>>>>>>>> and the current proposal in FLIP-27 follows 1.b.
> >> >>>>>>>>>>
> >> >>>>>>>>>> // boundedness depends on a source property, imo this
> >> >>>>>>>>>> // should always be preferred
> >> >>>>>>>>>>
> >> >>>>>>>>>> DataStream<MyType> stream = env.source(theSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>> In your proposal, does DataStream have bounded-stream-only
> >> >>>>>>>>>> methods? It looks like it should, otherwise passing a
> >> >>>>>>>>>> bounded Source to env.source() would be confusing. In that
> >> >>>>>>>>>> case, we will essentially do 1.a if an unbounded Source is
> >> >>>>>>>>>> created from env.source(unboundedSource).
> >> >>>>>>>>>>
> >> >>>>>>>>>> If we have the methods only supported for bounded streams in
> >> >>>>>>>>>> DataStream, it seems a little weird to have a separate
> >> >>>>>>>>>> BoundedDataStream interface.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Am I understanding it correctly?
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Jiangjie (Becket) Qin
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz
> >> >>>>>>>>>> <dwysakow...@apache.org> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi all,
> >> >>>>>>>>>>
> >> >>>>>>>>>> A really well-written proposal and a very important one. I
> >> >>>>>>>>>> must admit I have not understood all the intricacies of it
> >> >>>>>>>>>> yet.
> >> >>>>>>>>>>
> >> >>>>>>>>>> One question I have though is about where the information
> >> >>>>>>>>>> about boundedness comes from. I think in most cases it is a
> >> >>>>>>>>>> property of the source. As you described, it might be e.g.
> >> >>>>>>>>>> an end offset, a flag whether it should monitor new splits,
> >> >>>>>>>>>> etc. I think it would be a really nice use case to be able
> >> >>>>>>>>>> to say:
> >> >>>>>>>>>>
> >> >>>>>>>>>> new KafkaSource().readUntil(long timestamp),
> >> >>>>>>>>>>
> >> >>>>>>>>>> which could work as an "end offset". Moreover, I think all
> >> >>>>>>>>>> bounded sources support continuous mode, but no
> >> >>>>>>>>>> intrinsically continuous source supports the bounded mode.
> >> >>>>>>>>>> If I understood the proposal correctly, it suggests the
> >> >>>>>>>>>> boundedness sort of "comes" from the outside of the source,
> >> >>>>>>>>>> from the invocation of either boundedStream or
> >> >>>>>>>>>> continousSource.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I am wondering if it would make sense to actually change the
> >> >>>>>>>>>> method
> >> >>>>>>>>>>
> >> >>>>>>>>>> boolean Source#supportsBoundedness(Boundedness)
> >> >>>>>>>>>>
> >> >>>>>>>>>> to
> >> >>>>>>>>>>
> >> >>>>>>>>>> Boundedness Source#getBoundedness().
> >> >>>>>>>>>>
> >> >>>>>>>>>> As for the methods #boundedSource, #continousSource,
> >> >>>>>>>>>> assuming the boundedness is a property of the source, they
> >> >>>>>>>>>> do not affect how the enumerator works, but mostly how the
> >> >>>>>>>>>> dag is scheduled, right? I am not against those methods, but
> >> >>>>>>>>>> I think it is a very specific use case to actually override
> >> >>>>>>>>>> the property of the source. In general I would expect users
> >> >>>>>>>>>> to only call env.source(theSource), where the source tells
> >> >>>>>>>>>> if it is bounded or not. I would suggest considering the
> >> >>>>>>>>>> following set of methods:
> >> >>>>>>>>>>
> >> >>>>>>>>>> // boundedness depends on a source property, imo this
> >> >>>>>>>>>> // should always be preferred
> >> >>>>>>>>>> DataStream<MyType> stream = env.source(theSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>> // always continous execution, whether bounded or unbounded
> >> >>>>>>>>>> // source
> >> >>>>>>>>>> DataStream<MyType> boundedStream = env.continousSource(theSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>> // imo this would make sense if the BoundedDataStream
> >> >>>>>>>>>> // provides additional features unavailable for continous
> >> >>>>>>>>>> // mode
> >> >>>>>>>>>> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Dawid
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> On 04/12/2019 11:25, Stephan Ewen wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks, Becket, for updating this.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I agree with moving the aspects you mentioned into separate
> >> >>>>>>>>>> FLIPs - this one was becoming unwieldy in size.
> >> >>>>>>>>>>
> >> >>>>>>>>>> +1 to the FLIP in its current state. It's a very detailed
> >> >>>>>>>>>> write-up, nicely done!
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin
> >> >>>>>>>>>> <becket....@gmail.com> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi all,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Sorry for the long-belated update. I have updated the
> >> >>>>>>>>>> FLIP-27 wiki page with the latest proposals. Some noticeable
> >> >>>>>>>>>> changes include:
> >> >>>>>>>>>> 1. A new generic communication mechanism between
> >> >>>>>>>>>> SplitEnumerator and SourceReader.
> >> >>>>>>>>>> 2. Some detailed API method signature changes.
> >> >>>>>>>>>>
> >> >>>>>>>>>> We left a few things out of this FLIP and will address them
> >> >>>>>>>>>> in separate FLIPs, including:
> >> >>>>>>>>>> 1. Per-split event time.
> >> >>>>>>>>>> 2. Event time alignment.
> >> >>>>>>>>>> 3. Fine-grained failover for SplitEnumerator failures.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Please let us know if you have any questions.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Jiangjie (Becket) Qin
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen
> >> >>>>>>>>>> <se...@apache.org> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi Łukasz!
> >> >>>>>>>>>>
> >> >>>>>>>>>> Becket and me are working hard on figuring out the last
> >> >>>>>>>>>> details and implementing the first PoC. We would update the
> >> >>>>>>>>>> FLIP hopefully next week.
> >> >>>>>>>>>>
> >> >>>>>>>>>> There is a fair chance that a first version of this will be
> >> >>>>>>>>>> in 1.10, but I think it will take another release to
> >> >>>>>>>>>> battle-test it and migrate the connectors.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Stephan
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski
> >> >>>>>>>>>> <l...@touk.pl> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi,
> >> >>>>>>>>>>
> >> >>>>>>>>>> This proposal looks very promising for us. Do you have any
> >> >>>>>>>>>> plans for which Flink release it is going to be released in?
> >> >>>>>>>>>> We are thinking of using the Data Set API for our future use
> >> >>>>>>>>>> cases, but on the other hand the Data Set API is going to be
> >> >>>>>>>>>> deprecated, so using the proposed bounded data streams
> >> >>>>>>>>>> solution could be more viable in the long term.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks,
> >> >>>>>>>>>> Łukasz
> >> >>>>>>>>>>
> >> >>>>>>>>>> On 2019/10/01 15:48:03, Thomas Weise
> >> >>>>>>>>>> <thomas.we...@gmail.com> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks for putting together this proposal!
> >> >>>>>>>>>>
> >> >>>>>>>>>> I see that the "Per Split Event Time" and "Event Time
> >> >>>>>>>>>> Alignment" sections are still TBD.
> >> >>>>>>>>>>
> >> >>>>>>>>>> It would probably be good to flesh those out a bit before
> >> >>>>>>>>>> proceeding too far, as the event time alignment will
> >> >>>>>>>>>> probably influence the interaction with the split reader,
> >> >>>>>>>>>> specifically ReaderStatus emitNext(SourceOutput<E> output).
> >> >>>>>>>>>>
> >> >>>>>>>>>> We currently have only one implementation for event time
> >> >>>>>>>>>> alignment in the Kinesis consumer. The synchronization in
> >> >>>>>>>>>> that case takes place as the last step before records are
> >> >>>>>>>>>> emitted downstream (RecordEmitter). With the currently
> >> >>>>>>>>>> proposed interfaces, the equivalent can be implemented in
> >> >>>>>>>>>> the reader loop, although note that in the Kinesis consumer
> >> >>>>>>>>>> the per-shard threads push records.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Synchronization has not been implemented for the Kafka
> >> >>>>>>>>>> consumer yet.
> >> >>>>>>>>>>
> >> >>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-12675
> >> >>>>>>>>>>
> >> >>>>>>>>>> When I looked at it, I realized that the implementation will
> >> >>>>>>>>>> look quite different from Kinesis, because it needs to take
> >> >>>>>>>>>> place in the pull part, where records are taken from the
> >> >>>>>>>>>> Kafka client. Due to the multiplexing it cannot be done by
> >> >>>>>>>>>> blocking the split thread like it currently works for
> >> >>>>>>>>>> Kinesis. Reading from individual Kafka partitions needs to
> >> >>>>>>>>>> be controlled via pause/resume on the Kafka client.
> >> >>>>>>>>>>
> >> >>>>>>>>>> To take on that responsibility the split thread would need to
> >> be
> >> >>>>>>>>>>
> >> >>>>>>>>>> aware
> >> >>>>>>>>>>
> >> >>>>>>>>>> of
> >> >>>>>>>>>>
> >> >>>>>>>>>> the
> >> >>>>>>>>>> watermarks or at least whether it should or should not
> continue
> >> >> to
> >> >>>>>>>>>>
> >> >>>>>>>>>> consume
> >> >>>>>>>>>>
> >> >>>>>>>>>> a given split and this may require a different SourceReader
> or
> >> >>>>>>>>>>
> >> >>>>>>>>>> SourceOutput
> >> >>>>>>>>>>
> >> >>>>>>>>>> interface.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks,
> >> >>>>>>>>>> Thomas
> >> >>>>>>>>>>
> >> >>>>>>>>>>
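For illustration, a minimal sketch of the pause/resume approach against the plain Kafka client (KafkaConsumer#pause, #resume, and poll(Duration) are real Kafka client APIs; the surrounding class, the per-partition watermark map, and the lag threshold are hypothetical):

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    final class AlignedKafkaFetch {
        // Hypothetical alignment step inside the reader loop: partitions whose
        // local watermark runs too far ahead of the slowest assigned partition
        // are paused instead of blocking the (multiplexed) fetch thread.
        static void alignedFetch(
                KafkaConsumer<byte[], byte[]> consumer,
                Map<TopicPartition, Long> watermarks, // assigned partitions only
                long maxAheadMillis) {
            long slowest = watermarks.values().stream()
                    .min(Long::compare).orElse(Long.MIN_VALUE);

            List<TopicPartition> ahead = new ArrayList<>();
            List<TopicPartition> within = new ArrayList<>();
            for (Map.Entry<TopicPartition, Long> e : watermarks.entrySet()) {
                if (e.getValue() > slowest + maxAheadMillis) {
                    ahead.add(e.getKey());   // too far ahead: stop fetching for now
                } else {
                    within.add(e.getKey());  // within bounds: (re)enable fetching
                }
            }
            consumer.pause(ahead);
            consumer.resume(within);

            ConsumerRecords<byte[], byte[]> records =
                    consumer.poll(Duration.ofMillis(100));
            // ... emit records downstream, update per-partition watermarks ...
        }
    }
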
On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan,

Thank you for the feedback! I will take a look at your branch before the public discussion.

On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

Hi Biao!

Thanks for reviving this. I would like to join this discussion, but am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so?

In the meantime I can share some suggestions based on prior experiments:

How to do watermarks / timestamp extractors in a simpler and more flexible way. I think that part is quite promising and should be part of the new source interface:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime

https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java

Some experiments on how to build the source reader and its library for common threading/split patterns:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src

Best,
Stephan

On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi devs,

Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10.

There are many things mentioned in the FLIP-27 document. [1] I think we'd better discuss them separately. However, the wiki is not a good place to discuss, so I wrote a Google doc about the SplitReader API which covers some details missing from the document. [2]

1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface

2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing

CC Stephan, Aljoscha, Piotrek, Becket

On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Steven,

Thank you for the feedback. Please take a look at the FLIP-27 document <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>, which was updated recently. A lot of details about the enumerator were added to it. I think it would help.

Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:

This proposal mentioned that the SplitEnumerator might run on the JobManager or in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the leverage of fine-grained recovery for embarrassingly parallel jobs.

It's not clear to me what the implication of running the enumerator on the JobManager is, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan & Piotrek,

Thank you for the feedback.

It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten, since there are so many proposals recently. Anyway, I wish to see the split topics soon :)

Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:

Hi Biao!

This discussion was stalled because of preparations for the open sourcing & merging of Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for those.

I think there is no chance for this to be completed in the couple of remaining weeks / one month before the 1.8 feature freeze; however, it would be good to aim for 1.9 with those changes.

Piotrek

On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,

The summary from Stephan makes a lot of sense to me. It is much clearer indeed after splitting the complex topic into small ones.

I was wondering whether there is any detailed plan for the next step? If not, I would like to push this forward by creating some JIRA issues.

Another question: should version 1.8 include these features?

Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:

Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion and open issues. I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I would try and update the FLIP in the next few days. For the remaining ones we need more discussion.

I would suggest to fork each of these aspects into a separate mail thread, or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing.
- The Split Enumerator could in the end live on the JobManager (and assign splits via RPC) or in a task (and assign splits via data streams).
- This discussion is orthogonal and should come later, when the interface is agreed upon.

*(2) Split Readers for one or more splits*

- Discussion seems to agree that we need to support one reader that possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches data from different splits / partitions.
  --> Example sources that require this would be Kafka, Pravega, Pulsar.
- Could have one split reader per source, or multiple split readers that share the "poll()" function.
- To not make it too complicated, we can start by thinking about one split reader for all splits initially and see if that covers all requirements.

*(3) Threading model of the Split Reader*

- Most active part of the discussion ;-)
- A non-blocking way for Flink's task code to interact with the source is needed in order to allow task runtime code based on a single-threaded/actor-style task design.
  --> I personally am a big proponent of that; it will help with well-behaved checkpoints, efficiency, and simpler yet more robust runtime code.
- Users care about a simple abstraction, so as a subclass of SplitReader (non-blocking / async) we need to have a BlockingSplitReader, which will form the basis of most source implementations. BlockingSplitReader lets users do simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more), and the thread(s) can make blocking calls and hand over data buffers via a blocking queue.
- This should allow us to cover both a fully async runtime and a simple blocking interface for users.
- This is actually very similar to how the Kafka connectors work: Kafka 9+ with one thread, Kafka 8 with multiple threads.
- On the base SplitReader (the async one), the non-blocking method that gets the next chunk of data would signal data availability via a CompletableFuture, because that gives the best flexibility (can await completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the future to put a "take next data" task into the actor-style mailbox (see the sketch after this list).

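A hedged sketch of that threading model in Java (all names here are placeholders taken from the discussion, not a committed API): the async reader exposes availability as a CompletableFuture, and the blocking variant adapts simple poll()-style user code onto it with a fetcher thread and a hand-over queue.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical async base interface: the task thread never blocks on it.
    interface AsyncSplitReader<T> {
        CompletableFuture<Void> isAvailable(); // task/mailbox registers a handler
        T pollNext();                          // non-blocking; null if nothing buffered
    }

    // Hypothetical blocking adapter: user code makes simple blocking calls in a
    // dedicated fetcher thread; records are handed over via a blocking queue.
    abstract class BlockingSplitReader<T> implements AsyncSplitReader<T> {
        private final BlockingQueue<T> handover = new ArrayBlockingQueue<>(1024);
        private volatile CompletableFuture<Void> available = new CompletableFuture<>();

        // Implemented by the user, e.g. a simple blocking consumer.poll().
        protected abstract T fetchBlocking() throws Exception;

        void startFetcher() {
            Thread fetcher = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        handover.put(fetchBlocking()); // blocking hand-over
                        available.complete(null);      // wake up the async side
                    }
                } catch (Exception e) {
                    available.completeExceptionally(e); // surface fetch errors
                }
            });
            fetcher.setDaemon(true);
            fetcher.start();
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            CompletableFuture<Void> future = new CompletableFuture<>();
            available = future;
            if (!handover.isEmpty()) {
                future.complete(null); // re-check after publishing to avoid lost wakeups
            }
            return future;
        }

        @Override
        public T pollNext() {
            return handover.poll(); // null => the task awaits isAvailable() again
        }
    }

The task side would then register roughly reader.isAvailable().thenRun(() -> mailbox.add(takeNextData)), which is the "thenHandle()"-style hook mentioned above.
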
*(4) Split Enumeration and Assignment*

- Splits may be generated lazily, both in cases where there is a limited number of splits (but very many) and where splits are discovered over time.
- Assignment should also be lazy, to get better load balancing.
- Assignment needs to support locality preferences.
- Possible design based on the discussion so far (see the sketch after this list):
  --> SplitReader has a method "addSplits(SplitT...)" to add one or more splits. Some split readers might assume they have only one split ever, concurrently; others assume multiple splits. (Note: the idea behind being able to add multiple splits at the same time is to ease startup, where multiple splits may be assigned instantly.)
  --> SplitReader has a context object on which it can indicate when splits are completed. The enumerator gets that notification and can use it to decide when to assign new splits. This should help both in the case of sources that take splits lazily (file readers) and in the case where the source needs to preserve a partial order between splits (Kinesis, Pravega, Pulsar may need that).
  --> SplitEnumerator gets a notification when SplitReaders start and when they finish splits. It can decide at that moment to push more splits to that reader.
  --> The SplitEnumerator should probably be aware of the source parallelism, to build its initial distribution.
- Open question: Should the source expose something like "host preferences", so that yarn/mesos/k8s can take this into account when selecting a node to start a TM on?

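In code form, the assignment hooks from this list might look roughly as follows (hypothetical interfaces echoing the bullets above, not a final API):

    // Reader side: the enumerator pushes one or more splits; accepting several
    // at once eases startup, where many splits may be assigned instantly.
    interface SplitAssignmentReader<SplitT> {
        void addSplits(SplitT... splits);
    }

    // Context handed to the reader so it can report finished splits back.
    interface SplitReaderContext<SplitT> {
        void splitFinished(SplitT split);
    }

    // Enumerator side: notifications it can use for lazy, locality-aware
    // assignment and for preserving a partial order between splits.
    interface SplitEnumerator<SplitT> {
        void readerStarted(int subtaskId);
        void splitFinished(int subtaskId, SplitT split);
    }
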
*(5) Watermarks and event time alignment*

- Watermark generation, as well as idleness, needs to be per split (like currently in the Kafka Source, per partition).
- It is desirable to support optional event-time alignment, meaning that splits that are ahead are back-pressured or temporarily unsubscribed.
- I think it would be desirable to encapsulate watermark generation logic in watermark generators, for a separation of concerns. The watermark generators should run per split.
- Using watermark generators would also help with another problem of the suggested interface, namely supporting non-periodic watermarks efficiently.
- Need a way to "dispatch" the next record to different watermark generators (see the sketch after this list).
- Need a way to tell the SplitReader to "suspend" a split until a certain watermark is reached (event time backpressure).
- This would in fact not be needed (and thus be simpler) if we had a SplitReader per split, and may be a reason to re-open that discussion.

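A tiny, deliberately simplified sketch of the per-split dispatch (hypothetical; the map entries stand in for real per-split watermark generators, and the combination rule is simply the minimum across splits):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical per-split watermark bookkeeping: each record is routed to
    // the state of its originating split; the operator-wide watermark is the
    // minimum over all (non-idle) splits.
    final class PerSplitWatermarks<SplitIdT> {
        private final Map<SplitIdT, Long> perSplit = new HashMap<>();

        // Dispatch: the reader calls this with the split the record came from.
        void onRecord(SplitIdT splitId, long eventTimestamp) {
            perSplit.merge(splitId, eventTimestamp, Math::max);
        }

        // Combined watermark across splits.
        long combinedWatermark() {
            return perSplit.values().stream()
                    .min(Long::compare)
                    .orElse(Long.MIN_VALUE);
        }
    }
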
*(6) Watermarks across splits and in the Split Enumerator*

- The split enumerator may need some watermark awareness, which should be purely based on split metadata (like the create timestamp of file splits).
- If there are still more splits with an overlapping event time range for a split reader, then that split reader should not advance the watermark within the split beyond the overlap boundary. Otherwise, future splits will produce late data.
- One way to approach this could be that the split enumerator may send watermarks to the readers, and the readers cannot emit watermarks beyond that received watermark.
- Many split enumerators would simply immediately send Long.MAX out and leave the progress purely to the split readers.
- For event-time alignment / split back pressure, this begs the question of how we can avoid deadlocks that may arise when splits are suspended for event time back pressure.

*(7) Batch and streaming Unification*

- Functionality-wise, the above design should support both.
- Batch often (mostly) does not care about reading "in order" and generating watermarks.
  --> Might use different enumerator logic that is more locality-aware and ignores event time order.
  --> Does not generate watermarks.
- Would be great if bounded sources could be identified at compile time, so that "env.addBoundedSource(...)" is type safe and can return a "BoundedDataStream" (see the sketch after this list).
- Possible to defer this discussion until later.

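The compile-time distinction in the second-to-last bullet could be expressed with signatures along these lines (placeholder types, not Flink's actual API):

    // Hypothetical marker types: boundedness is visible to the compiler, so
    // only a bounded source can yield a BoundedDataStream, and batch-only
    // operations can be confined to that type.
    interface Source<T> {}
    interface BoundedSource<T> extends Source<T> {}

    interface DataStream<T> {}
    interface BoundedDataStream<T> extends DataStream<T> {}

    interface StreamEnvironment {
        <T> DataStream<T> addSource(Source<T> source);
        <T> BoundedDataStream<T> addBoundedSource(BoundedSource<T> source);
    }
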
*Miscellaneous Comments*

- Should the source have a TypeInformation for the produced type, instead of a serializer? We need a type information in the stream anyways, and can derive the serializer from that. Plus, creating the serializer should respect the ExecutionConfig.
- The TypeSerializer interface is very powerful but also not easy to implement. Its purpose is to handle data super efficiently and to support flexible ways of evolution, etc. For metadata I would suggest to look at the SimpleVersionedSerializer instead, which is used for example for checkpoint master hooks, or for the streaming file sink. I think that it is a good match for cases where we do not need more than ser/deser (no copy, etc.) and don't need to push versioning out of the serialization paths for best performance (as in the TypeSerializer). A sketch follows below.

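SimpleVersionedSerializer (org.apache.flink.core.io) boils down to three methods; a minimal implementation for a hypothetical file-split metadata type could look like this:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    // Hypothetical split metadata: just a path and a start offset.
    final class FileSplit {
        final String path;
        final long offset;
        FileSplit(String path, long offset) { this.path = path; this.offset = offset; }
    }

    final class FileSplitSerializer implements SimpleVersionedSerializer<FileSplit> {
        @Override
        public int getVersion() {
            return 1; // versioning is explicit, outside the hot serialization path
        }

        @Override
        public byte[] serialize(FileSplit split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(split.path);
                out.writeLong(split.offset);
            }
            return bytes.toByteArray();
        }

        @Override
        public FileSplit deserialize(int version, byte[] serialized) throws IOException {
            if (version != 1) {
                throw new IOException("Unknown version: " + version);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(serialized))) {
                return new FileSplit(in.readUTF(), in.readLong());
            }
        }
    }
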
On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, we now have as open questions:

1) How do we let the checkpoints pass through our multi-threaded reader operator?
2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split recovery only.

For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.

For the second, I think that we should go with separate operators for the source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where the split discovery becomes a responsibility of the JobManager and readers are pulling more work from the JM.

2) The source is going to be the "single point of truth". It will know what has been processed and what not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
   i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
   ii) each reader would have to keep a copy of all its processed splits.
   iii) the state has to be a union state with non-trivial merging logic in order to support rescaling (a sketch of that burden follows this message).

Two additional points that you raised above:

i) The point that we need to keep all splits (processed and not-processed) is, I think, a bit of a strong requirement. This would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.

ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that. It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.

Cheers,
Kostas

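On point 2(iii): with Flink's union operator state, every subtask receives the entries of all subtasks on restore and has to re-select its own share, which is exactly the merging burden described above. A hedged sketch (getUnionListState is an existing OperatorStateStore method; the String split type and the mod-subtaskId scheme are purely illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;

    final class UnionStateRestore {
        // Hypothetical excerpt from initializeState(): with union state, EVERY
        // subtask sees ALL splits on restore and must filter out its own share.
        static List<String> restoreOwnSplits(
                FunctionInitializationContext context,
                int subtaskId,
                int parallelism) throws Exception {
            ListState<String> union = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<>("splits", String.class));
            List<String> mine = new ArrayList<>();
            for (String split : union.get()) {
                // Deterministic self-assignment (e.g. mod subtaskId, as mentioned
                // above); not necessarily trivial or stable for every source.
                if (Math.floorMod(split.hashCode(), parallelism) == subtaskId) {
                    mine.add(split);
                }
            }
            return mine;
        }
    }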