Hi Becket,

Issue #1 - Design of Source interface

I mentioned the lack of a method like
Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
context), because without the current proposal is not complete/does not
work.

If we say that boundedness is an intrinsic property of a source imo we
don't need the Source#createEnumerator(Boundedness boundedness,
SplitEnumeratorContext context) method.

Assuming a source from my previous example:

Source source = KafkaSource.builder()
  ...
  .untilTimestamp(...)
  .build()

Would the enumerator differ if created like
source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source
.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this
is the part that my opinion differ the most from the current proposal. I
really think it should always be the source that tells if it is bounded
or not. In the current proposal methods continousSource/boundedSource
somewhat reconfigure the source, which I think is misleading.

I think a call like:

Source source = KafkaSource.builder()
  ...
  .readContinously() / readUntilLatestOffset() / readUntilTimestamp / 
readUntilOffsets / ...
  .build()

is way cleaner (and expressive) than

Source source = KafkaSource.builder()
  ...
  .build()

env.continousSource(source) // which actually underneath would call 
createEnumerator(CONTINUOUS, ctx) which would be equivalent to 
source.readContinously().createEnumerator(ctx)
// or
env.boundedSource(source) // which actually underneath would call 
createEnumerator(BOUNDED, ctx) which would be equivalent to 
source.readUntilLatestOffset().createEnumerator(ctx)

Sorry for the comparison, but to me it seems there is too much magic
happening underneath those two calls.

I really believe the Source interface should have getBoundedness method
instead of (supportBoundedness) + createEnumerator(Boundedness, ...)


Issue #2 - Design of
ExecutionEnvironment#source()/continuousSource()/boundedSource()

As you might have guessed I am slightly in favor of option #2 modified.
Yes I am aware every step of the dag would have to be able to say if it
is bounded or not. I have a feeling it would be easier to express cross
bounded/unbounded operations, but I must admit I have not thought it
through thoroughly, In the spirit of batch is just a special case of
streaming I thought BoundedStream would extend from DataStream. Correct
me if I am wrong. In such a setup the cross bounded/unbounded operation
could be expressed quite easily I think:

DataStream {
  DataStream join(DataStream, ...); // we could not really tell if the result 
is bounded or not, but because bounded stream is a special case of unbounded 
the API object is correct, irrespective if the left or right side of the join 
is bounded
}

BoundedStream extends DataStream {
  BoundedStream join(BoundedStream, ...); // only if both sides are bounded the 
result can be bounded as well. However we do have access to the DataStream#join 
here, so you can still join with a DataStream
}

On the other hand I also see benefits of two completely disjointed APIs,
as we could prohibit some streaming calls in the bounded API. I can't
think of any unbounded operators that could not be implemented for
bounded stream.

Besides I think we both agree we don't like the method:

DataStream boundedStream(Source)

suggested in the current state of the FLIP. Do we ? :)

Best,

Dawid

On 10/12/2019 18:57, Becket Qin wrote:

