> > Emitting records for downstream operators in or after notifyCheckpointComplete no longer works after FLIP-147 when executing the final checkpoint. The problem is that the final checkpoint happens after the EOI and we would like to keep the property that you can terminate the whole topology with a single checkpoint, if possible.
Sorry for the confusion, I meant to say emit them after the barrier (e.g. in snapshotState).

On Thu, Nov 4, 2021 at 9:49 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for the detailed description Arvid.
>
> I might misunderstand things but one comment concerning:
>
> | We could even optimize the writer to only emit the committables after notifyCheckpointComplete as long as we retain them in the state of the writer.
>
> Emitting records for downstream operators in or after notifyCheckpointComplete no longer works after FLIP-147 when executing the final checkpoint. The problem is that the final checkpoint happens after the EOI and we would like to keep the property that you can terminate the whole topology with a single checkpoint, if possible.
>
> Cheers,
> Till
>
> On Thu, Nov 4, 2021 at 9:05 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi folks,
>>
>> thanks for the lively discussion. Let me present my point of view on a couple of items:
>>
>> *Impact on checkpointing times*
>>
>> Currently, we send the committables of the writer downstream before the barrier is sent. That allows us to include all committables in the state of the committers, such that the committer receives all committables before it snapshots.
>>
>> All proposals now add a processing step on the committables where certain committables are virtually merged into larger committables. However, none of the approaches require the physical merge to happen before the barrier is sent downstream. In fact, we can even delay the virtual merge until after the checkpoint has been fully taken. We could even optimize the writer to only emit the committables after notifyCheckpointComplete as long as we retain them in the state of the writer. The important point is that the committables become part of the checkpoint, but they are always committed only after notifyCheckpointComplete. Especially the retry mechanism of 1.14 decouples the exact time of the commit from notifyCheckpointComplete. The retry happens asynchronously, so there is no reason to believe that we can't do the merging asynchronously with any option.
>>
>> Naturally, all approaches delay the checkpoint barrier a bit by either adding RPC calls or shuffles, but the impact is rather minimal in a well configured system (the number of committables is assumed to be tiny), so I'm assuming a tad higher checkpointing time because the topology is more complex (in all cases).
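As a rough illustration of the writer-side flow described above, with the correction from the top of this message applied (the committables are handed over at the barrier, i.e. in snapshotState, and only committed after notifyCheckpointComplete); all class and method names are made up for this sketch and are not the actual Flink SinkWriter API:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: names and signatures are hypothetical,
    // not the actual Flink SinkWriter interface.
    class RetainingWriter<CommT> {

        // Committables produced since the last checkpoint barrier.
        private final List<CommT> pending = new ArrayList<>();

        void collect(CommT committable) {
            pending.add(committable);
        }

        // Called at the barrier: the committables are snapshotted as part of
        // the checkpoint and handed downstream ("emit after the barrier").
        List<CommT> snapshotState(long checkpointId) {
            List<CommT> emitted = new ArrayList<>(pending);
            pending.clear();
            return emitted;
        }

        // The downstream committer commits them only after
        // notifyCheckpointComplete, so the commit time stays decoupled from
        // the emission time (as with the 1.14 retry mechanism).
    }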
>> *Impact on latency*
>>
>> All approaches will also delay the effective commit, since additional work needs to be done, but I'd argue that this is by design and should be clear to everyone. Especially when merging files across checkpoints, certain results will not be visible until much later. Once we settle on an approach, we should think about which options we give to sink developers and end-users to influence that latency.
>>
>> An important aspect here is that we also refine the contract on GlobalCommitter. Currently, it's not clear when it is supposed to be called; for example, to register files in a metastore. Naively, I would have said that the GlobalCommitter is invoked when all committables of a certain checkpoint have been committed.
>> a) But what happens in the case of a failure and retry? Do we delay until the commit finally happens?
>> b) What do we do with committables that are held back for compaction? Do we global commit when all committables of checkpoint A are committed, ignoring small files? Or do we wait until a later checkpoint, when all small files of A have been merged, such that indeed all data of A has been committed?
>>
>> *Re: The cost for developers to understand it is relatively high*
>>
>> Yes, that is a valid concern that we already have with committer and global committer (which none of the new users understand). I don't like that we have so many Optional methods where it's not clear which methods to implement to achieve certain functionality. Ideally, we split up the Sink into many smaller components where you add certain traits. For example, HiveSink implements Sink, HasState, HasCommitter, HasCompaction, HasGlobalCommitter (there are probably better names for the traits). We can still change that as everything is experimental (and annoy connector devs), but it seems to be an orthogonal discussion.
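A rough sketch of what such a trait-based split could look like; the interface names are placeholders taken from the example above, not an existing Flink API:

    import java.util.List;

    // Placeholder traits: the names follow the example above and are not
    // part of any existing Flink API.
    interface Sink<InputT> {
        void write(InputT element);
    }

    interface HasState<StateT> {
        List<StateT> snapshotState(long checkpointId);
    }

    interface HasCommitter<CommT> {
        void commit(List<CommT> committables);
    }

    interface HasCompaction<CommT> {
        List<CommT> compact(List<CommT> committables);
    }

    interface HasGlobalCommitter<CommT, GlobalCommT> {
        void globalCommit(List<GlobalCommT> globalCommittables);
    }

    // A sink implements only the traits it needs; a developer without
    // compaction requirements never has to look at HasCompaction.
    abstract class HiveLikeSink<IN, StateT, CommT, GlobalCommT>
            implements Sink<IN>, HasState<StateT>, HasCommitter<CommT>,
                       HasCompaction<CommT>, HasGlobalCommitter<CommT, GlobalCommT> {
    }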
>> If we compare Option 2 with Option 3, I'd argue that the mental model of the connector dev is more stressed with option 3 than with 2. He needs to map the high-level concepts of the current Sink to the low-level concepts of DataStream. He needs to understand the data that is being sent between writer and committer to be able to hook in.
>> Note that we need to move away from just sending CommT and wrap it with some metadata, such as checkpoint id and subtask id.
>>
>> The dev also needs to care about managing the state, which we may abstract in Option 2 (not entirely sure). And the dev needs to understand the task life-cycle to emit the remaining committables before the job shuts down (is that even possible on the DataStream level? Are we notified on EOI or do we expect devs to use the operator API?). Lastly, if things are done asynchronously as you have championed, the user also needs to take care of ensuring that all async tasks are done before shutdown.
>>
>> *Re 2. Repeated implementation*
>>
>>> The newly introduced `aggregate` can set the parallelism, thus perhaps `setUid`, `slotSharingGroup (including resources)`, and `maxParallelism` also need to be supported? If they are supported, additional workloads are required, and at the same time I feel that these workloads are unnecessary; if not, it will also increase the developers’ understanding cost: for example, why can these operators not set these attributes?
>>
>> I'm assuming that you mean that we are about to replicate API between DataStream and the Sink compactor? I wouldn't do that. I would actually also fix the parallelism to the writer's parallelism. So we just have a pre-key, aggregate, and post-key where we can have defaults for the keys. We should provide access to the metadata of CommT through a context. I'd always run compaction in the same slot sharing group as the writer, similar to how the committer is run. I don't see why we should take a different route with the design of the compaction than with the committer, where we abstract all these things away.
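To make the pre-key/aggregate idea concrete, a small sketch of a committable wrapped with the metadata mentioned above (checkpoint id, subtask id) and two possible default keys; the types and the key choices are illustrative assumptions, not part of the FLIP:

    import java.util.function.Function;

    // Hypothetical wrapper: the raw CommT plus the metadata discussed above.
    final class CommittableWithMetadata<CommT> {
        final long checkpointId;
        final int subtaskId;
        final CommT committable;

        CommittableWithMetadata(long checkpointId, int subtaskId, CommT committable) {
            this.checkpointId = checkpointId;
            this.subtaskId = subtaskId;
            this.committable = committable;
        }
    }

    final class CompactionKeys {
        // Default pre-key: keep each writer subtask's committables together,
        // i.e. compaction effectively runs with the writer's parallelism.
        static <CommT> Function<CommittableWithMetadata<CommT>, Integer> bySubtask() {
            return c -> c.subtaskId;
        }

        // A constant key instead funnels everything to one aggregator
        // (the "parallelism 1" case mentioned later in the thread).
        static <CommT> Function<CommittableWithMetadata<CommT>, Integer> constant() {
            return c -> 0;
        }
    }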
>>> Also for another example, `SinkWriter` has introduced many existing (or repeated) implementations in the DataStream API in order to support ‘State’, ‘Timer’, ‘asynchronization’, ‘subindex’, etc.
>>
>> I agree that the repeated implementation for Timer is weird, but that could have been solved similarly to how we solved it for asynchronization, where we did not replicate interfaces but rather pulled up the concept of MailboxExecutor from datastream to core.
>> Having specific Context objects is imho actually a good design decision - RuntimeContext is heavily overloaded and shouldn't be emulated or used.
>>
>>> In addition, if a new feature is introduced on the DataStream in the future, I think it may be necessary to consider the special operators of the sinks separately, which may also be a burden.
>>
>> Yes, option 2 has a higher maintenance cost on Flink since Flink needs to provide these operators, but it also gives us more control over changing things in the future. If we expose the elements that we exchange between the different sink operators, we cannot change them in the future. Now we can easily abstract a change in the elements away.
>> Regarding new features in DataStream: I think this is actually an advantage. We can evolve the DataStream API without thinking about how that could screw up potential sinks.
>>
>> *Re 2. Aggregator (I think 1 parallelism is OK, why is it multiple parallelism?)*
>>
>> If a user wants to aggregate with parallelism 1, he can use a constant key (which we should describe in the javadoc), and that should be the default. I think that a higher parallelism makes sense if you want to compact across checkpoints, where you probably want to aggregate the files into different buckets and you can process the buckets in parallel.
>>
>> I hope to have cleared things up a bit. I guess it became obvious that I prefer Option 2 because it gives us more means to provide a good implementation and help the sink developer while also giving us flexibility in the future. Happy to go more into details on certain points.
>>
>> On Thu, Nov 4, 2021 at 7:20 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Many thanks Fabian for drafting this FLIP! It looks very good to me. I see that currently most of us agree with option 2, but I personally feel that option 3 may be better :-) I have some small concerns about option 2.
>>>
>>> 1. The cost for developers to understand it is relatively high.
>>> The merging of small files is very important, but from the perspective of the sink as a whole, this requirement is only a special case of a sink. We expose this requirement directly in the Sink API, which means that all developers may need to understand this "special case". From my personal point of view, for a developer who has no compaction requirements, seeing this API may make it feel more complicated. In addition, from a higher level, the first level of the Sink API should be a relatively general abstraction; the second level might be different abstractions according to specific types.
>>>
>>> 2. Repeated implementation
>>> The newly introduced `aggregate` can set the parallelism, thus perhaps `setUid`, `slotSharingGroup (including resources)`, and `maxParallelism` also need to be supported? If they are supported, additional workloads are required, and at the same time I feel that these workloads are unnecessary; if not, it will also increase the developers’ understanding cost: for example, why can these operators not set these attributes? Also for another example, `SinkWriter` has introduced many existing (or repeated) implementations in the DataStream API in order to support ‘State’, ‘Timer’, ‘asynchronization’, ‘subindex’, etc. In addition, if a new feature is introduced on the DataStream in the future, I think it may be necessary to consider the special operators of the sinks separately, which may also be a burden.
>>>
>>> 3. Might block the checkpoint
>>> For option 2, we would need to actually merge files in the committer. However, this might cause some problems regarding whether compaction blocks the checkpoint: suppose our policy is to merge all the files produced in 10 checkpoints. If we allow users to construct the topology, they could merge the files asynchronously in a dedicated executor, the merge could span multiple checkpoints (as long as it does not exceed 10), and then we emit the files to be renamed to the committer. But if we merge in a committer, it has to be done before the next checkpoint since we do not support asynchronous commits now. But with option 3, this can be done at present.
>>>
>>> In addition, the FLIP mentioned that option 3 puts a certain burden on developers from the perspective of stream-batch unification. Could you enlighten me a little about what the burden is? I think that the current APIs of DataStream are all stream-batch unified, and the topology in option 3 is also built with `DataStream`, so I understand that developers will not have any additional burden. Or what is missing?
>>>
>>> Best,
>>> Guowei
>>>
>>> On Thu, Nov 4, 2021 at 11:16 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>> > Hi Fabian,
>>> >
>>> > Thanks for drafting the FLIP!
>>> >
>>> > ## Few thoughts on user requirements
>>> >
>>> > 1. Compact files from multiple checkpoints
>>> >
>>> > This is what users need very much.
>>> >
>>> > 2. Whether the compaction blocks the checkpointing
>>> >
>>> > - Some scenarios require this. For example, the user expects the output data to be consistent with the input data. If the checkpoint succeeds, he needs to see how much data was output. Otherwise, the user restarts a job to consume the same source offset, and he may lose data.
>>> > - But I think in most cases, users are concerned about this, and we can delay the data to be visible.
>>> >
>>> > 3. The end-to-end latency of data
>>> >
>>> > This also depends on the situation.
>>> > - For some users, the delay is very important. We'd better compact the data at the current checkpoint, even if it affects the checkpoint delay.
>>> > - Some users think that the delay doesn't matter (the delay is at the minute level). As long as you compact the files, small files won't bring them trouble.
>>> >
>>> > So I think flexibility is important.
>>> >
>>> > ## Few thoughts on option 2
>>> >
>>> > Option 1 and option 2 seem to differ only in the aggregator in the middle: whether it is a separate operator or a coordinator.
>>> >
>>> > I would prefer option 2.
>>> >
>>> > If I understand correctly, the whole process should be as follows:
>>> >
>>> > 1. SinkWriter ->
>>> > 2. Aggregator (I think 1 parallelism is OK, why is it multiple parallelism?) ->
>>> > 3. LocalCommitter (does the compaction work) ->
>>> > 4. GlobalCommitter
>>> >
>>> > The Aggregator can control single checkpoint compaction or cross checkpoint compaction, it controls whether or not the current checkpoint is blocked, and it controls the end-to-end latency of the data via how many checkpoints to wait for.
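A tiny sketch of how such an aggregator could trade end-to-end latency for file size by holding committables across checkpoints; the class and its knobs are invented for illustration and are not an existing Flink component:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical aggregator policy: hold small committables for up to
    // maxCheckpointsToWait checkpoints, then hand them to the committer as
    // one compaction unit. Not an existing Flink class.
    class CrossCheckpointAggregator<CommT> {

        private final int maxCheckpointsToWait;
        private final List<CommT> buffered = new ArrayList<>();
        private long firstBufferedCheckpoint = -1;

        CrossCheckpointAggregator(int maxCheckpointsToWait) {
            this.maxCheckpointsToWait = maxCheckpointsToWait;
        }

        void add(long checkpointId, CommT committable) {
            if (buffered.isEmpty()) {
                firstBufferedCheckpoint = checkpointId;
            }
            buffered.add(committable);
        }

        // Called on every checkpoint: emit a merged unit only when enough
        // checkpoints have passed; waiting longer increases end-to-end
        // latency but yields larger files.
        List<CommT> onCheckpoint(long checkpointId) {
            if (buffered.isEmpty()
                    || checkpointId - firstBufferedCheckpoint < maxCheckpointsToWait) {
                return List.of();
            }
            List<CommT> unit = new ArrayList<>(buffered);
            buffered.clear();
            return unit;
        }
    }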
>>> > Best,
>>> > Jingsong
>>> >
>>> > On Wed, Nov 3, 2021 at 11:01 PM Fabian Paul <fabianp...@ververica.com> wrote:
>>> > >
>>> > > Hi David and Till,
>>> > >
>>> > > Thanks for your great feedback. One definitely confusing point in the FLIP is who is doing the actual compaction. The compaction will not be done by the CommittableAggregator operator but by the committers, so it should not affect the checkpointing duration or become a significant performance bottleneck, because the committers are executed in parallel (also in batch mode [1]).
>>> > >
>>> > > I will update the FLIP to clarify it.
>>> > >
>>> > > > From your description I would be in favour of option 2 for the following reasons: Assuming that option 2 solves all our current problems, it seems like the least invasive change and smallest in scope. Your main concern is that it might not cover future use cases. Do you have some specific use cases in mind?
>>> > >
>>> > > No, I do not have anything specific in mind. I just wanted to raise the point that adding more and more operators to the sink might complicate the future development of making sure they can all be used together.
>>> > >
>>> > > > What I am missing a bit from the description is how option 2 will behave wrt checkpoints and the batch execution mode.
>>> > >
>>> > > My idea was to always invoke CommittableAggregate#aggregate on a checkpoint and on endOfInput. In the batch case the aggregation is only done once on all committables.
>>> > >
>>> > > > Few thoughts on option 2)
>>> > > >
>>> > > > The file compaction is by definition a quite costly IO-bound operation. If I understand the proposal correctly, the aggregation itself would run during the operator (aggregator) checkpoint. Would this significantly increase the checkpoint duration?
>>> > > >
>>> > > > Compaction between different sub-tasks incurs additional network IO (to fetch the raw non-compacted files from the remote filesystem), so this could quickly become a bottleneck. Basically we're decreasing the sink parallelism (possible throughput) to the parallelism of the aggregator.
>>> > >
>>> > > Hopefully these concerns are covered by the explanation at the beginning.
>>> > >
>>> > > > To be really effective here, compaction would ideally be able to compact files from multiple checkpoints. However there is a huge tradeoff between latency and efficiency (especially with exactly once). Is this something worth exploring?
>>> > >
>>> > > I agree with you that by enabling compaction across checkpoints the latency can increase because files might be committed several checkpoints later. I guess the best we can do is to let the user configure the behaviour. By configuring the checkpointing interval and the wanted file size the user can already affect the latency.
>>> > > Is this answering your questions? I am not fully sure what you are referring to with efficiency.
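Purely to illustrate the kind of user-facing knobs this could boil down to; the configuration object and its field names are invented for this sketch and are not an existing Flink configuration:

    import java.time.Duration;

    // Invented settings object: the levers mentioned above, i.e. the
    // checkpoint interval (latency floor), a target file size, and how long
    // small files may be held back for compaction.
    final class CompactionSettings {
        final Duration checkpointInterval;   // lower bound on commit latency
        final long targetFileSizeBytes;      // compact until roughly this size
        final int maxCheckpointsToWait;      // cap on the added visibility delay

        CompactionSettings(Duration checkpointInterval,
                           long targetFileSizeBytes,
                           int maxCheckpointsToWait) {
            this.checkpointInterval = checkpointInterval;
            this.targetFileSizeBytes = targetFileSizeBytes;
            this.maxCheckpointsToWait = maxCheckpointsToWait;
        }
    }

    // Example: 1 minute checkpoints, aim for ~128 MiB files, wait at most 10 checkpoints.
    // new CompactionSettings(Duration.ofMinutes(1), 128L << 20, 10);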
>>> > > @dvmk
>>> > >
>>> > > > I hope that with option 2, we can support both use cases: single task compaction as well as cross task compaction if needed. Similarly for single checkpoint compaction as well as cross checkpoint compaction.
>>> > >
>>> > > Compaction across subtasks should be controllable by the parallelism of the committableAggregator operator, i.e. a parallelism of 2 can reduce the computational complexity but might not compute the best compaction.
>>> > >
>>> > > Best,
>>> > > Fabian
>>> > >
>>> > > [1] https://github.com/apache/flink/pull/17536
>>> >
>>> > --
>>> > Best, Jingsong Lee