Hi Łukasz!

Becket and I are working hard on figuring out the last details and
implementing the first PoC. We hope to update the FLIP next week.

There is a fair chance that a first version of this will be in 1.10, but
I think it will take another release to battle-test it and migrate the
connectors.

Best,
Stephan

On Fri, Nov 15, 2019 at 11:14 AM Łukasz Jędrzejewski <l...@touk.pl> wrote:

> Hi,
>
> This proposal looks very promising for us. Do you have any plans for
> which Flink release it is going to be included in? We are thinking of
> using the DataSet API for our future use cases, but on the other hand
> the DataSet API is going to be deprecated, so using the proposed
> bounded data streams solution could be more viable in the long term.
>
> Thanks,
> Łukasz
>
> On 2019/10/01 15:48:03, Thomas Weise <thomas.we...@gmail.com> wrote:
> > Thanks for putting together this proposal!
> >
> > I see that the "Per Split Event Time" and "Event Time Alignment"
> > sections are still TBD.
> >
> > It would probably be good to flesh those out a bit before proceeding
> > too far, as the event time alignment will probably influence the
> > interaction with the split reader, specifically
> > ReaderStatus emitNext(SourceOutput<E> output).
> >
> > We currently have only one implementation of event time alignment, in
> > the Kinesis consumer. The synchronization in that case takes place as
> > the last step before records are emitted downstream (RecordEmitter).
> > With the currently proposed interfaces, the equivalent can be
> > implemented in the reader loop, although note that in the Kinesis
> > consumer the per-shard threads push records.
> >
> > Synchronization has not been implemented for the Kafka consumer yet:
> >
> > https://issues.apache.org/jira/browse/FLINK-12675
> >
> > When I looked at it, I realized that the implementation will look
> > quite different from Kinesis, because it needs to take place in the
> > pull part, where records are taken from the Kafka client. Due to the
> > multiplexing it cannot be done by blocking the split thread like it
> > currently works for Kinesis. Reading from individual Kafka partitions
> > needs to be controlled via pause/resume on the Kafka client.
> >
> > To take on that responsibility, the split thread would need to be
> > aware of the watermarks, or at least of whether it should or should
> > not continue to consume a given split, and this may require a
> > different SourceReader or SourceOutput interface.
> >
> > Thanks,
> > Thomas
> >
> > On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:
> >
> > > Hi Stephan,
> > >
> > > Thank you for the feedback!
> > > I will take a look at your branch before the public discussion.
> > >
> > > On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
> > >
> > > > Hi Biao!
> > > >
> > > > Thanks for reviving this. I would like to join this discussion,
> > > > but am quite occupied with the 1.9 release, so can we maybe pause
> > > > this discussion for a week or so?
> > > >
> > > > In the meantime I can share some suggestions based on prior
> > > > experiments:
> > > >
> > > > How to do watermarks / timestamp extractors in a simpler and more
> > > > flexible way. I think that part is quite promising and should be
> > > > part of the new source interface.
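> > > > To give a rough idea of the shape, a simplified sketch (the
> > > > actual experiments are in the branches linked below; all names
> > > > are tentative and may still change):
> > > >
> > > >     // Runs per split; the reader calls onEvent() for every record
> > > >     // and onPeriodicEmit() on a timer.
> > > >     public interface WatermarkGenerator<T> {
> > > >
> > > >         // inspect each event and possibly emit a watermark right
> > > >         // away (covers punctuated / non-periodic watermarks)
> > > >         void onEvent(T event, long eventTimestamp, WatermarkOutput output);
> > > >
> > > >         // invoked on a schedule, for periodic watermarks
> > > >         void onPeriodicEmit(WatermarkOutput output);
> > > >     }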
> > > >
> > > > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
> > > >
> > > > https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
> > > >
> > > > Some experiments on how to build the source reader and its library
> > > > for common threading/split patterns:
> > > >
> > > > https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
> > > >
> > > > Best,
> > > > Stephan
> > > >
> > > > On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
> > > >
> > > >> Hi devs,
> > > >>
> > > >> Since 1.9 is nearly released, I think we could get back to
> > > >> FLIP-27. I believe it should be included in 1.10.
> > > >>
> > > >> There are many things mentioned in the FLIP-27 document. [1] I
> > > >> think we'd better discuss them separately. However, the wiki is
> > > >> not a good place to discuss, so I wrote a Google doc about the
> > > >> SplitReader API which fills in some details missing from the
> > > >> document. [2]
> > > >>
> > > >> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > > >> 2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
> > > >>
> > > >> CC Stephan, Aljoscha, Piotrek, Becket
> > > >>
> > > >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
> > > >>
> > > >>> Hi Steven,
> > > >>> Thank you for the feedback. Please take a look at the FLIP-27
> > > >>> document
> > > >>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
> > > >>> which was updated recently. A lot of details about the
> > > >>> enumerator were added in this document. I think it will help.
> > > >>>
> > > >>> Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019, 12:52 PM:
> > > >>>
> > > >>>> This proposal mentioned that the SplitEnumerator might run on
> > > >>>> the JobManager or in a single task on a TaskManager.
> > > >>>>
> > > >>>> If the enumerator is a single task on a TaskManager, then the
> > > >>>> job DAG can never be embarrassingly parallel anymore. That will
> > > >>>> nullify the benefit of fine-grained recovery for embarrassingly
> > > >>>> parallel jobs.
> > > >>>>
> > > >>>> It's not clear to me what the implications of running the
> > > >>>> enumerator on the JobManager are, so I will leave that out for
> > > >>>> now.
> > > >>>>
> > > >>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
> > > >>>>
> > > >>>> > Hi Stephan & Piotrek,
> > > >>>> >
> > > >>>> > Thank you for the feedback.
> > > >>>> >
> > > >>>> > It seems that there are a lot of things to do in the
> > > >>>> > community. I am just afraid that this discussion may be
> > > >>>> > forgotten, since there are so many proposals recently.
> > > >>>> > Anyway, I wish to see the split topics soon :)
> > > >>>> >
> > > >>>> > Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019, 8:21 PM:
> > > >>>> >
> > > >>>> > > Hi Biao!
> > > >>>> > >
> > > >>>> > > This discussion was stalled because of preparations for the
> > > >>>> > > open sourcing & merging of Blink.
> > > >>>> > > I think before creating the tickets we should split this
> > > >>>> > > discussion into the topics/areas outlined by Stephan and
> > > >>>> > > create FLIPs for those.
> > > >>>> > >
> > > >>>> > > I think there is no chance for this to be completed in the
> > > >>>> > > couple of remaining weeks / one month before the 1.8
> > > >>>> > > feature freeze; however, it would be good to aim to land
> > > >>>> > > those changes in 1.9.
> > > >>>> > >
> > > >>>> > > Piotrek
> > > >>>> > >
> > > >>>> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> > > >>>> > > >
> > > >>>> > > > Hi community,
> > > >>>> > > > Stephan's summary makes a lot of sense to me. It is
> > > >>>> > > > indeed much clearer after splitting the complex topic
> > > >>>> > > > into smaller ones.
> > > >>>> > > > I was wondering whether there is any detailed plan for
> > > >>>> > > > the next step. If not, I would like to push this forward
> > > >>>> > > > by creating some JIRA issues.
> > > >>>> > > > Another question: should version 1.8 include these
> > > >>>> > > > features?
> > > >>>> > > >
> > > >>>> > > > Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018, 4:20 AM:
> > > >>>> > > >
> > > >>>> > > >> Thanks everyone for the lively discussion. Let me try to
> > > >>>> > > >> summarize where I see convergence in the discussion and
> > > >>>> > > >> open issues.
> > > >>>> > > >> I'll try to group this by design aspect of the source.
> > > >>>> > > >> Please let me know if I got things wrong or missed
> > > >>>> > > >> something crucial here.
> > > >>>> > > >>
> > > >>>> > > >> For issues 1-3, if the below reflects the state of the
> > > >>>> > > >> discussion, I will try to update the FLIP in the next
> > > >>>> > > >> few days.
> > > >>>> > > >> For the remaining ones we need more discussion.
> > > >>>> > > >>
> > > >>>> > > >> I would suggest forking each of these aspects into a
> > > >>>> > > >> separate mail thread, or we will lose sight of the
> > > >>>> > > >> individual aspects.
> > > >>>> > > >>
> > > >>>> > > >> *(1) Separation of Split Enumerator and Split Reader*
> > > >>>> > > >>
> > > >>>> > > >> - All seem to agree this is a good thing
> > > >>>> > > >> - The Split Enumerator could in the end live on the
> > > >>>> > > >> JobManager (and assign splits via RPC) or in a task (and
> > > >>>> > > >> assign splits via data streams)
> > > >>>> > > >> - This discussion is orthogonal and should come later,
> > > >>>> > > >> when the interface is agreed upon.
> > > >>>> > > >>
> > > >>>> > > >> *(2) Split Readers for one or more splits*
> > > >>>> > > >>
> > > >>>> > > >> - The discussion seems to agree that we need to support
> > > >>>> > > >> one reader that possibly handles multiple splits
> > > >>>> > > >> concurrently.
> > > >>>> > > >> - The requirement comes from sources where one
> > > >>>> > > >> poll()-style call fetches data from different splits /
> > > >>>> > > >> partitions
> > > >>>> > > >> --> example sources that require that would be Kafka,
> > > >>>> > > >> Pravega, Pulsar
> > > >>>> > > >>
> > > >>>> > > >> - Could have one split reader per source, or multiple
> > > >>>> > > >> split readers that share the "poll()" function
> > > >>>> > > >> - To not make it too complicated, we can start by
> > > >>>> > > >> thinking about one split reader for all splits initially
> > > >>>> > > >> and see if that covers all requirements
> > > >>>> > > >>
> > > >>>> > > >> *(3) Threading model of the Split Reader*
> > > >>>> > > >>
> > > >>>> > > >> - Most active part of the discussion ;-)
> > > >>>> > > >>
> > > >>>> > > >> - A non-blocking way for Flink's task code to interact
> > > >>>> > > >> with the source is needed in order to base the task
> > > >>>> > > >> runtime code on a single-threaded/actor-style task
> > > >>>> > > >> design
> > > >>>> > > >> --> I personally am a big proponent of that; it will
> > > >>>> > > >> help with well-behaved checkpoints, efficiency, and
> > > >>>> > > >> simpler yet more robust runtime code
> > > >>>> > > >>
> > > >>>> > > >> - Users care about a simple abstraction, so as a
> > > >>>> > > >> subclass of SplitReader (non-blocking / async) we need
> > > >>>> > > >> to have a BlockingSplitReader which will form the basis
> > > >>>> > > >> of most source implementations. BlockingSplitReader lets
> > > >>>> > > >> users do simple blocking poll() calls.
> > > >>>> > > >> - The BlockingSplitReader would spawn a thread (or more)
> > > >>>> > > >> and the thread(s) can make blocking calls and hand over
> > > >>>> > > >> data buffers via a blocking queue
> > > >>>> > > >> - This should allow us to cover both a fully async
> > > >>>> > > >> runtime and a simple blocking interface for users.
> > > >>>> > > >> - This is actually very similar to how the Kafka
> > > >>>> > > >> connectors work: Kafka 9+ with one thread, Kafka 8 with
> > > >>>> > > >> multiple threads
> > > >>>> > > >>
> > > >>>> > > >> - On the base SplitReader (the async one), the
> > > >>>> > > >> non-blocking method that gets the next chunk of data
> > > >>>> > > >> would signal data availability via a CompletableFuture,
> > > >>>> > > >> because that gives the best flexibility (can await
> > > >>>> > > >> completion or register notification handlers).
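> > > >>>> > > >> - As a rough sketch of how the two layers could fit
> > > >>>> > > >> together (all names are placeholders, not a final API):
> > > >>>> > > >>
> > > >>>> > > >>     // async base interface: availability signaled via a future
> > > >>>> > > >>     interface SplitReader<E> {
> > > >>>> > > >>         // completes when emitNext() can make progress
> > > >>>> > > >>         CompletableFuture<?> available();
> > > >>>> > > >>         // hands the next record to the runtime, never blocks
> > > >>>> > > >>         ReaderStatus emitNext(SourceOutput<E> output);
> > > >>>> > > >>     }
> > > >>>> > > >>
> > > >>>> > > >>     // blocking convenience layer: fetcher thread(s) make the
> > > >>>> > > >>     // blocking calls and hand records over via a blocking queue
> > > >>>> > > >>     abstract class BlockingSplitReader<E> implements SplitReader<E> {
> > > >>>> > > >>         protected final BlockingQueue<E> handover =
> > > >>>> > > >>                 new ArrayBlockingQueue<>(1024);
> > > >>>> > > >>
> > > >>>> > > >>         // user-implemented, may block (a simple poll() call)
> > > >>>> > > >>         protected abstract E pollNext() throws Exception;
> > > >>>> > > >>
> > > >>>> > > >>         // run by the fetcher thread(s)
> > > >>>> > > >>         protected void runFetcher() throws Exception {
> > > >>>> > > >>             while (true) {
> > > >>>> > > >>                 handover.put(pollNext());
> > > >>>> > > >>             }
> > > >>>> > > >>         }
> > > >>>> > > >>     }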
> > > >>>> > > >> - The source task would register a "thenHandle()" (or
> > > >>>> > > >> similar) on the future to put a "take next data" task
> > > >>>> > > >> into the actor-style mailbox
> > > >>>> > > >>
> > > >>>> > > >> *(4) Split Enumeration and Assignment*
> > > >>>> > > >>
> > > >>>> > > >> - Splits may be generated lazily, both in cases where
> > > >>>> > > >> there is a limited number of splits (but very many), and
> > > >>>> > > >> in cases where splits are discovered over time
> > > >>>> > > >> - Assignment should also be lazy, to get better load
> > > >>>> > > >> balancing
> > > >>>> > > >> - Assignment needs to support locality preferences
> > > >>>> > > >>
> > > >>>> > > >> - Possible design based on the discussion so far:
> > > >>>> > > >>
> > > >>>> > > >> --> SplitReader has a method "addSplits(SplitT...)" to
> > > >>>> > > >> add one or more splits. Some split readers might assume
> > > >>>> > > >> they have only one split ever, concurrently; others
> > > >>>> > > >> assume multiple splits. (Note: the idea behind being
> > > >>>> > > >> able to add multiple splits at the same time is to ease
> > > >>>> > > >> startup, where multiple splits may be assigned
> > > >>>> > > >> instantly.)
> > > >>>> > > >> --> SplitReader has a context object on which it can
> > > >>>> > > >> indicate when splits are completed. The enumerator gets
> > > >>>> > > >> that notification and can use it to decide when to
> > > >>>> > > >> assign new splits. This should help both in cases of
> > > >>>> > > >> sources that take splits lazily (file readers) and in
> > > >>>> > > >> cases where the source needs to preserve a partial order
> > > >>>> > > >> between splits (Kinesis, Pravega, Pulsar may need that).
> > > >>>> > > >> --> The SplitEnumerator gets notified when SplitReaders
> > > >>>> > > >> start and when they finish splits. It can decide at that
> > > >>>> > > >> moment to push more splits to that reader
> > > >>>> > > >> --> The SplitEnumerator should probably be aware of the
> > > >>>> > > >> source parallelism, to build its initial distribution.
> > > >>>> > > >>
> > > >>>> > > >> - Open question: Should the source expose something like
> > > >>>> > > >> "host preferences", so that yarn/mesos/k8s can take this
> > > >>>> > > >> into account when selecting a node to start a TM on?
> > > >>>> > > >>
> > > >>>> > > >> *(5) Watermarks and event time alignment*
> > > >>>> > > >>
> > > >>>> > > >> - Watermark generation, as well as idleness, needs to be
> > > >>>> > > >> per split (like currently in the Kafka Source, per
> > > >>>> > > >> partition)
> > > >>>> > > >> - It is desirable to support optional event-time
> > > >>>> > > >> alignment, meaning that splits that are ahead are
> > > >>>> > > >> back-pressured or temporarily unsubscribed
> > > >>>> > > >>
> > > >>>> > > >> - I think it would be desirable to encapsulate watermark
> > > >>>> > > >> generation logic in watermark generators, for a
> > > >>>> > > >> separation of concerns. The watermark generators should
> > > >>>> > > >> run per split.
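> > > >>>> > > >> - For illustration, a minimal sketch of per-split
> > > >>>> > > >> generation with a min-combine across splits (all names
> > > >>>> > > >> made up here, not a proposed API):
> > > >>>> > > >>
> > > >>>> > > >>     // one watermark tracked per split; the watermark emitted
> > > >>>> > > >>     // downstream is the minimum across all active splits
> > > >>>> > > >>     class PerSplitWatermarks {
> > > >>>> > > >>         private final Map<String, Long> perSplit = new HashMap<>();
> > > >>>> > > >>
> > > >>>> > > >>         // e.g. bounded out-of-orderness of 5s, tracked per split
> > > >>>> > > >>         void onEvent(String splitId, long eventTimestamp) {
> > > >>>> > > >>             perSplit.merge(splitId, eventTimestamp - 5_000, Math::max);
> > > >>>> > > >>         }
> > > >>>> > > >>
> > > >>>> > > >>         long combinedWatermark() {
> > > >>>> > > >>             return perSplit.values().stream()
> > > >>>> > > >>                     .mapToLong(Long::longValue)
> > > >>>> > > >>                     .min().orElse(Long.MIN_VALUE);
> > > >>>> > > >>         }
> > > >>>> > > >>     }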
> > > >>>> > > >> - Using watermark generators would also help with
> > > >>>> > > >> another problem of the suggested interface, namely
> > > >>>> > > >> supporting non-periodic watermarks efficiently.
> > > >>>> > > >>
> > > >>>> > > >> - Need a way to "dispatch" the next record to different
> > > >>>> > > >> watermark generators
> > > >>>> > > >> - Need a way to tell the SplitReader to "suspend" a
> > > >>>> > > >> split until a certain watermark is reached (event time
> > > >>>> > > >> backpressure)
> > > >>>> > > >> - This would in fact not be needed (and would thus be
> > > >>>> > > >> simpler) if we had a SplitReader per split, and may be a
> > > >>>> > > >> reason to re-open that discussion
> > > >>>> > > >>
> > > >>>> > > >> *(6) Watermarks across splits and in the Split Enumerator*
> > > >>>> > > >>
> > > >>>> > > >> - The split enumerator may need some watermark
> > > >>>> > > >> awareness, which should be purely based on split
> > > >>>> > > >> metadata (like the create timestamp of file splits)
> > > >>>> > > >> - If there are still more splits with an overlapping
> > > >>>> > > >> event time range for a split reader, then that split
> > > >>>> > > >> reader should not advance the watermark within the split
> > > >>>> > > >> beyond the overlap boundary. Otherwise future splits
> > > >>>> > > >> will produce late data.
> > > >>>> > > >>
> > > >>>> > > >> - One way to approach this could be that the split
> > > >>>> > > >> enumerator may send watermarks to the readers, and the
> > > >>>> > > >> readers cannot emit watermarks beyond that received
> > > >>>> > > >> watermark.
> > > >>>> > > >> - Many split enumerators would simply immediately send
> > > >>>> > > >> Long.MAX out and leave the progress purely to the split
> > > >>>> > > >> readers.
> > > >>>> > > >>
> > > >>>> > > >> - For event-time alignment / split back pressure, this
> > > >>>> > > >> begs the question of how we can avoid deadlocks that may
> > > >>>> > > >> arise when splits are suspended for event time back
> > > >>>> > > >> pressure.
> > > >>>> > > >>
> > > >>>> > > >> *(7) Batch and streaming unification*
> > > >>>> > > >>
> > > >>>> > > >> - Functionality-wise, the above design should support
> > > >>>> > > >> both
> > > >>>> > > >> - Batch often (mostly) does not care about reading "in
> > > >>>> > > >> order" and generating watermarks
> > > >>>> > > >> --> Might use different enumerator logic that is more
> > > >>>> > > >> locality aware and ignores event time order
> > > >>>> > > >> --> Does not generate watermarks
> > > >>>> > > >> - It would be great if bounded sources could be
> > > >>>> > > >> identified at compile time, so that
> > > >>>> > > >> "env.addBoundedSource(...)" is type safe and can return
> > > >>>> > > >> a "BoundedDataStream".
> > > >>>> > > >> - It is possible to defer this discussion until later
> > > >>>> > > >>
> > > >>>> > > >> *Miscellaneous Comments*
> > > >>>> > > >>
> > > >>>> > > >> - Should the source have a TypeInformation for the
> > > >>>> > > >> produced type, instead of a serializer? We need a type
> > > >>>> > > >> information in the stream anyway, and we can derive the
> > > >>>> > > >> serializer from it. Plus, creating the serializer should
> > > >>>> > > >> respect the ExecutionConfig.
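> > > >>>> > > >> For example (a sketch; "getProducedType" is just a
> > > >>>> > > >> placeholder name here):
> > > >>>> > > >>
> > > >>>> > > >>     // the source exposes only the type information ...
> > > >>>> > > >>     TypeInformation<T> producedType = source.getProducedType();
> > > >>>> > > >>
> > > >>>> > > >>     // ... and the runtime derives the serializer from it,
> > > >>>> > > >>     // honoring the ExecutionConfig
> > > >>>> > > >>     TypeSerializer<T> serializer =
> > > >>>> > > >>             producedType.createSerializer(executionConfig);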
> > > >>>> > > >>
> > > >>>> > > >> - The TypeSerializer interface is very powerful but also
> > > >>>> > > >> not easy to implement. Its purpose is to handle data
> > > >>>> > > >> super efficiently, support flexible ways of evolution,
> > > >>>> > > >> etc.
> > > >>>> > > >> For metadata I would suggest looking at the
> > > >>>> > > >> SimpleVersionedSerializer instead, which is used for
> > > >>>> > > >> example for checkpoint master hooks, or for the
> > > >>>> > > >> streaming file sink. I think it is a good match for
> > > >>>> > > >> cases where we do not need more than ser/deser (no copy,
> > > >>>> > > >> etc.) and don't need to push versioning out of the
> > > >>>> > > >> serialization paths for best performance (as in the
> > > >>>> > > >> TypeSerializer)
> > > >>>> > > >>
> > > >>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas
> > > >>>> > > >> <k.klou...@data-artisans.com> wrote:
> > > >>>> > > >>
> > > >>>> > > >>> Hi Biao,
> > > >>>> > > >>>
> > > >>>> > > >>> Thanks for the answer!
> > > >>>> > > >>>
> > > >>>> > > >>> So given the multi-threaded readers, we now have the
> > > >>>> > > >>> following open questions:
> > > >>>> > > >>>
> > > >>>> > > >>> 1) How do we let the checkpoints pass through our
> > > >>>> > > >>> multi-threaded reader operator?
> > > >>>> > > >>>
> > > >>>> > > >>> 2) Do we have separate reader and source operators or
> > > >>>> > > >>> not? In the strategy that has a separate source, the
> > > >>>> > > >>> source operator has a parallelism of 1 and is
> > > >>>> > > >>> responsible for split recovery only.
> > > >>>> > > >>>
> > > >>>> > > >>> For the first one, given also the constraints
> > > >>>> > > >>> (blocking, finite queues, etc.), I do not have an
> > > >>>> > > >>> answer yet.
> > > >>>> > > >>>
> > > >>>> > > >>> For the 2nd, I think that we should go with separate
> > > >>>> > > >>> operators for the source and the readers, for the
> > > >>>> > > >>> following reasons:
> > > >>>> > > >>>
> > > >>>> > > >>> 1) This is more aligned with a potential future
> > > >>>> > > >>> improvement where the split discovery becomes a
> > > >>>> > > >>> responsibility of the JobManager and readers are
> > > >>>> > > >>> pulling more work from the JM.
> > > >>>> > > >>>
> > > >>>> > > >>> 2) The source is going to be the "single point of
> > > >>>> > > >>> truth". It will know what has been processed and what
> > > >>>> > > >>> not. If the source and the readers are a single
> > > >>>> > > >>> operator with parallelism > 1, or in general, if the
> > > >>>> > > >>> split discovery is done by each task individually,
> > > >>>> > > >>> then:
> > > >>>> > > >>> i) we have to have a deterministic scheme for each
> > > >>>> > > >>> reader to assign splits to itself (e.g. mod subtaskId;
> > > >>>> > > >>> see the sketch below). This is not necessarily trivial
> > > >>>> > > >>> for all sources.
> > > >>>> > > >>> ii) each reader would have to keep a copy of all its
> > > >>>> > > >>> processed splits
> > > >>>> > > >>> iii) the state has to be a union state with a
> > > >>>> > > >>> non-trivial merging logic in order to support
> > > >>>> > > >>> rescaling.
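> > > >>>> > > >>>
> > > >>>> > > >>> To make (i) concrete, a minimal sketch of such a
> > > >>>> > > >>> deterministic scheme (purely illustrative; the free
> > > >>>> > > >>> variables are assumed to come from the runtime
> > > >>>> > > >>> context):
> > > >>>> > > >>>
> > > >>>> > > >>>     // each parallel reader claims "its" splits from the
> > > >>>> > > >>>     // full, deterministically ordered split list
> > > >>>> > > >>>     List<Split> mySplits = new ArrayList<>();
> > > >>>> > > >>>     for (int i = 0; i < allSplits.size(); i++) {
> > > >>>> > > >>>         if (i % parallelism == subtaskIndex) {
> > > >>>> > > >>>             mySplits.add(allSplits.get(i));
> > > >>>> > > >>>         }
> > > >>>> > > >>>     }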
> > > >>>> > > >>>
> > > >>>> > > >>> Two additional points that you raised above:
> > > >>>> > > >>>
> > > >>>> > > >>> i) The point you raised that we need to keep all splits
> > > >>>> > > >>> (processed and not-processed) is, I think, a bit of a
> > > >>>> > > >>> strong requirement. It would imply that for infinite
> > > >>>> > > >>> sources the state will grow indefinitely. This problem
> > > >>>> > > >>> is even more pronounced if we do not have a single
> > > >>>> > > >>> source that assigns splits to readers, as each reader
> > > >>>> > > >>> will have its own copy of the state.
> > > >>>> > > >>>
> > > >>>> > > >>> ii) It is true that for finite sources we need to
> > > >>>> > > >>> somehow not close the readers when the source/split
> > > >>>> > > >>> discoverer finishes. The ContinuousFileReaderOperator
> > > >>>> > > >>> has a work-around for that. It is not elegant, and
> > > >>>> > > >>> checkpoints are not emitted after closing the source,
> > > >>>> > > >>> but this, I believe, is a bigger problem which requires
> > > >>>> > > >>> more changes than just refactoring the source
> > > >>>> > > >>> interface.
> > > >>>> > > >>>
> > > >>>> > > >>> Cheers,
> > > >>>> > > >>> Kostas
> > > >>>> > > >>>
> > > >>>> > > >>
> > > >>>> > > >
> > > >>>> > >
> > > >>>> >
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
>