> Hi folks,
>
> Thanks for the discussion, great feedback. Also thanks Dawid for the
> explanation, it is much clearer now.
>
> One thing that is indeed missing from the FLIP is how the boundedness is
> passed to the Source implementation. So the API should be
> Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
> context)
> And we can probably remove the Source#supportBoundedness(Boundedness
> boundedness) method.
>
> Assuming we have that, we are essentially choosing from one of the
> following two options:
>
> Option 1:
> // The source is continuous source, and only unbounded operations can be
> performed.
> DataStream<Type> datastream = env.continuousSource(someSource);
>
> // The source is bounded source, both bounded and unbounded operations can
> be performed.
> BoundedDataStream<Type> boundedDataStream = env.boundedSource(someSource);
>
>   - Pros:
>        a) explicit boundary between bounded / unbounded streams, it is
> quite simple and clear to the users.
>   - Cons:
>        a) For applications that do not involve bounded operations, they
> still have to call different API to distinguish bounded / unbounded streams.
>        b) No support for bounded stream to run in a streaming runtime
> setting, i.e. scheduling and operators behaviors.
>
>
> Option 2:
> // The source is either bounded or unbounded, but only unbounded operations
> could be performed on the returned DataStream.
> DataStream<Type> dataStream = env.source(someSource);
>
> // The source must be a bounded source, otherwise exception is thrown.
> BoundedDataStream<Type> boundedDataStream =
> env.boundedSource(boundedSource);
>
> The pros and cons are exactly the opposite of option 1.
>   - Pros:
>        a) For applications that do not involve bounded operations, they
> still have to call different API to distinguish bounded / unbounded streams.
>        b) Support for bounded stream to run in a streaming runtime setting,
> i.e. scheduling and operators behaviors.
>   - Cons:
>        a) Bounded / unbounded streams are kind of mixed, i.e. given a
> DataStream, it is not clear whether it is bounded or not, unless you have
> the access to its source.
>
>
> If we only think from the Source API perspective, option 2 seems a better
> choice because functionality wise it is a superset of option 1, at the cost
> of some seemingly acceptable ambiguity in the DataStream API.
> But if we look at the DataStream API as a whole, option 1 seems a clearer
> choice. For example, some times a library may have to know whether a
> certain task will finish or not. And it would be difficult to tell if the
> input is a DataStream, unless additional information is provided all the
> way from the Source. One possible solution is to have a *modified option 2*
> which adds a method to the DataStream API to indicate boundedness, such as
> getBoundedness(). It would solve the problem with a potential confusion of
> what is difference between a DataStream with getBoundedness()=true and a
> BoundedDataStream. But that seems not super difficult to explain.
>
> So from API's perspective, I don't have a strong opinion between *option 1*
> and *modified option 2. *I like the cleanness of option 1, but modified
> option 2 would be more attractive if we have concrete use case for the
> "Bounded stream with unbounded streaming runtime settings".
>
> Re: Till
>
>> Maybe this has already been asked before but I was wondering why the
>> SourceReader interface has the method pollNext which hands the
>> responsibility of outputting elements to the SourceReader implementation?
>> Has this been done for backwards compatibility reasons with the old source
>> interface? If not, then one could define a Collection<E> getNextRecords()
>> method which returns the currently retrieved records and then the caller
>> emits them outside of the SourceReader. That way the interface would not
>> allow to implement an outputting loop where we never hand back control to
>> the caller. At the moment, this contract can be easily broken and is only
>> mentioned loosely in the JavaDocs.
>>
> The primary reason we handover the SourceOutput to the SourceReader is
> because sometimes it is difficult for a SourceReader to emit one record at
> a time. One example is some batched messaging systems which only have an
> offset for the entire batch instead of individual messages in the batch. In
> that case, returning one record at a time would leave the SourceReader in
> an uncheckpointable state because they can only checkpoint at the batch
> boundaries.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Dec 10, 2019 at 5:33 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi everyone,
>>
>> thanks for drafting this FLIP. It reads very well.
>>
>> Concerning Dawid's proposal, I tend to agree. The boundedness could come
>> from the source and tell the system how to treat the operator (scheduling
>> wise). From a user's perspective it should be fine to get back a DataStream
>> when calling env.source(boundedSource) if he does not need special
>> operations defined on a BoundedDataStream. If he needs this, then one could
>> use the method BoundedDataStream env.boundedSource(boundedSource).
>>
>> If possible, we could enforce the proper usage of env.boundedSource() by
>> introducing a BoundedSource type so that one cannot pass an
>> unbounded source to it. That way users would not be able to shoot
>> themselves in the foot.
>>
>> Maybe this has already been asked before but I was wondering why the
>> SourceReader interface has the method pollNext which hands the
>> responsibility of outputting elements to the SourceReader implementation?
>> Has this been done for backwards compatibility reasons with the old source
>> interface? If not, then one could define a Collection<E> getNextRecords()
>> method which returns the currently retrieved records and then the caller
>> emits them outside of the SourceReader. That way the interface would not
>> allow to implement an outputting loop where we never hand back control to
>> the caller. At the moment, this contract can be easily broken and is only
>> mentioned loosely in the JavaDocs.
>>
>> Cheers,
>> Till
>>
>> On Tue, Dec 10, 2019 at 7:49 AM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I think current design is good.
>>>
>>> My understanding is:
>>>
>>> For execution mode: bounded mode and continuous mode, It's totally
>>> different. I don't think we have the ability to integrate the two models
>> at
>>> present. It's about scheduling, memory, algorithms, States, etc. we
>>> shouldn't confuse them.
>>>
>>> For source capabilities: only bounded, only continuous, both bounded and
>>> continuous.
>>> I think Kafka is a source that can be ran both bounded
>>> and continuous execution mode.
>>> And Kafka with end offset should be ran both bounded
>>> and continuous execution mode.  Using apache Beam with Flink runner, I
>> used
>>> to run a "bounded" Kafka in streaming mode. For our previous DataStream,
>> it
>>> is not necessarily required that the source cannot be bounded.
>>>
>>> So it is my thought for Dawid's question:
>>> 1.pass a bounded source to continuousSource() +1
>>> 2.pass a continuous source to boundedSource() -1, should throw exception.
>>>
>>> In StreamExecutionEnvironment, continuousSource and boundedSource define
>>> the execution mode. It defines a clear boundary of execution mode.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Tue, Dec 10, 2019 at 10:37 AM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> I agree with Dawid's point that the boundedness information should come
>>>> from the source itself (e.g. the end timestamp), not through
>>>> env.boundedSouce()/continuousSource().
>>>> I think if we want to support something like `env.source()` that derive
>>> the
>>>> execution mode from source, `supportsBoundedness(Boundedness)`
>>>> method is not enough, because we don't know whether it is bounded or
>> not.
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>> On Mon, 9 Dec 2019 at 22:21, Dawid Wysakowicz <dwysakow...@apache.org>
>>>> wrote:
>>>>
>>>>> One more thing. In the current proposal, with the
>>>>> supportsBoundedness(Boundedness) method and the boundedness coming
>> from
>>>>> either continuousSource or boundedSource I could not find how this
>>>>> information is fed back to the SplitEnumerator.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 09/12/2019 13:52, Becket Qin wrote:
>>>>>> Hi Dawid,
>>>>>>
>>>>>> Thanks for the comments. This actually brings another relevant
>>> question
>>>>>> about what does a "bounded source" imply. I actually had the same
>>>>>> impression when I look at the Source API. Here is what I understand
>>>> after
>>>>>> some discussion with Stephan. The bounded source has the following
>>>>> impacts.
>>>>>> 1. API validity.
>>>>>> - A bounded source generates a bounded stream so some operations
>> that
>>>>> only
>>>>>> works for bounded records would be performed, e.g. sort.
>>>>>> - To expose these bounded stream only APIs, there are two options:
>>>>>>      a. Add them to the DataStream API and throw exception if a
>>> method
>>>> is
>>>>>> called on an unbounded stream.
>>>>>>      b. Create a BoundedDataStream class which is returned from
>>>>>> env.boundedSource(), while DataStream is returned from
>>>>> env.continousSource().
>>>>>> Note that this cannot be done by having single
>> env.source(theSource)
>>>> even
>>>>>> the Source has a getBoundedness() method.
>>>>>>
>>>>>> 2. Scheduling
>>>>>> - A bounded source could be computed stage by stage without
>> bringing
>>> up
>>>>> all
>>>>>> the tasks at the same time.
>>>>>>
>>>>>> 3. Operator behaviors
>>>>>> - A bounded source indicates the records are finite so some
>> operators
>>>> can
>>>>>> wait until it receives all the records before it starts the
>>> processing.
>>>>>> In the above impact, only 1 is relevant to the API design. And the
>>>>> current
>>>>>> proposal in FLIP-27 is following 1.b.
>>>>>>
>>>>>> // boundedness depends of source property, imo this should always
>> be
>>>>>>> preferred
>>>>>>>
>>>>>> DataStream<MyType> stream = env.source(theSource);
>>>>>>
>>>>>>
>>>>>> In your proposal, does DataStream have bounded stream only methods?
>>> It
>>>>>> looks it should have, otherwise passing a bounded Source to
>>>> env.source()
>>>>>> would be confusing. In that case, we will essentially do 1.a if an
>>>>>> unbounded Source is created from env.source(unboundedSource).
>>>>>>
>>>>>> If we have the methods only supported for bounded streams in
>>>> DataStream,
>>>>> it
>>>>>> seems a little weird to have a separate BoundedDataStream
>> interface.
>>>>>> Am I understand it correctly?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jiangjie (Becket) Qin
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 9, 2019 at 6:40 PM Dawid Wysakowicz <
>>>> dwysakow...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Really well written proposal and very important one. I must admit
>> I
>>>> have
>>>>>>> not understood all the intricacies of it yet.
>>>>>>>
>>>>>>> One question I have though is about where does the information
>> about
>>>>>>> boundedness come from. I think in most cases it is a property of
>> the
>>>>>>> source. As you described it might be e.g. end offset, a flag
>> should
>>> it
>>>>>>> monitor new splits etc. I think it would be a really nice use case
>>> to
>>>> be
>>>>>>> able to say:
>>>>>>>
>>>>>>> new KafkaSource().readUntil(long timestamp),
>>>>>>>
>>>>>>> which could work as an "end offset". Moreover I think all Bounded
>>>>> sources
>>>>>>> support continuous mode, but no intrinsically continuous source
>>>> support
>>>>> the
>>>>>>> Bounded mode. If I understood the proposal correctly it suggest
>> the
>>>>>>> boundedness sort of "comes" from the outside of the source, from
>> the
>>>>>>> invokation of either boundedStream or continousSource.
>>>>>>>
>>>>>>> I am wondering if it would make sense to actually change the
>> method
>>>>>>> boolean Source#supportsBoundedness(Boundedness)
>>>>>>>
>>>>>>> to
>>>>>>>
>>>>>>> Boundedness Source#getBoundedness().
>>>>>>>
>>>>>>> As for the methods #boundedSource, #continousSource, assuming the
>>>>>>> boundedness is property of the source they do not affect how the
>>>>> enumerator
>>>>>>> works, but mostly how the dag is scheduled, right? I am not
>> against
>>>>> those
>>>>>>> methods, but I think it is a very specific use case to actually
>>>> override
>>>>>>> the property of the source. In general I would expect users to
>> only
>>>> call
>>>>>>> env.source(theSource), where the source tells if it is bounded or
>>>> not. I
>>>>>>> would suggest considering following set of methods:
>>>>>>>
>>>>>>> // boundedness depends of source property, imo this should always
>> be
>>>>> preferred
>>>>>>> DataStream<MyType> stream = env.source(theSource);
>>>>>>>
>>>>>>>
>>>>>>> // always continous execution, whether bounded or unbounded source
>>>>>>>
>>>>>>> DataStream<MyType> boundedStream = env.continousSource(theSource);
>>>>>>>
>>>>>>> // imo this would make sense if the BoundedDataStream provides
>>>>> additional features unavailable for continous mode
>>>>>>> BoundedDataStream<MyType> batch = env.boundedSource(theSource);
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dawid
>>>>>>>
>>>>>>>
>>>>>>> On 04/12/2019 11:25, Stephan Ewen wrote:
>>>>>>>
>>>>>>> Thanks, Becket, for updating this.
>>>>>>>
>>>>>>> I agree with moving the aspects you mentioned into separate FLIPs
>> -
>>>> this
>>>>>>> one way becoming unwieldy in size.
>>>>>>>
>>>>>>> +1 to the FLIP in its current state. Its a very detailed write-up,
>>>>> nicely
>>>>>>> done!
>>>>>>>
>>>>>>> On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com>
>> <
>>>>> becket....@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Sorry for the long belated update. I have updated FLIP-27 wiki
>> page
>>>> with
>>>>>>> the latest proposals. Some noticeable changes include:
>>>>>>> 1. A new generic communication mechanism between SplitEnumerator
>> and
>>>>>>> SourceReader.
>>>>>>> 2. Some detail API method signature changes.
>>>>>>>
>>>>>>> We left a few things out of this FLIP and will address them in
>>>> separate
>>>>>>> FLIPs. Including:
>>>>>>> 1. Per split event time.
>>>>>>> 2. Event time alignment.
>>>>>>> 3. Fine grained failover for SplitEnumerator failure.
>>>>>>>
>>>>>>> Please let us know if you have any question.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen <se...@apache.org> <
>>>>> se...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi  Łukasz!
>>>>>>>
>>>>>>> Becket and me are working hard on figuring out the last details
>> and
>>>>>>> implementing the first PoC. We would update the FLIP hopefully
>> next
>>>>> week.
>>>>>>> There is a fair chance that a first version of this will be in
>> 1.10,
>>>> but
>>>>>>> I
>>>>>>>
>>>>>>> think it will take another release to battle test it and migrate
>> the
>>>>>>> connectors.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl
>>> <
>>>>> l...@touk.pl>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This proposal looks very promising for us. Do you have any plans
>> in
>>>>>>> which
>>>>>>>
>>>>>>> Flink release it is going to be released? We are thinking on
>> using a
>>>>>>> Data
>>>>>>>
>>>>>>> Set API for our future use cases but on the other hand Data Set
>> API
>>> is
>>>>>>> going to be deprecated so using proposed bounded data streams
>>> solution
>>>>>>> could be more viable in the long term.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Łukasz
>>>>>>>
>>>>>>> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> <
>>>>> thomas.we...@gmail.com> wrote:
>>>>>>> Thanks for putting together this proposal!
>>>>>>>
>>>>>>> I see that the "Per Split Event Time" and "Event Time Alignment"
>>>>>>>
>>>>>>> sections
>>>>>>>
>>>>>>> are still TBD.
>>>>>>>
>>>>>>> It would probably be good to flesh those out a bit before
>> proceeding
>>>>>>> too
>>>>>>>
>>>>>>> far
>>>>>>>
>>>>>>> as the event time alignment will probably influence the
>> interaction
>>>>>>> with
>>>>>>>
>>>>>>> the split reader, specifically ReaderStatus
>> emitNext(SourceOutput<E>
>>>>>>> output).
>>>>>>>
>>>>>>> We currently have only one implementation for event time alignment
>>> in
>>>>>>> the
>>>>>>>
>>>>>>> Kinesis consumer. The synchronization in that case takes place as
>>> the
>>>>>>> last
>>>>>>>
>>>>>>> step before records are emitted downstream (RecordEmitter). With
>> the
>>>>>>> currently proposed interfaces, the equivalent can be implemented
>> in
>>>>>>> the
>>>>>>>
>>>>>>> reader loop, although note that in the Kinesis consumer the per
>>> shard
>>>>>>> threads push records.
>>>>>>>
>>>>>>> Synchronization has not been implemented for the Kafka consumer
>> yet.
>>>>>>> https://issues.apache.org/jira/browse/FLINK-12675
>>>>>>>
>>>>>>> When I looked at it, I realized that the implementation will look
>>>>>>>
>>>>>>> quite
>>>>>>>
>>>>>>> different
>>>>>>> from Kinesis because it needs to take place in the pull part,
>> where
>>>>>>> records
>>>>>>>
>>>>>>> are taken from the Kafka client. Due to the multiplexing it cannot
>>> be
>>>>>>> done
>>>>>>>
>>>>>>> by blocking the split thread like it currently works for Kinesis.
>>>>>>>
>>>>>>> Reading
>>>>>>>
>>>>>>> from individual Kafka partitions needs to be controlled via
>>>>>>>
>>>>>>> pause/resume
>>>>>>>
>>>>>>> on the Kafka client.
>>>>>>>
>>>>>>> To take on that responsibility the split thread would need to be
>>>>>>>
>>>>>>> aware
>>>>>>>
>>>>>>> of
>>>>>>>
>>>>>>> the
>>>>>>> watermarks or at least whether it should or should not continue to
>>>>>>>
>>>>>>> consume
>>>>>>>
>>>>>>> a given split and this may require a different SourceReader or
>>>>>>>
>>>>>>> SourceOutput
>>>>>>>
>>>>>>> interface.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> <
>>>>> mmyy1...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Stephan,
>>>>>>>
>>>>>>> Thank you for feedback!
>>>>>>> Will take a look at your branch before public discussing.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org>
>> <
>>>>> se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Biao!
>>>>>>>
>>>>>>> Thanks for reviving this. I would like to join this discussion,
>>>>>>>
>>>>>>> but
>>>>>>>
>>>>>>> am
>>>>>>>
>>>>>>> quite occupied with the 1.9 release, so can we maybe pause this
>>>>>>>
>>>>>>> discussion
>>>>>>>
>>>>>>> for a week or so?
>>>>>>>
>>>>>>> In the meantime I can share some suggestion based on prior
>>>>>>>
>>>>>>> experiments:
>>>>>>>
>>>>>>> How to do watermarks / timestamp extractors in a simpler and more
>>>>>>>
>>>>>>> flexible
>>>>>>>
>>>>>>> way. I think that part is quite promising should be part of the
>>>>>>>
>>>>>>> new
>>>>>>>
>>>>>>> source
>>>>>>>
>>>>>>> interface.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>>>>>>>
>> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>>>>>>> Some experiments on how to build the source reader and its
>>>>>>>
>>>>>>> library
>>>>>>>
>>>>>>> for
>>>>>>>
>>>>>>> common threading/split patterns:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>>>>>>> Best,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> <
>>>>> mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi devs,
>>>>>>>
>>>>>>> Since 1.9 is nearly released, I think we could get back to
>>>>>>>
>>>>>>> FLIP-27.
>>>>>>>
>>>>>>> I
>>>>>>>
>>>>>>> believe it should be included in 1.10.
>>>>>>>
>>>>>>> There are so many things mentioned in document of FLIP-27. [1] I
>>>>>>>
>>>>>>> think
>>>>>>>
>>>>>>> we'd better discuss them separately. However the wiki is not a
>>>>>>>
>>>>>>> good
>>>>>>>
>>>>>>> place
>>>>>>>
>>>>>>> to discuss. I wrote google doc about SplitReader API which
>>>>>>>
>>>>>>> misses
>>>>>>>
>>>>>>> some
>>>>>>>
>>>>>>> details in the document. [2]
>>>>>>>
>>>>>>> 1.
>>>>>>>
>>>>>>>
>>>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>>>>>>> 2.
>>>>>>>
>>>>>>>
>>>>>>>
>> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>>>>>>> CC Stephan, Aljoscha, Piotrek, Becket
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> <
>>>>> mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Steven,
>>>>>>> Thank you for the feedback. Please take a look at the document
>>>>>>>
>>>>>>> FLIP-27
>>>>>>>
>>>>>>> <
>>>>>>>
>>>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>> which
>>>>>>>
>>>>>>> is updated recently. A lot of details of enumerator were added
>>>>>>>
>>>>>>> in
>>>>>>>
>>>>>>> this
>>>>>>>
>>>>>>> document. I think it would help.
>>>>>>>
>>>>>>> Steven Wu <stevenz...@gmail.com> <stevenz...@gmail.com>
>>> 于2019年3月28日周四
>>>>> 下午12:52写道:
>>>>>>>
>>>>>>> This proposal mentioned that SplitEnumerator might run on the
>>>>>>> JobManager or
>>>>>>> in a single task on a TaskManager.
>>>>>>>
>>>>>>> if enumerator is a single task on a taskmanager, then the job
>>>>>>>
>>>>>>> DAG
>>>>>>>
>>>>>>> can
>>>>>>>
>>>>>>> never
>>>>>>> been embarrassingly parallel anymore. That will nullify the
>>>>>>>
>>>>>>> leverage
>>>>>>>
>>>>>>> of
>>>>>>>
>>>>>>> fine-grained recovery for embarrassingly parallel jobs.
>>>>>>>
>>>>>>> It's not clear to me what's the implication of running
>>>>>>>
>>>>>>> enumerator
>>>>>>>
>>>>>>> on
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> jobmanager. So I will leave that out for now.
>>>>>>>
>>>>>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> <
>>>>> mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Stephan & Piotrek,
>>>>>>>
>>>>>>> Thank you for feedback.
>>>>>>>
>>>>>>> It seems that there are a lot of things to do in community.
>>>>>>>
>>>>>>> I
>>>>>>>
>>>>>>> am
>>>>>>>
>>>>>>> just
>>>>>>>
>>>>>>> afraid that this discussion may be forgotten since there so
>>>>>>>
>>>>>>> many
>>>>>>>
>>>>>>> proposals
>>>>>>>
>>>>>>> recently.
>>>>>>> Anyway, wish to see the split topics soon :)
>>>>>>>
>>>>>>> Piotr Nowojski <pi...@da-platform.com> <pi...@da-platform.com>
>>>>> 于2019年1月24日周四
>>>>>>> 下午8:21写道:
>>>>>>>
>>>>>>> Hi Biao!
>>>>>>>
>>>>>>> This discussion was stalled because of preparations for
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> open
>>>>>>>
>>>>>>> sourcing
>>>>>>>
>>>>>>> & merging Blink. I think before creating the tickets we
>>>>>>>
>>>>>>> should
>>>>>>>
>>>>>>> split this
>>>>>>>
>>>>>>> discussion into topics/areas outlined by Stephan and
>>>>>>>
>>>>>>> create
>>>>>>>
>>>>>>> Flips
>>>>>>>
>>>>>>> for
>>>>>>>
>>>>>>> that.
>>>>>>>
>>>>>>> I think there is no chance for this to be completed in
>>>>>>>
>>>>>>> couple
>>>>>>>
>>>>>>> of
>>>>>>>
>>>>>>> remaining
>>>>>>>
>>>>>>> weeks/1 month before 1.8 feature freeze, however it would
>>>>>>>
>>>>>>> be
>>>>>>>
>>>>>>> good
>>>>>>>
>>>>>>> to aim
>>>>>>>
>>>>>>> with those changes for 1.9.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>>
>>>>>>> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> <
>>>>> mmyy1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi community,
>>>>>>> The summary of Stephan makes a lot sense to me. It is
>>>>>>>
>>>>>>> much
>>>>>>>
>>>>>>> clearer
>>>>>>>
>>>>>>> indeed
>>>>>>>
>>>>>>> after splitting the complex topic into small ones.
>>>>>>> I was wondering is there any detail plan for next step?
>>>>>>>
>>>>>>> If
>>>>>>>
>>>>>>> not,
>>>>>>>
>>>>>>> I
>>>>>>>
>>>>>>> would
>>>>>>>
>>>>>>> like to push this thing forward by creating some JIRA
>>>>>>>
>>>>>>> issues.
>>>>>>>
>>>>>>> Another question is that should version 1.8 include
>>>>>>>
>>>>>>> these
>>>>>>>
>>>>>>> features?
>>>>>>>
>>>>>>> Stephan Ewen <se...@apache.org> <se...@apache.org> 于2018年12月1日周六
>>>>> 上午4:20写道:
>>>>>>>
>>>>>>> Thanks everyone for the lively discussion. Let me try
>>>>>>>
>>>>>>> to
>>>>>>>
>>>>>>> summarize
>>>>>>>
>>>>>>> where I
>>>>>>>
>>>>>>> see convergence in the discussion and open issues.
>>>>>>> I'll try to group this by design aspect of the source.
>>>>>>>
>>>>>>> Please
>>>>>>>
>>>>>>> let me
>>>>>>>
>>>>>>> know
>>>>>>>
>>>>>>> if I got things wrong or missed something crucial here.
>>>>>>>
>>>>>>> For issues 1-3, if the below reflects the state of the
>>>>>>>
>>>>>>> discussion, I
>>>>>>>
>>>>>>> would
>>>>>>>
>>>>>>> try and update the FLIP in the next days.
>>>>>>> For the remaining ones we need more discussion.
>>>>>>>
>>>>>>> I would suggest to fork each of these aspects into a
>>>>>>>
>>>>>>> separate
>>>>>>>
>>>>>>> mail
>>>>>>>
>>>>>>> thread,
>>>>>>>
>>>>>>> or will loose sight of the individual aspects.
>>>>>>>
>>>>>>> *(1) Separation of Split Enumerator and Split Reader*
>>>>>>>
>>>>>>>  - All seem to agree this is a good thing
>>>>>>>  - Split Enumerator could in the end live on JobManager
>>>>>>>
>>>>>>> (and
>>>>>>>
>>>>>>> assign
>>>>>>>
>>>>>>> splits
>>>>>>>
>>>>>>> via RPC) or in a task (and assign splits via data
>>>>>>>
>>>>>>> streams)
>>>>>>>
>>>>>>>  - this discussion is orthogonal and should come later,
>>>>>>>
>>>>>>> when
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> interface
>>>>>>>
>>>>>>> is agreed upon.
>>>>>>>
>>>>>>> *(2) Split Readers for one or more splits*
>>>>>>>
>>>>>>>  - Discussion seems to agree that we need to support
>>>>>>>
>>>>>>> one
>>>>>>>
>>>>>>> reader
>>>>>>>
>>>>>>> that
>>>>>>>
>>>>>>> possibly handles multiple splits concurrently.
>>>>>>>  - The requirement comes from sources where one
>>>>>>>
>>>>>>> poll()-style
>>>>>>>
>>>>>>> call
>>>>>>>
>>>>>>> fetches
>>>>>>>
>>>>>>> data from different splits / partitions
>>>>>>>    --> example sources that require that would be for
>>>>>>>
>>>>>>> example
>>>>>>>
>>>>>>> Kafka,
>>>>>>>
>>>>>>> Pravega, Pulsar
>>>>>>>
>>>>>>>  - Could have one split reader per source, or multiple
>>>>>>>
>>>>>>> split
>>>>>>>
>>>>>>> readers
>>>>>>>
>>>>>>> that
>>>>>>>
>>>>>>> share the "poll()" function
>>>>>>>  - To not make it too complicated, we can start with
>>>>>>>
>>>>>>> thinking
>>>>>>>
>>>>>>> about
>>>>>>>
>>>>>>> one
>>>>>>>
>>>>>>> split reader for all splits initially and see if that
>>>>>>>
>>>>>>> covers
>>>>>>>
>>>>>>> all
>>>>>>>
>>>>>>> requirements
>>>>>>>
>>>>>>> *(3) Threading model of the Split Reader*
>>>>>>>
>>>>>>>  - Most active part of the discussion ;-)
>>>>>>>
>>>>>>>  - A non-blocking way for Flink's task code to interact
>>>>>>>
>>>>>>> with
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> source
>>>>>>>
>>>>>>> is
>>>>>>>
>>>>>>> needed in order to a task runtime code based on a
>>>>>>> single-threaded/actor-style task design
>>>>>>>    --> I personally am a big proponent of that, it will
>>>>>>>
>>>>>>> help
>>>>>>>
>>>>>>> with
>>>>>>>
>>>>>>> well-behaved checkpoints, efficiency, and simpler yet
>>>>>>>
>>>>>>> more
>>>>>>>
>>>>>>> robust
>>>>>>>
>>>>>>> runtime
>>>>>>>
>>>>>>> code
>>>>>>>
>>>>>>>  - Users care about simple abstraction, so as a
>>>>>>>
>>>>>>> subclass
>>>>>>>
>>>>>>> of
>>>>>>>
>>>>>>> SplitReader
>>>>>>>
>>>>>>> (non-blocking / async) we need to have a
>>>>>>>
>>>>>>> BlockingSplitReader
>>>>>>>
>>>>>>> which
>>>>>>>
>>>>>>> will
>>>>>>>
>>>>>>> form the basis of most source implementations.
>>>>>>>
>>>>>>> BlockingSplitReader
>>>>>>>
>>>>>>> lets
>>>>>>>
>>>>>>> users do blocking simple poll() calls.
>>>>>>>  - The BlockingSplitReader would spawn a thread (or
>>>>>>>
>>>>>>> more)
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> thread(s) can make blocking calls and hand over data
>>>>>>>
>>>>>>> buffers
>>>>>>>
>>>>>>> via
>>>>>>>
>>>>>>> a
>>>>>>>
>>>>>>> blocking
>>>>>>>
>>>>>>> queue
>>>>>>>  - This should allow us to cover both, a fully async
>>>>>>>
>>>>>>> runtime,
>>>>>>>
>>>>>>> and a
>>>>>>>
>>>>>>> simple
>>>>>>>
>>>>>>> blocking interface for users.
>>>>>>>  - This is actually very similar to how the Kafka
>>>>>>>
>>>>>>> connectors
>>>>>>>
>>>>>>> work.
>>>>>>>
>>>>>>> Kafka
>>>>>>>
>>>>>>> 9+ with one thread, Kafka 8 with multiple threads
>>>>>>>
>>>>>>>  - On the base SplitReader (the async one), the
>>>>>>>
>>>>>>> non-blocking
>>>>>>>
>>>>>>> method
>>>>>>>
>>>>>>> that
>>>>>>>
>>>>>>> gets the next chunk of data would signal data
>>>>>>>
>>>>>>> availability
>>>>>>>
>>>>>>> via
>>>>>>>
>>>>>>> a
>>>>>>>
>>>>>>> CompletableFuture, because that gives the best
>>>>>>>
>>>>>>> flexibility
>>>>>>>
>>>>>>> (can
>>>>>>>
>>>>>>> await
>>>>>>>
>>>>>>> completion or register notification handlers).
>>>>>>>  - The source task would register a "thenHandle()" (or
>>>>>>>
>>>>>>> similar)
>>>>>>>
>>>>>>> on the
>>>>>>>
>>>>>>> future to put a "take next data" task into the
>>>>>>>
>>>>>>> actor-style
>>>>>>>
>>>>>>> mailbox
>>>>>>>
>>>>>>> *(4) Split Enumeration and Assignment*
>>>>>>>
>>>>>>>  - Splits may be generated lazily, both in cases where
>>>>>>>
>>>>>>> there
>>>>>>>
>>>>>>> is a
>>>>>>>
>>>>>>> limited
>>>>>>>
>>>>>>> number of splits (but very many), or splits are
>>>>>>>
>>>>>>> discovered
>>>>>>>
>>>>>>> over
>>>>>>>
>>>>>>> time
>>>>>>>
>>>>>>>  - Assignment should also be lazy, to get better load
>>>>>>>
>>>>>>> balancing
>>>>>>>
>>>>>>>  - Assignment needs support locality preferences
>>>>>>>
>>>>>>>  - Possible design based on discussion so far:
>>>>>>>
>>>>>>>    --> SplitReader has a method "addSplits(SplitT...)"
>>>>>>>
>>>>>>> to
>>>>>>>
>>>>>>> add
>>>>>>>
>>>>>>> one or
>>>>>>>
>>>>>>> more
>>>>>>>
>>>>>>> splits. Some split readers might assume they have only
>>>>>>>
>>>>>>> one
>>>>>>>
>>>>>>> split
>>>>>>>
>>>>>>> ever,
>>>>>>>
>>>>>>> concurrently, others assume multiple splits. (Note:
>>>>>>>
>>>>>>> idea
>>>>>>>
>>>>>>> behind
>>>>>>>
>>>>>>> being
>>>>>>>
>>>>>>> able
>>>>>>>
>>>>>>> to add multiple splits at the same time is to ease
>>>>>>>
>>>>>>> startup
>>>>>>>
>>>>>>> where
>>>>>>>
>>>>>>> multiple
>>>>>>>
>>>>>>> splits may be assigned instantly.)
>>>>>>>    --> SplitReader has a context object on which it can
>>>>>>>
>>>>>>> call
>>>>>>>
>>>>>>> indicate
>>>>>>>
>>>>>>> when
>>>>>>>
>>>>>>> splits are completed. The enumerator gets that
>>>>>>>
>>>>>>> notification and
>>>>>>>
>>>>>>> can
>>>>>>>
>>>>>>> use
>>>>>>>
>>>>>>> to
>>>>>>>
>>>>>>> decide when to assign new splits. This should help both
>>>>>>>
>>>>>>> in
>>>>>>>
>>>>>>> cases
>>>>>>>
>>>>>>> of
>>>>>>>
>>>>>>> sources
>>>>>>>
>>>>>>> that take splits lazily (file readers) and in case the
>>>>>>>
>>>>>>> source
>>>>>>>
>>>>>>> needs to
>>>>>>>
>>>>>>> preserve a partial order between splits (Kinesis,
>>>>>>>
>>>>>>> Pravega,
>>>>>>>
>>>>>>> Pulsar may
>>>>>>>
>>>>>>> need
>>>>>>>
>>>>>>> that).
>>>>>>>    --> SplitEnumerator gets notification when
>>>>>>>
>>>>>>> SplitReaders
>>>>>>>
>>>>>>> start
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> when
>>>>>>>
>>>>>>> they finish splits. They can decide at that moment to
>>>>>>>
>>>>>>> push
>>>>>>>
>>>>>>> more
>>>>>>>
>>>>>>> splits
>>>>>>>
>>>>>>> to
>>>>>>>
>>>>>>> that reader
>>>>>>>    --> The SplitEnumerator should probably be aware of
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> source
>>>>>>>
>>>>>>> parallelism, to build its initial distribution.
>>>>>>>
>>>>>>>  - Open question: Should the source expose something
>>>>>>>
>>>>>>> like
>>>>>>>
>>>>>>> "host
>>>>>>>
>>>>>>> preferences", so that yarn/mesos/k8s can take this into
>>>>>>>
>>>>>>> account
>>>>>>>
>>>>>>> when
>>>>>>>
>>>>>>> selecting a node to start a TM on?
>>>>>>>
>>>>>>> *(5) Watermarks and event time alignment*
>>>>>>>
>>>>>>>  - Watermark generation, as well as idleness, needs to
>>>>>>>
>>>>>>> be
>>>>>>>
>>>>>>> per
>>>>>>>
>>>>>>> split
>>>>>>>
>>>>>>> (like
>>>>>>>
>>>>>>> currently in the Kafka Source, per partition)
>>>>>>>  - It is desirable to support optional
>>>>>>>
>>>>>>> event-time-alignment,
>>>>>>>
>>>>>>> meaning
>>>>>>>
>>>>>>> that
>>>>>>>
>>>>>>> splits that are ahead are back-pressured or temporarily
>>>>>>>
>>>>>>> unsubscribed
>>>>>>>
>>>>>>>  - I think i would be desirable to encapsulate
>>>>>>>
>>>>>>> watermark
>>>>>>>
>>>>>>> generation
>>>>>>>
>>>>>>> logic
>>>>>>>
>>>>>>> in watermark generators, for a separation of concerns.
>>>>>>>
>>>>>>> The
>>>>>>>
>>>>>>> watermark
>>>>>>>
>>>>>>> generators should run per split.
>>>>>>>  - Using watermark generators would also help with
>>>>>>>
>>>>>>> another
>>>>>>>
>>>>>>> problem of
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> suggested interface, namely supporting non-periodic
>>>>>>>
>>>>>>> watermarks
>>>>>>>
>>>>>>> efficiently.
>>>>>>>
>>>>>>>  - Need a way to "dispatch" next record to different
>>>>>>>
>>>>>>> watermark
>>>>>>>
>>>>>>> generators
>>>>>>>
>>>>>>>  - Need a way to tell SplitReader to "suspend" a split
>>>>>>>
>>>>>>> until a
>>>>>>>
>>>>>>> certain
>>>>>>>
>>>>>>> watermark is reached (event time backpressure)
>>>>>>>  - This would in fact be not needed (and thus simpler)
>>>>>>>
>>>>>>> if
>>>>>>>
>>>>>>> we
>>>>>>>
>>>>>>> had
>>>>>>>
>>>>>>> a
>>>>>>>
>>>>>>> SplitReader per split and may be a reason to re-open
>>>>>>>
>>>>>>> that
>>>>>>>
>>>>>>> discussion
>>>>>>>
>>>>>>> *(6) Watermarks across splits and in the Split
>>>>>>>
>>>>>>> Enumerator*
>>>>>>>
>>>>>>>  - The split enumerator may need some watermark
>>>>>>>
>>>>>>> awareness,
>>>>>>>
>>>>>>> which
>>>>>>>
>>>>>>> should
>>>>>>>
>>>>>>> be
>>>>>>>
>>>>>>> purely based on split metadata (like create timestamp
>>>>>>>
>>>>>>> of
>>>>>>>
>>>>>>> file
>>>>>>>
>>>>>>> splits)
>>>>>>>
>>>>>>>  - If there are still more splits with overlapping
>>>>>>>
>>>>>>> event
>>>>>>>
>>>>>>> time
>>>>>>>
>>>>>>> range
>>>>>>>
>>>>>>> for
>>>>>>>
>>>>>>> a
>>>>>>>
>>>>>>> split reader, then that split reader should not advance
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> watermark
>>>>>>>
>>>>>>> within the split beyond the overlap boundary. Otherwise
>>>>>>>
>>>>>>> future
>>>>>>>
>>>>>>> splits
>>>>>>>
>>>>>>> will
>>>>>>>
>>>>>>> produce late data.
>>>>>>>
>>>>>>>  - One way to approach this could be that the split
>>>>>>>
>>>>>>> enumerator
>>>>>>>
>>>>>>> may
>>>>>>>
>>>>>>> send
>>>>>>>
>>>>>>> watermarks to the readers, and the readers cannot emit
>>>>>>>
>>>>>>> watermarks
>>>>>>>
>>>>>>> beyond
>>>>>>>
>>>>>>> that received watermark.
>>>>>>>  - Many split enumerators would simply immediately send
>>>>>>>
>>>>>>> Long.MAX
>>>>>>>
>>>>>>> out
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> leave the progress purely to the split readers.
>>>>>>>
>>>>>>>  - For event-time alignment / split back pressure, this
>>>>>>>
>>>>>>> begs
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> question
>>>>>>>
>>>>>>> how we can avoid deadlocks that may arise when splits
>>>>>>>
>>>>>>> are
>>>>>>>
>>>>>>> suspended
>>>>>>>
>>>>>>> for
>>>>>>>
>>>>>>> event time back pressure,
>>>>>>>
>>>>>>> *(7) Batch and streaming Unification*
>>>>>>>
>>>>>>>  - Functionality wise, the above design should support
>>>>>>>
>>>>>>> both
>>>>>>>
>>>>>>>  - Batch often (mostly) does not care about reading "in
>>>>>>>
>>>>>>> order"
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> generating watermarks
>>>>>>>    --> Might use different enumerator logic that is
>>>>>>>
>>>>>>> more
>>>>>>>
>>>>>>> locality
>>>>>>>
>>>>>>> aware
>>>>>>>
>>>>>>> and ignores event time order
>>>>>>>    --> Does not generate watermarks
>>>>>>>  - Would be great if bounded sources could be
>>>>>>>
>>>>>>> identified
>>>>>>>
>>>>>>> at
>>>>>>>
>>>>>>> compile
>>>>>>>
>>>>>>> time,
>>>>>>>
>>>>>>> so that "env.addBoundedSource(...)" is type safe and
>>>>>>>
>>>>>>> can
>>>>>>>
>>>>>>> return a
>>>>>>>
>>>>>>> "BoundedDataStream".
>>>>>>>  - Possible to defer this discussion until later
>>>>>>>
>>>>>>> *Miscellaneous Comments*
>>>>>>>
>>>>>>>  - Should the source have a TypeInformation for the
>>>>>>>
>>>>>>> produced
>>>>>>>
>>>>>>> type,
>>>>>>>
>>>>>>> instead
>>>>>>>
>>>>>>> of a serializer? We need a type information in the
>>>>>>>
>>>>>>> stream
>>>>>>>
>>>>>>> anyways, and
>>>>>>>
>>>>>>> can
>>>>>>>
>>>>>>> derive the serializer from that. Plus, creating the
>>>>>>>
>>>>>>> serializer
>>>>>>>
>>>>>>> should
>>>>>>>
>>>>>>> respect the ExecutionConfig.
>>>>>>>
>>>>>>>  - The TypeSerializer interface is very powerful but
>>>>>>>
>>>>>>> also
>>>>>>>
>>>>>>> not
>>>>>>>
>>>>>>> easy to
>>>>>>>
>>>>>>> implement. Its purpose is to handle data super
>>>>>>>
>>>>>>> efficiently,
>>>>>>>
>>>>>>> support
>>>>>>>
>>>>>>> flexible ways of evolution, etc.
>>>>>>>  For metadata I would suggest to look at the
>>>>>>>
>>>>>>> SimpleVersionedSerializer
>>>>>>>
>>>>>>> instead, which is used for example for checkpoint
>>>>>>>
>>>>>>> master
>>>>>>>
>>>>>>> hooks,
>>>>>>>
>>>>>>> or for
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> streaming file sink. I think that is is a good match
>>>>>>>
>>>>>>> for
>>>>>>>
>>>>>>> cases
>>>>>>>
>>>>>>> where
>>>>>>>
>>>>>>> we
>>>>>>>
>>>>>>> do
>>>>>>>
>>>>>>> not need more than ser/deser (no copy, etc.) and don't
>>>>>>>
>>>>>>> need to
>>>>>>>
>>>>>>> push
>>>>>>>
>>>>>>> versioning out of the serialization paths for best
>>>>>>>
>>>>>>> performance
>>>>>>>
>>>>>>> (as in
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> TypeSerializer)
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hi Biao,
>>>>>>>
>>>>>>> Thanks for the answer!
>>>>>>>
>>>>>>> So given the multi-threaded readers, now we have as
>>>>>>>
>>>>>>> open
>>>>>>>
>>>>>>> questions:
>>>>>>>
>>>>>>> 1) How do we let the checkpoints pass through our
>>>>>>>
>>>>>>> multi-threaded
>>>>>>>
>>>>>>> reader
>>>>>>>
>>>>>>> operator?
>>>>>>>
>>>>>>> 2) Do we have separate reader and source operators or
>>>>>>>
>>>>>>> not? In
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> strategy
>>>>>>>
>>>>>>> that has a separate source, the source operator has a
>>>>>>>
>>>>>>> parallelism of
>>>>>>>
>>>>>>> 1
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> is responsible for split recovery only.
>>>>>>>
>>>>>>> For the first one, given also the constraints
>>>>>>>
>>>>>>> (blocking,
>>>>>>>
>>>>>>> finite
>>>>>>>
>>>>>>> queues,
>>>>>>>
>>>>>>> etc), I do not have an answer yet.
>>>>>>>
>>>>>>> For the 2nd, I think that we should go with separate
>>>>>>>
>>>>>>> operators
>>>>>>>
>>>>>>> for
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> source and the readers, for the following reasons:
>>>>>>>
>>>>>>> 1) This is more aligned with a potential future
>>>>>>>
>>>>>>> improvement
>>>>>>>
>>>>>>> where the
>>>>>>>
>>>>>>> split
>>>>>>>
>>>>>>> discovery becomes a responsibility of the JobManager
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> readers are
>>>>>>>
>>>>>>> pooling more work from the JM.
>>>>>>>
>>>>>>> 2) The source is going to be the "single point of
>>>>>>>
>>>>>>> truth".
>>>>>>>
>>>>>>> It
>>>>>>>
>>>>>>> will
>>>>>>>
>>>>>>> know
>>>>>>>
>>>>>>> what
>>>>>>>
>>>>>>> has been processed and what not. If the source and the
>>>>>>>
>>>>>>> readers
>>>>>>>
>>>>>>> are a
>>>>>>>
>>>>>>> single
>>>>>>>
>>>>>>> operator with parallelism > 1, or in general, if the
>>>>>>>
>>>>>>> split
>>>>>>>
>>>>>>> discovery
>>>>>>>
>>>>>>> is
>>>>>>>
>>>>>>> done by each task individually, then:
>>>>>>>   i) we have to have a deterministic scheme for each
>>>>>>>
>>>>>>> reader to
>>>>>>>
>>>>>>> assign
>>>>>>>
>>>>>>> splits to itself (e.g. mod subtaskId). This is not
>>>>>>>
>>>>>>> necessarily
>>>>>>>
>>>>>>> trivial
>>>>>>>
>>>>>>> for
>>>>>>>
>>>>>>> all sources.
>>>>>>>   ii) each reader would have to keep a copy of all its
>>>>>>>
>>>>>>> processed
>>>>>>>
>>>>>>> slpits
>>>>>>>
>>>>>>>   iii) the state has to be a union state with a
>>>>>>>
>>>>>>> non-trivial
>>>>>>>
>>>>>>> merging
>>>>>>>
>>>>>>> logic
>>>>>>>
>>>>>>> in order to support rescaling.
>>>>>>>
>>>>>>> Two additional points that you raised above:
>>>>>>>
>>>>>>> i) The point that you raised that we need to keep all
>>>>>>>
>>>>>>> splits
>>>>>>>
>>>>>>> (processed
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> not-processed) I think is a bit of a strong
>>>>>>>
>>>>>>> requirement.
>>>>>>>
>>>>>>> This
>>>>>>>
>>>>>>> would
>>>>>>>
>>>>>>> imply
>>>>>>>
>>>>>>> that for infinite sources the state will grow
>>>>>>>
>>>>>>> indefinitely.
>>>>>>>
>>>>>>> This is
>>>>>>>
>>>>>>> problem
>>>>>>>
>>>>>>> is even more pronounced if we do not have a single
>>>>>>>
>>>>>>> source
>>>>>>>
>>>>>>> that
>>>>>>>
>>>>>>> assigns
>>>>>>>
>>>>>>> splits to readers, as each reader will have its own
>>>>>>>
>>>>>>> copy
>>>>>>>
>>>>>>> of
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> state.
>>>>>>>
>>>>>>> ii) it is true that for finite sources we need to
>>>>>>>
>>>>>>> somehow
>>>>>>>
>>>>>>> not
>>>>>>>
>>>>>>> close
>>>>>>>
>>>>>>> the
>>>>>>>
>>>>>>> readers when the source/split discoverer finishes. The
>>>>>>> ContinuousFileReaderOperator has a work-around for
>>>>>>>
>>>>>>> that.
>>>>>>>
>>>>>>> It is
>>>>>>>
>>>>>>> not
>>>>>>>
>>>>>>> elegant,
>>>>>>>
>>>>>>> and checkpoints are not emitted after closing the
>>>>>>>
>>>>>>> source,
>>>>>>>
>>>>>>> but
>>>>>>>
>>>>>>> this, I
>>>>>>>
>>>>>>> believe, is a bigger problem which requires more
>>>>>>>
>>>>>>> changes
>>>>>>>
>>>>>>> than
>>>>>>>
>>>>>>> just
>>>>>>>
>>>>>>> refactoring the source interface.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kostas
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to