Hi Fabian, quick question on your comment 3. If there is a pipelined data exchange with a keyBy between the writers/committers and the component that does the global commit, then there will only be a single failover region. So is it correct that you assumed blocking data exchanges for the scenario you described?
Cheers, Till On Thu, Dec 16, 2021 at 9:23 AM Fabian Paul <fp...@apache.org> wrote: > Hi Yun, > > Thanks for your fast feedback. Let me clarify your points. > > 1. We solve it by using StreamExchangeMode.BATCH before any exchange. > That obviously doesn’t help with lost TMs, but we would need to employ > HA storage for that. Same issue as now and orthogonal. > > 2. Extending V1 with V2 or vice versa would require renames of methods > (since return types are non-optional) and would mean API changes. Even > though Experimental, we want to give connector developers the > opportunity to provide one implementation for all of Flink 1.X. We will > offer an internal adapter from V1 to V2, two sinkTo methods, and internally > just have one code path. > > 3. DataStreamSink would act as a unified view on all the operators and > update them all at once when using setParallelism and so on (setName > and setUid will receive suffixes per operator). > Iceberg actually has a different requirement: They want to have a > committer with parallelism 1, but as a coordinator, such that > embarrassingly parallel pipelines have different failover regions. I > was thinking that in this case, they need to implement a no-op > committer (that just forwards the committables) and use a post-commit > topology that achieves that. > Another option is that they use the pre-commit topology and insert a > constant key-by that forwards all committables to a single committer. > We are planning to provide building blocks for such pipelines as we > go. > > Best, > Fabian > > On Thu, Dec 16, 2021 at 5:50 AM Yun Gao <yungao...@aliyun.com> wrote: > > > > Hi Fabian, > > > > Many thanks for the update! I think the latest version in general looks > good from my side, > > and I think using separate feature interfaces would be much easier to > understand > > and extend in the future. I have some pending questions on the details, > though: > > > > 1.
The first one is if we could support end-to-end exactly-once with > post-committing > > topology in the batch mode? Since for the batch mode, currently we > could only commit > > all the transactions after the whole job is finished, otherwise if > there is a JM failover or the > > writer / committer get restarted due to indeterminism (A -> [B1, B2]; > A, B1 have finished and > > B2 failed; if -> is rebalance / random / rescale, all of A, B1, B2 > would be restarted), there might > > be repeated records. Previously one possible thought was to move the committer > and global committer > > to the operator coordinator, but if it is a topology, we might need > some other kind of solution? > > > > 2. I also want to have a double confirmation on the compatibility: > since the old sink is also named > > Sink, do we want to put the Sink v2 in a new package? Besides, > since we might want to keep > > only one `sinkTo(Sink<?> sink)`, perhaps we also need to make the > Sink v1 a subclass of > > Sink v2 and extend the stateful and two-phase-commit sinks, right? > > > > 3. I'd also like to have a confirmation on our thoughts about the > `DataStreamSink` returned by the sinkTo method: > > The main issue is how we implement methods like `setParallelism` > or `setMaxParallelism`, since now the sink > > would be translated to multiple transformations? Perhaps we could make > them the default values for all the transformations > > of the sink? A related issue would be the Iceberg sink: I think it > would need to have only one committer to avoid > > competition on the optimistic locks (which would cause performance > degradation), so it might need to have N writers > > with 1 committer; to build such a topology, perhaps we might need to add > new methods to specify the parallelism of > > the writers and committers separately?
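Yun's point 3 above asks how to build an N-writers-to-1-committer topology; the constant key-by suggested earlier in this thread achieves exactly that. A minimal, Flink-agnostic toy model (plain Python; `select_channel` is an illustrative stand-in for keyed channel selection, not Flink API) shows why a constant key collapses all committables onto a single committer subtask:

```python
# Toy model of keyed routing (illustrative, not Flink API): with a constant
# key, every committable hashes to the same downstream subtask, so N parallel
# writers end up feeding a single committer.
def select_channel(key, downstream_parallelism):
    # stand-in for keyed channel selection in a keyBy exchange
    return hash(key) % downstream_parallelism

CONSTANT_KEY = "all-committables"
committables = ["data-file-1.parquet", "data-file-2.parquet", "data-file-3.parquet"]

# each writer subtask emits its committable with the same constant key
channels = {select_channel(CONSTANT_KEY, 4) for _ in committables}
assert len(channels) == 1  # all committables land on one committer subtask
```

Note that, as Till points out above, such a pipelined keyed exchange also couples the writers and the committer into one failover region.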
> > > > Best, > > Yun > > > > > > ------------------Original Mail ------------------ > > Sender:Fabian Paul <fp...@apache.org> > > Send Date:Mon Dec 13 23:59:43 2021 > > Recipients:dev <dev@flink.apache.org> > > Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support > small file compaction > >> > >> Hi all, > >> > >> > >> > >> After a lot of discussions with different people, we received very fruitful > >> > >> feedback and reworked the ideas behind this FLIP. Initially, we had > >> > >> the impression that the compaction problem is solvable by a single > >> > >> topology that we can reuse across different sinks. We now have a > >> > >> better understanding that different external systems require different > >> > >> compaction mechanisms, e.g., Hive requires compaction before finally > >> > >> registering the file in the metastore, while Iceberg registers the files > >> > >> first and lazily compacts them afterwards. > >> > >> > >> > >> Considering all these different views we came up with a design that > >> > >> builds upon what @guowei....@gmail.com and @yungao...@aliyun.com have > >> > >> proposed at the beginning. We allow inserting custom topologies before > >> > >> and after the SinkWriters and Committers. Furthermore, we do not see > >> > >> it as a downside: the Sink interfaces that expose the DataStream > >> > >> to the user reside in flink-streaming-java, in contrast to the basic > >> > >> Sink interfaces that reside in flink-core, and we deem them to be used only by > >> > >> expert users. > >> > >> > >> > >> Moreover, we also wanted to remove the global committer from the > >> > >> unified Sink interfaces and replace it with a custom post-commit > >> > >> topology. Unfortunately, we cannot do it without breaking the Sink > >> > >> interface since the GlobalCommittables are part of the parameterized > >> > >> Sink interface.
Thus, we propose building a new Sink V2 interface > >> > >> consisting of composable interfaces that do not offer the > >> > >> GlobalCommitter anymore. We will implement a utility to extend a Sink > >> > >> with a post-commit topology that mimics the behavior of the GlobalCommitter. > >> > >> The new Sink V2 provides the same sort of methods as the Sink V1 > >> > >> interface, so a migration of sinks that do not use the GlobalCommitter > >> > >> should be very easy. > >> > >> We plan to keep the existing Sink V1 interfaces to not break > >> > >> externally built sinks. As part of this FLIP, we migrate all the > >> > >> connectors inside of the main repository to the new Sink V2 API. > >> > >> > >> > >> The FLIP document is also updated and includes the proposed changes. > >> > >> > >> > >> Looking forward to your feedback, > >> > >> Fabian > >> > >> > >> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction > >> > >> > >> > >> > >> > >> On Thu, Dec 2, 2021 at 10:15 AM Roman Khachatryan wrote: > >> > >> > > >> > >> > Thanks for clarifying (I was initially confused by merging state files > >> > >> > rather than output files). > >> > >> > > >> > >> > > At some point, Flink will definitely have some WAL adapter that can > turn any sink into an exactly-once sink (with some caveats). For now, we > keep that as an orthogonal solution as it has a rather high price (bursty > workload with high latency). Ideally, we can keep the compaction > asynchronous... > >> > >> > > >> > >> > Yes, that would be something like a WAL. I agree that it would have a > >> > >> > different set of trade-offs.
> >> > >> > > >> > >> > > >> > >> > Regards, > >> > >> > Roman > >> > >> > > >> > >> > On Mon, Nov 29, 2021 at 3:33 PM Arvid Heise wrote: > >> > >> > >> > >> > >> > >> > One way to avoid write-read-merge is by wrapping SinkWriter with > >> > >> > >> > another one, which would buffer input elements in a temporary storage > >> > >> > >> > (e.g. local file) until a threshold is reached; after that, it would > >> > >> > >> > invoke the original SinkWriter. And if a checkpoint barrier comes in > >> > >> > >> > earlier, it would send written data to some aggregator. > >> > >> > >> > >> > >> > >> I think perhaps this seems to be a kind of WAL method? Namely we first > >> > >> > >> write the elements to some WAL logs and persist them on checkpoint > >> > >> > >> (in snapshot or remote FS), or we directly write WAL logs to the remote > >> > >> > >> FS eagerly. > >> > >> > >> > >> > >> > > At some point, Flink will definitely have some WAL adapter that can > turn any sink into an exactly-once sink (with some caveats). For now, we > keep that as an orthogonal solution as it has a rather high price (bursty > workload with high latency). Ideally, we can keep the compaction > asynchronous... > >> > >> > > > >> > >> > > On Mon, Nov 29, 2021 at 8:52 AM Yun Gao wrote: > >> > >> > >> > >> > >> > >> Hi, > >> > >> > >> > >> > >> > >> @Roman, very sorry for the late response, > >> > >> > >> > >> > >> > >> > Merging artifacts from multiple checkpoints would apparently > >> > >> > >> require multiple concurrent checkpoints > >> > >> > >> > >> > >> > >> I think it might not need concurrent checkpoints: suppose some > >> > >> > >> operators (like the committer aggregator in option 2) maintain > >> > >> > >> the list of files to merge; they could store the list of files to merge > >> > >> > >> in their state, then after several checkpoints are done and we have > >> > >> > >> enough files, we could merge all the files in the list.
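Yun's idea above can be modeled without concurrent checkpoints as follows. This is a conceptual Python sketch; the class and method names are invented for illustration and are not Flink API. The aggregator keeps its pending-file list in checkpointed state and merges only once enough files have accumulated:

```python
# Conceptual sketch of merging artifacts across checkpoints without
# concurrent checkpoints: the aggregator's pending-file list is part of its
# checkpointed state; merging happens only once the list is large enough.
class CompactingAggregator:
    def __init__(self, threshold):
        self.threshold = threshold
        self.pending = []  # snapshotted into each checkpoint

    def add_committable(self, file_name):
        self.pending.append(file_name)

    def maybe_compact(self):
        # invoked once the current checkpoint is durably complete
        if len(self.pending) < self.threshold:
            return None
        merged = "merged[" + "+".join(self.pending) + "]"
        self.pending = []  # originals can now be deleted safely
        return merged

agg = CompactingAggregator(threshold=3)
results = []
for file_name in ["a.parquet", "b.parquet", "c.parquet"]:
    agg.add_committable(file_name)   # one committable per checkpoint
    results.append(agg.maybe_compact())
assert results == [None, None, "merged[a.parquet+b.parquet+c.parquet]"]
```

In this model the original small files are deleted only after the checkpoint that records the merged file completes, which matches the recovery argument made later in the thread.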
> >> > >> > >> > >> > >> > >> > Asynchronous merging in an aggregator would require some resolution > >> > >> > >> > logic on recovery, so that a merged artifact can be used if the > >> > >> > >> > original one was deleted. Otherwise, wouldn't recovery fail because > >> > >> > >> > some artifacts are missing? > >> > >> > >> > We could also defer deletion until the "compacted" checkpoint is > >> > >> > >> > subsumed - but isn't it too late, as it will be deleted anyways once > >> > >> > >> > subsumed? > >> > >> > >> > >> > >> > >> I think logically we could delete the original files once the "compacted" checkpoint > >> > >> > >> (which finishes merging the compacted files and records it in the checkpoint) is completed, > >> > >> > >> in all the options. If there is a failover before it, we could restart the merging, and if > >> > >> > >> there is a failover after it, we would have already recorded the > files in the checkpoint. > >> > >> > >> > >> > >> > >> > One way to avoid write-read-merge is by wrapping SinkWriter with > >> > >> > >> > another one, which would buffer input elements in a temporary storage > >> > >> > >> > (e.g. local file) until a threshold is reached; after that, it would > >> > >> > >> > invoke the original SinkWriter. And if a checkpoint barrier comes in > >> > >> > >> > earlier, it would send written data to some aggregator. > >> > >> > >> > >> > >> > >> I think perhaps this seems to be a kind of WAL method? Namely we first > >> > >> > >> write the elements to some WAL logs and persist them on checkpoint > >> > >> > >> (in snapshot or remote FS), or we directly write WAL logs to the remote > >> > >> > >> FS eagerly. > >> > >> > >> > >> > >> > >> Sorry if I do not understand correctly somewhere.
> >> > >> > >> > >> > >> > >> Best, > >> > >> > >> Yun > >> > >> > >> > >> > >> > >> > >> > >> > >> ------------------------------------------------------------------ > >> > >> > >> From:Roman Khachatryan > >> > >> > >> Send Time:2021 Nov. 9 (Tue.) 22:03 > >> > >> > >> To:dev > >> > >> > >> Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to > support small file compaction > >> > >> > >> > >> > >> > >> Hi everyone, > >> > >> > >> > >> > >> > >> Thanks for the proposal and the discussion, I have some remarks: > >> > >> > >> (I'm not very familiar with the new Sink API but I thought about > the > >> > >> > >> same problem in context of the changelog state backend) > >> > >> > >> > >> > >> > >> 1. Merging artifacts from multiple checkpoints would apparently > >> > >> > >> require multiple concurrent checkpoints (otherwise, a new > checkpoint > >> > >> > >> won't be started before completing the previous one; and the > previous > >> > >> > >> one can't be completed before durably storing the artifacts). > However, > >> > >> > >> concurrent checkpoints are currently not supported with Unaligned > >> > >> > >> checkpoints (this is besides increasing e2e-latency). > >> > >> > >> > >> > >> > >> 2. Asynchronous merging in an aggregator would require some > resolution > >> > >> > >> logic on recovery, so that a merged artifact can be used if the > >> > >> > >> original one was deleted. Otherwise, wouldn't recovery fail because > >> > >> > >> some artifacts are missing? > >> > >> > >> We could also defer deletion until the "compacted" checkpoint is > >> > >> > >> subsumed - but isn't it too late, as it will be deleted anyways > once > >> > >> > >> subsumed? > >> > >> > >> > >> > >> > >> 3. Writing small files, then reading and merging them for *every* > >> > >> > >> checkpoint seems worse than only reading them on recovery. 
I guess I'm > >> > >> missing some cases of reading, so to me it would make sense to mention > >> > >> these cases explicitly in the FLIP motivation section. > >> > >> > >> > >> 4. One way to avoid write-read-merge is by wrapping SinkWriter with > >> > >> another one, which would buffer input elements in a temporary storage > >> > >> (e.g. local file) until a threshold is reached; after that, it would > >> > >> invoke the original SinkWriter. And if a checkpoint barrier comes in > >> > >> earlier, it would send written data to some aggregator. It will > >> > >> increase checkpoint delay (async phase) compared to the current Flink; > >> > >> but not compared to the write-read-merge solution, IIUC. > >> > >> Then such "BufferingSinkWriters" could aggregate input elements from > >> > >> each other, potentially recursively (I mean something like > >> > >> > https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png > >> > >> ) > >> > >> > >> > >> 5. Reducing the number of files by reducing aggregator parallelism, as > >> > >> opposed to merging on reaching a size threshold, will likely be less > >> > >> optimal and more difficult to configure. OTOH, thresholds might be more > >> > >> difficult to implement and (with recursive merging) would incur higher > >> > >> latency. Maybe that's also something to decide explicitly or at least > >> > >> mention in the FLIP. > >> > >> > >> > >> > >> > >> > >> > >> Regards, > >> > >> Roman > >> > >> > >> > >> > >> > >> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei wrote: > >> > >> > > >> > >> > Hi Fabian, > >> > >> > > >> > >> > Thanks for drafting the FLIP and trying to support small file compaction. I > >> > >> > think this feature is very urgent and valuable for users (at least for me).
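Roman's point 4 above (the "BufferingSinkWriter") can be sketched as a thin wrapper. The following is a conceptual Python model with invented names, not the actual SinkWriter interface: elements are buffered until a threshold, then handed to the wrapped writer; on a checkpoint barrier, whatever is still buffered is shipped to an aggregator instead.

```python
# Conceptual model of a buffering writer that wraps the original SinkWriter:
# buffer elements until a threshold is reached, then do the real write; on a
# checkpoint barrier, forward the remaining buffered data to an aggregator.
class BufferingSinkWriter:
    def __init__(self, inner_write, send_to_aggregator, threshold):
        self.inner_write = inner_write            # the original SinkWriter
        self.send_to_aggregator = send_to_aggregator
        self.threshold = threshold
        self.buffer = []

    def write(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= self.threshold:
            self.inner_write(list(self.buffer))   # threshold reached: real write
            self.buffer.clear()

    def on_checkpoint_barrier(self):
        if self.buffer:                           # barrier arrived early
            self.send_to_aggregator(list(self.buffer))
            self.buffer.clear()

written, aggregated = [], []
w = BufferingSinkWriter(written.extend, aggregated.extend, threshold=3)
for e in [1, 2, 3, 4]:
    w.write(e)
w.on_checkpoint_barrier()
assert written == [1, 2, 3] and aggregated == [4]
```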
> >> > >> > > > >> > >> > Currently I am trying to support streaming rewrite (compact) for Iceberg on > >> > >> > PR#3323 . As Steven mentioned, > >> > >> > the Iceberg sink writes and compacts data through the following steps: > >> > >> > Step-1: Some parallel data writers (sinkers) write streaming data as files. > >> > >> > Step-2: A single-parallelism data file committer commits the completed > files as soon as possible to make them available. > >> > >> > Step-3: Some parallel file rewriters (compactors) collect committed files > from multiple checkpoints, and rewrite (compact) them together once the > total file size or number of files reaches the threshold. > >> > >> > Step-4: A single-parallelism rewrite (compact) result committer commits > the rewritten (compacted) files to replace the old files and make them > available. > >> > >> > > >> > >> > > >> > >> > If Flink wants to support small file compaction, some key points I think are > necessary: > >> > >> > > >> > >> > 1, Compact files from multiple checkpoints. > >> > >> > I totally agree with Jingsong, because the completed file size usually cannot > reach the threshold in a single checkpoint. Especially for a partitioned > table, we need to compact the files of each partition, but usually the file > size of each partition will be different and may not reach the merge > threshold. If we compact these files in a single checkpoint, regardless of > whether the total file size reaches the threshold, then the value of > compacting will be diminished and we will still get small files because > these compacted files do not reach the target size.
So we need the > >> > >> > compactor to collect committed files from multiple checkpoints and compact > them until they reach the threshold. > >> > >> > > >> > >> > 2, Separate write phase and compact phase. > >> > >> > Users usually hope the data becomes available as soon as possible, and the > end-to-end latency is very important. I think we need to separate the > write and compact phases. The write phase includes Step-1 > and Step-2: we sink data as files and commit them per checkpoint, regardless > of the file size. That ensures the data will be > available ASAP. The compact phase includes Step-3 > and Step-4: the compactor should collect committed files from multiple > checkpoints and compact them asynchronously once they reach the threshold, > and the compact committer will commit the compaction result in the next > checkpoint. We compact the committed files asynchronously because we don't > want the compaction to affect the data sink or the whole pipeline. > >> > >> > > >> > >> > 3, Exactly-once guarantee between write and compact phases. > >> > >> > Once we separate the write and compact phases, we need to consider > how to guarantee > exactly-once semantics between the two phases. We should not lose any data or > files on the compactor (Step-3) in any case and cause the compaction result > to be inconsistent with before. I think Flink should provide an easy-to-use > interface to make that easier. > >> > >> > > >> > >> > 4, Metadata operations and compaction result validation.
> >> > >> > >> > In the compact phase, there may be not only file compaction, but also a lot > of metadata operations, such as Iceberg needing to read/write manifests > and do MOR. And we need some interface to support users doing some > validation of the compaction result. I think these points should be > considered when we design the compaction API. > >> > >> > >> > > >> > >> > >> > > >> > >> > >> > Back to FLIP-191, option 1 looks very complicated while option 2 is > relatively simple, but neither of these two solutions separates the write > phase from the compact phase. So I think we should consider the points I > mentioned above. And if you have any other questions you can always feel > free to reach out to me! > >> > >> > >> > > >> > >> > >> > BR, > >> > >> > >> > Reo > >> > >> > >> > > >> > >> > >> > Fabian Paul wrote on Mon, Nov 8, 2021 at 7:59 PM: > >> > >> > >> > > >> > >> > >> > > Hi all, > >> > >> > >> > > > >> > >> > >> > > Thanks for the lively discussions. I am really excited to see so many > >> > >> > >> > > people > >> > >> > >> > > participating in this thread. It also underlines the need that many people > >> > >> > >> > > would > >> > >> > >> > > like to see a solution soon. > >> > >> > >> > > > >> > >> > >> > > I have updated the FLIP and removed the parallelism configuration because > >> > >> > >> > > it is > >> > >> > >> > > unnecessary, since users can configure a constant exchange key to send all > >> > >> > >> > > committables to only one committable aggregator. > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > 1. Burden for developers w.r.t. batch-stream unification.
> >> > >> > >> > > > >> > >> > >> > > @yun @guowei, from a theoretical point of view you are right that by exposing the > >> > >> > >> > > DataStream > >> > >> > >> > > API in the sink, users have the full power to write correct batch and > >> > >> > >> > > streaming > >> > >> > >> > > sinks. I think in reality a lot of users still struggle to build pipelines > >> > >> > >> > > with > >> > >> > >> > > e.g. the operator pipeline that work correctly in streaming and batch mode. > >> > >> > >> > > Another problem I see with exposing deeper concepts is that we > >> > >> > >> > > cannot do > >> > >> > >> > > any optimization, because we cannot reason about how sinks are built in the > >> > >> > >> > > future. > >> > >> > >> > > > >> > >> > >> > > We should also try to steer users towards using only `Functions` to give > >> > >> > >> > > us more > >> > >> > >> > > flexibility to swap the internal operator representation. I agree with > >> > >> > >> > > @yun we > >> > >> > >> > > should try to make the `ProcessFunction` more versatile to work towards that > >> > >> > >> > > goal, but > >> > >> > >> > > I see this as unrelated to the FLIP. > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > 2. Regarding Commit / Global commit > >> > >> > >> > > > >> > >> > >> > > I envision the global committer to be specific to the data lake > >> > >> > >> > > solution you want to write to. However, it is entirely orthogonal to the > >> > >> > >> > > compaction. > >> > >> > >> > > Currently, I do not expect any changes w.r.t. the global commit introduced > >> > >> > >> > > by > >> > >> > >> > > this FLIP. > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > 3. Regarding the case of trans-checkpoint merging > >> > >> > >> > > > >> > >> > >> > > @yun, as a user, I would expect that if the committer receives files > >> > >> > >> > > to merge/commit in a checkpoint, these are also finished when the checkpoint finishes.
> >> > >> > >> > > I think all sinks rely on this principle currently, i.e., KafkaSink needs to > >> > >> > >> > > commit all open transactions before the next checkpoint can happen. > >> > >> > >> > > > >> > >> > >> > > Maybe in the future, we can somehow move the Committer#commit call to an > >> > >> > >> > > asynchronous execution, but we should discuss it in a separate thread. > >> > >> > >> > > > >> > >> > >> > > > We probably should first describe the different causes of small files and > >> > >> > >> > > > what problems this proposal was trying to solve. I wrote a data shuffling > >> > >> > >> > > > proposal [1] for the Flink Iceberg sink (shared with the Iceberg community [2]). > >> > >> > >> > > It > >> > >> > >> > > > can address small files problems due to skewed data distribution across > >> > >> > >> > > > Iceberg table partitions. Streaming shuffling before writers (to files) > >> > >> > >> > > is > >> > >> > >> > > > typically more efficient than post-write file compaction (which involves > >> > >> > >> > > > read-merge-write). It is usually cheaper to prevent a problem (small > >> > >> > >> > > files) > >> > >> > >> > > > than fixing it. > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > @steven you are raising a good point, although I think only using a > >> > >> > >> > > customizable > >> > >> > >> > > shuffle won't address the generation of small files. One assumption is that > >> > >> > >> > > the sink generates at least one file per subtask, which can already be too > >> > >> > >> > > many. > >> > >> > >> > > Another problem is that with low checkpointing intervals, the files do not > >> > >> > >> > > meet > >> > >> > >> > > the required size. The latter point is probably addressable by changing the > >> > >> > >> > > checkpoint interval, which might be inconvenient for some users.
> >> > >> > >> > > > >> > >> > >> > > > The sink coordinator checkpoint problem (mentioned in option 1) would be > >> > >> > >> > > > great if Flink can address it. In the spirit of source > >> > >> > >> > > (enumerator-reader) > >> > >> > >> > > > and sink (writer-coordinator) duality, the sink coordinator checkpoint should > >> > >> > >> > > > happen after the writer operator. This would be a natural fit to support a > >> > >> > >> > > > global committer in FLIP-143. It is probably an orthogonal matter to this > >> > >> > >> > > > proposal. > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > To me the question here is what the benefits of having a coordinator are in > >> > >> > >> > > comparison to a global committer/aggregator operator. > >> > >> > >> > > > >> > >> > >> > > > Personally, I am usually in favor of keeping streaming ingestion (to data > >> > >> > >> > > > lake) relatively simple and stable. Also sometimes compaction and sorting > >> > >> > >> > > > are performed together in data rewrite maintenance jobs to improve read > >> > >> > >> > > > performance. In that case, the value of compacting (in Flink streaming > >> > >> > >> > > > ingestion) diminishes. > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > I agree it is always possible to have scheduled maintenance jobs taking > >> > >> > >> > > care of > >> > >> > >> > > your data, i.e., doing compaction. Unfortunately, the downside is that you > >> > >> > >> > > have to compact your data after it is already available for other downstream > >> > >> > >> > > consumers. > >> > >> > >> > > I guess this can lead to all kinds of visibility problems. I am also > >> > >> > >> > > surprised that > >> > >> > >> > > you personally are a fan of this approach and, on the other hand, are > >> > >> > >> > > developing > >> > >> > >> > > the Iceberg sink, which goes somewhat against your mentioned principle of > >> > >> > >> > > keeping > >> > >> > >> > > the sink simple.
> >> > >> > >> > > > >> > >> > >> > > > Currently, it is unclear from the doc and this thread where the > >> > >> > >> > > compaction > >> > >> > >> > > > is actually happening. Jingsong's reply described one model > >> > >> > >> > > > writer (parallel) -> aggregator (single-parallelism compaction planner) > >> > >> > >> > > -> > >> > >> > >> > > > compactor (parallel) -> global committer (single-parallelism) > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > My idea of the topology is very similar to the one outlined by Jingsong. The > >> > >> > >> > > compaction will happen in the committer operator. > >> > >> > >> > > > >> > >> > >> > > > > >> > >> > >> > > > In the Iceberg community, the following model has been discussed. It is > >> > >> > >> > > > better for Iceberg because it won't delay the data availability. > >> > >> > >> > > > writer (parallel) -> global committer for append (single parallelism) -> > >> > >> > >> > > > compactor (parallel) -> global committer for rewrite commit (single > >> > >> > >> > > > parallelism) > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > From a quick glimpse, it seems that the exact same topology can be > >> > >> > >> > > expressed with the committable aggregator, but this definitely depends on > >> > >> > >> > > the exact > >> > >> > >> > > setup. > >> > >> > >> > > > >> > >> > >> > > Best, > >> > >> > >> > > Fabian > >> > >> > >
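The topology discussed at the end of the thread (parallel writers -> single-parallelism compaction planner -> parallel compactors -> single-parallelism global committer) can be illustrated end to end with a toy model. All function names and file-naming conventions here are invented for illustration; this is not Flink or Iceberg API:

```python
# Toy end-to-end model of: parallel writers -> single-parallelism planner
# -> parallel compactors -> single-parallelism global committer.
def writers(parallelism, checkpoint_id):
    # each writer subtask emits one small file per checkpoint
    return [f"cp{checkpoint_id}-writer{i}.parquet" for i in range(parallelism)]

def planner(small_files, files_per_task):
    # single-parallelism planner groups small files into compaction tasks
    return [small_files[i:i + files_per_task]
            for i in range(0, len(small_files), files_per_task)]

def compactors(tasks):
    # parallel compactors rewrite each group into one larger file
    return ["compacted(" + "+".join(group) + ")" for group in tasks]

def global_committer(files):
    # single-parallelism commit of the compacted files
    return {"committed": files}

small = writers(parallelism=4, checkpoint_id=1)
result = global_committer(compactors(planner(small, files_per_task=2)))
assert len(result["committed"]) == 2  # 4 small files -> 2 compacted files
```

The Iceberg variant discussed above differs only in inserting an extra single-parallelism append commit between the writers and the compactors, so data becomes available before compaction runs.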