Hi Becket,

Thanks for updating the progress!


I have a question about the OperatorCoordinator. Will there be any 
communication between different OperatorCoordinators (now, or in the future 
plan)? If so, it may be able to cover some cases of FLIP-17[1], like 
initializing static data before main input processing. Of course it requires 
more thinking; I just want to share some ideas on my mind.
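To make the idea concrete, cross-coordinator communication could look roughly like the sketch below. Everything here is hypothetical: the CoordinatorRegistry type and its methods are invented for illustration and are not part of FLIP-27 or any existing Flink API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch: a registry through which one OperatorCoordinator
// could send events to another. None of these types exist in Flink;
// they only illustrate the question raised above.
final class CoordinatorRegistry {
    private final Map<String, Consumer<String>> handlers = new ConcurrentHashMap<>();

    // A coordinator registers a handler for events addressed to it.
    void register(String operatorId, Consumer<String> handler) {
        handlers.put(operatorId, handler);
    }

    // Another coordinator sends it an event, e.g. "static data loaded".
    boolean send(String targetOperatorId, String event) {
        Consumer<String> handler = handlers.get(targetOperatorId);
        if (handler == null) {
            return false; // target coordinator not registered
        }
        handler.accept(event);
        return true;
    }
}
```

With such a channel, the coordinator of a "static data" source could notify the coordinator of the main input once initialization is done, which is the side-input-like case mentioned above.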


+1 to the FLIP and detailed design!




[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API


Best,
Jiayi Liao


 Original Message 
Sender: Stephan Ewen<se...@apache.org>
Recipient: dev<dev@flink.apache.org>
Date: Wednesday, Dec 4, 2019 18:25
Subject: Re: [DISCUSS] FLIP-27: Refactor Source Interface


Thanks, Becket, for updating this. I agree with moving the aspects you 
mentioned into separate FLIPs - this one was becoming unwieldy in size. +1 to 
the FLIP in its current state. It's a very detailed write-up, nicely done!

On Wed, Dec 4, 2019 at 7:38 AM Becket Qin <becket....@gmail.com> wrote:

Hi all,

Sorry for the long belated update. I have updated the FLIP-27 wiki page with 
the latest proposals. Some notable changes include:
1. A new generic communication mechanism between SplitEnumerator and 
SourceReader.
2. Some detailed API method signature changes.

We left a few things out of this FLIP and will address them in separate 
FLIPs, including:
1. Per split event time.
2. Event time alignment.
3. Fine grained failover for SplitEnumerator failure.

Please let us know if you have any questions.

Thanks,

Jiangjie (Becket) Qin

On Sat, Nov 16, 2019 at 6:10 AM Stephan Ewen
<se...@apache.org> wrote:

Hi Łukasz!

Becket and me are working hard on figuring out the last details and 
implementing the first PoC. We would update the FLIP hopefully next week.

There is a fair chance that a first version of this will be in 1.10, but I 
think it will take another release to battle test it and migrate the 
connectors.

Best,
Stephan

On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:

Hi,

This proposal looks very promising for us. Do you have any plans as to which 
Flink release it is going to be included in? We are thinking of using the 
DataSet API for our future use cases, but on the other hand the DataSet API 
is going to be deprecated, so using the proposed bounded data streams 
solution could be more viable in the long term.

Thanks,
Łukasz

On
2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:

Thanks for putting together this proposal!

I see that the "Per Split Event Time" and "Event Time Alignment" sections 
are still TBD.

It would probably be good to flesh those out a bit before proceeding too 
far, as the event time alignment will probably influence the interaction 
with the split reader, specifically ReaderStatus emitNext(SourceOutput<E> 
output).

We currently have only one implementation for event time alignment, in the 
Kinesis consumer. The synchronization in that case takes place as the last 
step before records are emitted downstream (RecordEmitter). With the 
currently proposed interfaces, the equivalent can be implemented in the 
reader loop, although note that in the Kinesis consumer the per shard 
threads push records.

Synchronization has not been implemented for the Kafka consumer yet.

https://issues.apache.org/jira/browse/FLINK-12675

When I looked at it, I realized that the implementation will look quite 
different from Kinesis, because it needs to take place in the pull part, 
where records are taken from the Kafka client. Due to the multiplexing it 
cannot be done by blocking the split thread like it currently works for 
Kinesis. Reading from individual Kafka partitions needs to be controlled via 
pause/resume on the Kafka client.
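To illustrate the point about pause/resume, the alignment decision itself can be expressed as a small pure function over per-split watermarks: pause every split that has run more than some threshold ahead of the slowest one, and resume it once the others catch up. This is only a sketch of the decision logic with invented names, not the actual connector code; the returned split ids would then be mapped to KafkaConsumer.pause(...) calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Sketch of event time alignment: given the current per-split watermarks,
// return the splits that should be paused because they are more than
// maxDriftMillis ahead of the slowest split.
final class AlignmentPolicy {
    static List<String> splitsToPause(Map<String, Long> watermarkPerSplit, long maxDriftMillis) {
        if (watermarkPerSplit.isEmpty()) {
            return Collections.emptyList();
        }
        long slowest = Collections.min(watermarkPerSplit.values());
        List<String> toPause = new ArrayList<>();
        for (Map.Entry<String, Long> e : watermarkPerSplit.entrySet()) {
            if (e.getValue() - slowest > maxDriftMillis) {
                toPause.add(e.getKey()); // too far ahead of the slowest split
            }
        }
        return toPause;
    }
}
```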
To take on that responsibility the split thread would need to be aware of 
the watermarks, or at least whether it should or should not continue to 
consume a given split, and this may require a different SourceReader or 
SourceOutput interface.

Thanks,
Thomas

On Fri, Jul 26, 2019 at 1:39 AM
Biao Liu <mmyy1...@gmail.com> wrote:

Hi Stephan,

Thank you for the feedback! Will take a look at your branch before the 
public discussion.

On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

Hi Biao!

Thanks for reviving this. I would like to join this discussion, but am 
quite occupied with the 1.9 release, so can we maybe pause this discussion 
for a week or so?

In the meantime I can share some suggestions based on prior experiments:

How to do watermarks / timestamp extractors in a simpler and more flexible 
way. I think that part is quite promising and should be part of the new 
source interface.

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime

https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java

Some experiments on how to build the source reader and its library for 
common threading/split patterns:

https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
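Independent of the linked branches, the per-split watermarking idea can be illustrated with a small sketch (the class below is invented for illustration and unrelated to the code in the links): each split tracks its own event time progress, and the source-level watermark is the minimum across splits, so a slow split holds the watermark back.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of per-split watermark generation: each split tracks the
// max timestamp it has seen; the combined watermark is the minimum over
// all splits, so a slow split holds the source watermark back.
final class PerSplitWatermarks {
    private final Map<String, Long> maxTimestampPerSplit = new HashMap<>();

    void onRecord(String splitId, long eventTimestamp) {
        maxTimestampPerSplit.merge(splitId, eventTimestamp, Math::max);
    }

    // Combined watermark: min over splits (Long.MIN_VALUE if no splits yet).
    long currentWatermark() {
        return maxTimestampPerSplit.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```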
Best,
Stephan

On Thu, Jul 25, 2019 at 10:03 AM Biao Liu
<mmyy1...@gmail.com> wrote:

Hi devs,

Since 1.9 is nearly released, I think we could get back to FLIP-27. I 
believe it should be included in 1.10.

There are so many things mentioned in the document of FLIP-27 [1] that I 
think we'd better discuss them separately. However the wiki is not a good 
place to discuss. I wrote a google doc about the SplitReader API which 
covers some details missing from the document. [2]

1.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
2.
https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing

CC Stephan, Aljoscha, Piotrek, Becket

On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:

Hi Steven,

Thank you for the feedback. Please take a look at the document FLIP-27 
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface> 
which was updated recently. A lot of details about the enumerator were 
added in this document. I think it would help.

Steven Wu <stevenz...@gmail.com>
wrote on Thu, Mar 28, 2019 at 12:52 PM:

This proposal mentioned that the SplitEnumerator might run on the 
JobManager or in a single task on a TaskManager.

If the enumerator is a single task on a TaskManager, then the job DAG can 
never be embarrassingly parallel anymore. That will nullify the leverage of 
fine-grained recovery for embarrassingly parallel jobs.

It's not clear to me what's the implication of running the enumerator on 
the JobManager, so I will leave that out for now.

On Mon, Jan 28, 2019 at 3:05 AM Biao Liu
<mmyy1...@gmail.com> wrote:

Hi Stephan & Piotrek,

Thank you for the feedback.

It seems that there are a lot of things to do in the community. I am just 
afraid that this discussion may be forgotten, since there are so many 
proposals recently. Anyway, I wish to see the split topics soon :)

Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:

Hi Biao!

This discussion was stalled because of preparations for the open sourcing 
& merging of Blink. I think before creating the tickets we should split 
this discussion into the topics/areas outlined by Stephan and create FLIPs 
for that.

I think there is no chance for this to be completed in the couple of 
remaining weeks/1 month before the 1.8 feature freeze, however it would be 
good to aim with those changes for 1.9.

Piotrek

On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:

Hi community,

The summary by Stephan makes a lot of sense to me. It is much clearer 
indeed after splitting the complex topic into smaller ones. I was wondering 
whether there is any detailed plan for the next step. If not, I would like 
to push this forward by creating some JIRA issues. Another question is: 
should version 1.8 include these features?

Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
Thanks everyone for the lively discussion. Let me try to summarize where I 
see convergence in the discussion and open issues. I'll try to group this 
by design aspect of the source. Please let me know if I got things wrong or 
missed something crucial here.

For issues 1-3, if the below reflects the state of the discussion, I would 
try and update the FLIP in the next days. For the remaining ones we need 
more discussion.

I would suggest to fork each of these aspects into a separate mail thread, 
or we will lose sight of the individual aspects.

*(1) Separation of Split Enumerator and Split Reader*

- All seem to agree this is a good thing
- The Split Enumerator could in the end live on the JobManager (and assign 
splits via RPC) or in a task (and assign splits via data streams)
- this discussion is orthogonal and should come later, when the interface 
is agreed upon.
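For illustration, the separation in (1) could be reduced to two minimal interfaces plus a trivial in-memory enumerator. These signatures are a simplification invented here, not the FLIP's actual API:

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Minimal illustration of the split in (1): the enumerator only decides
// *which* split goes to *which* reader; the reader only consumes splits.
interface Enumerator<SplitT> {
    // Called when a reader (identified by subtask index) asks for work.
    Optional<SplitT> nextSplit(int subtask);
}

interface Reader<SplitT> {
    void addSplit(SplitT split);
}

// A trivial enumerator handing out pre-discovered splits first-come-first-served.
final class QueueEnumerator<SplitT> implements Enumerator<SplitT> {
    private final Queue<SplitT> pending = new ArrayDeque<>();

    QueueEnumerator(Iterable<SplitT> splits) {
        splits.forEach(pending::add);
    }

    @Override
    public synchronized Optional<SplitT> nextSplit(int subtask) {
        return Optional.ofNullable(pending.poll());
    }
}
```

Whether the enumerator runs on the JobManager or in a task only changes how `nextSplit` is transported (RPC vs. data stream), not this shape, which is why the two discussions can stay orthogonal.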
*(2) Split Readers for one or more splits*

- Discussion seems to agree that we need to support one reader that 
possibly handles multiple splits concurrently.
- The requirement comes from sources where one poll()-style call fetches 
data from different splits / partitions
--> example sources that require that would be for example Kafka, Pravega, 
Pulsar

- Could have one split reader per source, or multiple split readers that 
share the "poll()" function
- To not make it too complicated, we can start with thinking about one 
split reader for all splits initially and see if that covers all 
requirements
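A toy sketch of the "one reader, many splits" model in (2): a single poll()-style call serves records from whichever assigned split has data, similar in spirit to how a Kafka client multiplexes partitions. All names and data are invented for illustration:

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy multiplexing reader: one poll() serves records from all assigned
// splits, so the task thread never needs one thread per split.
final class MultiplexingReader {
    private final Map<String, Queue<String>> recordsPerSplit = new LinkedHashMap<>();

    void addSplit(String splitId, Iterable<String> records) {
        Queue<String> q = new ArrayDeque<>();
        records.forEach(q::add);
        recordsPerSplit.put(splitId, q);
    }

    // Returns the next record from the first non-empty split, or null if drained.
    String poll() {
        for (Queue<String> q : recordsPerSplit.values()) {
            if (!q.isEmpty()) {
                return q.poll();
            }
        }
        return null;
    }
}
```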
*(3) Threading model of the Split Reader*

- Most active part of the discussion ;-)

- A non-blocking way for Flink's task code to interact with the source is 
needed in order to build task runtime code based on a 
single-threaded/actor-style task design
--> I personally am a big proponent of that, it will help with well-behaved 
checkpoints, efficiency, and simpler yet more robust runtime code

- Users care about simple abstraction, so as a subclass of SplitReader 
(non-blocking / async) we need to have a BlockingSplitReader which will 
form the basis of most source implementations. BlockingSplitReader lets 
users do simple blocking poll() calls.
- The BlockingSplitReader would spawn a thread (or more) and the thread(s) 
can make blocking calls and hand over data buffers via a blocking queue
- This should allow us to cover both, a fully async runtime, and a simple 
blocking interface for users.
- This is actually very similar to how the Kafka connectors work. Kafka 9+ 
with one thread, Kafka 8 with multiple threads

- On the base SplitReader (the async one), the non-blocking method that 
gets the next chunk of data would signal data availability via a 
CompletableFuture, because that gives the best flexibility (can await 
completion or register notification handlers).
- The source task would register a "thenHandle()" (or similar) on the 
future to put a "take next data" task into the actor-style mailbox

*(4) Split Enumeration and Assignment*

- Splits may be generated lazily, both in cases where there is a limited 
number of splits (but very many), or splits are discovered over time
- Assignment should also be lazy, to get better load balancing
- Assignment needs to support locality preferences

- Possible design based on discussion so far:

--> SplitReader has a method "addSplits(SplitT...)" to add one or more 
splits. Some split readers might assume they have only one split ever, 
concurrently, others assume multiple splits. (Note: idea behind being able 
to add multiple splits at the same time is to ease startup where multiple 
splits may be assigned instantly.)
--> SplitReader has a context object on which it can call to indicate when 
splits are completed. The enumerator gets that notification and can use it 
to decide when to assign new splits. This should help both in cases of 
sources that take splits lazily (file readers) and in case the source needs 
to preserve a partial order between splits (Kinesis, Pravega, Pulsar may 
need that).
--> SplitEnumerator gets notification when SplitReaders start and when they 
finish splits. They can decide at that moment to push more splits to that 
reader
--> The SplitEnumerator should probably be aware of the source parallelism, 
to build its initial distribution.

- Open question: Should the source expose something like "host 
preferences", so that yarn/mesos/k8s can take this into account when 
selecting a node to start a TM on?
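The lazy, notification-driven assignment discussed in (4) could be sketched as follows: the enumerator holds back pending splits and hands out the next one only when a reader reports that it is ready (on start, or when it finished a split). Purely illustrative names, not the proposed API:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

// Sketch of lazy split assignment: the enumerator keeps pending splits
// and assigns the next one only when notified that a reader started or
// finished a split, which gives natural load balancing.
final class LazyEnumerator {
    private final Queue<String> pending = new ArrayDeque<>();
    private final Map<Integer, String> current = new HashMap<>();

    LazyEnumerator(Iterable<String> splits) {
        splits.forEach(pending::add);
    }

    // Called when a reader is ready for work; returns its next split.
    synchronized Optional<String> onReaderReady(int subtask) {
        String next = pending.poll();
        if (next == null) {
            current.remove(subtask);
            return Optional.empty(); // no more work for this reader
        }
        current.put(subtask, next);
        return Optional.of(next);
    }

    synchronized Map<Integer, String> currentAssignments() {
        return new HashMap<>(current);
    }
}
```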
>>>> > > >> > > > > > >>>> > > >> *(5) Watermarks and event time alignment* > > 
> > > >>>> > > >> > > > > > >>>> > > >> - Watermark generation, as well as 
idleness, needs to > be > > > per > > > > > >>>> split > > > > > >>>> > > (like 
> > > > > >>>> > > >> currently in the Kafka Source, per partition) > > > > > 
>>>> > > >> - It is desirable to support optional > > > event-time-alignment, > 
> > > > >>>> meaning > > > > > >>>> > > that > > > > > >>>> > > >> splits that 
are ahead are back-pressured or temporarily > > > > > >>>> unsubscribed > > > > 
> >>>> > > >> > > > > > >>>> > > >> - I think i would be desirable to 
encapsulate > watermark > > > > > >>>> generation > > > > > >>>> > > logic > > 
> > > >>>> > > >> in watermark generators, for a separation of concerns. > > 
The > > > > > >>>> watermark > > > > > >>>> > > >> generators should run per 
split. > > > > > >>>> > > >> - Using watermark generators would also help with 
> > another > > > > > >>>> problem of > > > > > >>>> > > the > > > > > >>>> > > 
>> suggested interface, namely supporting non-periodic > > > watermarks > > > > 
> >>>> > > efficiently. > > > > > >>>> > > >> > > > > > >>>> > > >> - Need a 
way to "dispatch" next record to different > > > watermark > > > > > >>>> > > 
generators > > > > > >>>> > > >> - Need a way to tell SplitReader to "suspend" 
a split > > > until a > > > > > >>>> certain > > > > > >>>> > > >> watermark is 
reached (event time backpressure) > > > > > >>>> > > >> - This would in fact be 
not needed (and thus simpler) > if > > > we > > > > > had > > > > > >>>> a > > 
> > > >>>> > > >> SplitReader per split and may be a reason to re-open > that > 
> > > > >>>> discussion > > > > > >>>> > > >> > > > > > >>>> > > >> *(6) 
Watermarks across splits and in the Split > > Enumerator* > > > > > >>>> > > >> 
> > > > > >>>> > > >> - The split enumerator may need some watermark > > 
awareness, > > > > > which > > > > > >>>> > should > > > > > >>>> > > be > > > 
> > >>>> > > >> purely based on split metadata (like create timestamp > of > > 
> file > > > > > >>>> splits) > > > > > >>>> > > >> - If there are still more 
splits with overlapping > event > > > time > > > > > >>>> range > > > > > >>>> 
> for > > > > > >>>> > > a > > > > > >>>> > > >> split reader, then that split 
reader should not advance > > the > > > > > >>>> watermark > > > > > >>>> > > 
>> within the split beyond the overlap boundary. Otherwise > > > future > > > > 
> >>>> splits > > > > > >>>> > > will > > > > > >>>> > > >> produce late data. 
> > > > > >>>> > > >> > > > > > >>>> > > >> - One way to approach this could be 
that the split > > > enumerator > > > > > >>>> may > > > > > >>>> > send > > > 
> > >>>> > > >> watermarks to the readers, and the readers cannot emit > > > > 
> watermarks > > > > > >>>> > beyond > > > > > >>>> > > >> that received 
watermark. > > > > > >>>> > > >> - Many split enumerators would simply 
immediately send > > > > > Long.MAX > > > > > >>>> out > > > > > >>>> > and > > 
> > > >>>> > > >> leave the progress purely to the split readers. > > > > > 
>>>> > > >> > > > > > >>>> > > >> - For event-time alignment / split back 
pressure, this > > > begs > > > > > the > > > > > >>>> > > question > > > > > 
>>>> > > >> how we can avoid deadlocks that may arise when splits > are > > > > 
> >>>> suspended > > > > > >>>> > for > > > > > >>>> > > >> event time back 
pressure, > > > > > >>>> > > >> > > > > > >>>> > > >> *(7) Batch and streaming 
Unification* > > > > > >>>> > > >> > > > > > >>>> > > >> - Functionality wise, 
the above design should support > > both > > > > > >>>> > > >> - Batch often 
(mostly) does not care about reading "in > > > order" > > > > > >>>> and > > > 
> > >>>> > > >> generating watermarks > > > > > >>>> > > >> --> Might use 
different enumerator logic that is > more > > > > > locality > > > > > >>>> > 
aware > > > > > >>>> > > >> and ignores event time order > > > > > >>>> > > >> 
--> Does not generate watermarks > > > > > >>>> > > >> - Would be great if 
bounded sources could be > identified > > at > > > > > >>>> compile > > > > > 
>>>> > > time, > > > > > >>>> > > >> so that "env.addBoundedSource(...)" is 
type safe and > can > > > > > return a > > > > > >>>> > > >> 
"BoundedDataStream". > > > > > >>>> > > >> - Possible to defer this discussion 
until later > > > > > >>>> > > >> > > > > > >>>> > > >> *Miscellaneous 
Comments* > > > > > >>>> > > >> > > > > > >>>> > > >> - Should the source have 
a TypeInformation for the > > > produced > > > > > >>>> type, > > > > > >>>> > 
> instead > > > > > >>>> > > >> of a serializer? We need a type information in 
the > stream > > > > > >>>> anyways, and > > > > > >>>> > > can > > > > > >>>> 
> > >> derive the serializer from that. Plus, creating the > > > serializer > > 
> > > >>>> should > > > > > >>>> > > >> respect the ExecutionConfig. > > > > > 
>>>> > > >> > > > > > >>>> > > >> - The TypeSerializer interface is very 
powerful but > also > > > not > > > > > >>>> easy to > > > > > >>>> > > >> 
implement. Its purpose is to handle data super > > efficiently, > > > > > >>>> 
support > > > > > >>>> > > >> flexible ways of evolution, etc. > > > > > >>>> > 
> >> For metadata I would suggest to look at the > > > > > >>>> 
SimpleVersionedSerializer > > > > > >>>> > > >> instead, which is used for 
example for checkpoint > master > > > hooks, > > > > > >>>> or for > > > > > 
>>>> > > the > > > > > >>>> > > >> streaming file sink. I think that is is a 
good match > for > > > cases > > > > > >>>> where > > > > > >>>> > we > > > > > 
>>>> > > do > > > > > >>>> > > >> not need more than ser/deser (no copy, etc.) 
and don't > > > need to > > > > > >>>> push > > > > > >>>> > > >> versioning 
out of the serialization paths for best > > > performance > > > > > >>>> (as in 
> > > > > >>>> > > the > > > > > >>>> > > >> TypeSerializer) > > > > > >>>> > > 
>> > > > > > >>>> > > >> > > > > > >>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM 
Kostas Kloudas < > > > > > >>>> > > >> k.klou...@data-artisans.com> > > > > > 
wrote:

Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, now we have as open questions:

1) How do we let the checkpoints pass through our multi-threaded reader 
operator?

2) Do we have separate reader and source operators or not? In the strategy 
that has a separate source, the source operator has a parallelism of 1 and 
is responsible for split recovery only.

For the first one, given also the constraints (blocking, finite queues, 
etc), I do not have an answer yet.

For the 2nd, I think that we should go with separate operators for the 
source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where the 
split discovery becomes a responsibility of the JobManager and readers are 
pulling more work from the JM.

2) The source is going to be the "single point of truth". It will know 
what has been processed and what not. If the source and the readers are a 
single operator with parallelism > 1, or in general, if the split discovery 
is done by each task individually, then:
i) we have to have a deterministic scheme for each reader to assign splits 
to itself (e.g. mod subtaskId). This is not necessarily trivial for all 
sources.
ii) each reader would have to keep a copy of all its processed splits
iii) the state has to be a union state with a non-trivial merging logic in 
order to support rescaling.

Two additional points that you raised above:

i) The point that you raised that we need to keep all splits (processed 
and not-processed) I think is a bit of a strong requirement. This would 
imply that for infinite sources the state will grow indefinitely. This 
problem is even more pronounced if we do not have a single source that 
assigns splits to readers, as each reader will have its own copy of the 
state.

ii) it is true that for finite sources we need to somehow not close the 
readers when the source/split discoverer finishes. The 
ContinuousFileReaderOperator has a work-around for that. It is not elegant, 
and checkpoints are not emitted after closing the source, but this, I 
believe, is a bigger problem which requires more changes than just 
refactoring the source interface.

Cheers,
Kostas
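As an illustration of the hand-over pattern from (3) in Stephan's December 2018 summary above: a fetcher thread makes blocking calls and puts records into a bounded queue, while the task side polls non-blockingly and uses a CompletableFuture to learn about availability. This sketch uses invented names and glosses over the re-arming races a production implementation would have to handle:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Sketch of the blocking-to-async hand-over: the fetcher thread blocks on
// put(); the task thread polls non-blockingly and awaits the future when
// the queue is empty. Illustrative only, not the proposed interfaces.
final class HandOver<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1024);
    private CompletableFuture<Void> available = new CompletableFuture<>();

    // Called by the fetcher thread (may block when the queue is full).
    void produce(T record) throws InterruptedException {
        queue.put(record);
        available.complete(null); // wake up a waiting consumer
    }

    // Called by the task thread: non-blocking poll.
    T poll() {
        T next = queue.poll();
        if (next == null) {
            // re-arm the future so the caller can await the next record
            available = new CompletableFuture<>();
        }
        return next;
    }

    CompletableFuture<Void> availabilityFuture() {
        return available;
    }
}
```

The task would register a callback (e.g. thenRun) on the availability future to enqueue a "take next data" action into the actor-style mailbox, as described in the summary.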
