Thanks for clarifying (I was initially confused by merging state files rather than output files).
> At some point, Flink will definitely have some WAL adapter that can turn any
> sink into an exactly-once sink (with some caveats). For now, we keep that as
> an orthogonal solution as it has a rather high price (bursty workload with
> high latency). Ideally, we can keep the compaction asynchronous...

Yes, that would be something like a WAL. I agree that it would have a
different set of trade-offs.

Regards,
Roman

On Mon, Nov 29, 2021 at 3:33 PM Arvid Heise <ar...@apache.org> wrote:
>>
>> > One way to avoid write-read-merge is by wrapping SinkWriter with
>> > another one, which would buffer input elements in a temporary storage
>> > (e.g. local file) until a threshold is reached; after that, it would
>> > invoke the original SinkWriter. And if a checkpoint barrier comes in
>> > earlier, it would send written data to some aggregator.
>>
>> I think this is essentially a kind of WAL method? Namely, we first
>> write the elements to some WAL logs and persist them on checkpoint
>> (in the snapshot or a remote FS), or we directly write WAL logs to the
>> remote FS eagerly.
>>
> At some point, Flink will definitely have some WAL adapter that can turn any
> sink into an exactly-once sink (with some caveats). For now, we keep that as
> an orthogonal solution as it has a rather high price (bursty workload with
> high latency). Ideally, we can keep the compaction asynchronous...
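As a rough illustration, the buffering-writer idea discussed in this exchange could look like the following framework-free Python sketch. All names here (`BufferingSinkWriter`, `flush_threshold`, the aggregator callback) are hypothetical and stand in for whatever the real Flink `SinkWriter` wrapper would be; an in-memory list stands in for the local WAL file:

```python
# Minimal sketch of the "buffering SinkWriter" / WAL idea, assuming a
# hypothetical wrapper API (not actual Flink interfaces).

class BufferingSinkWriter:
    def __init__(self, inner_write, send_to_aggregator, flush_threshold):
        self.inner_write = inner_write            # the wrapped SinkWriter
        self.send_to_aggregator = send_to_aggregator
        self.flush_threshold = flush_threshold
        self.buffer = []                          # stands in for a local WAL file

    def write(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= self.flush_threshold:
            # Threshold reached: hand the buffered run to the real writer,
            # which can now produce one adequately sized file.
            self.inner_write(list(self.buffer))
            self.buffer.clear()

    def on_checkpoint(self):
        # Barrier arrived before the threshold: ship the small remainder to
        # some aggregator instead of writing a small file ourselves.
        if self.buffer:
            self.send_to_aggregator(list(self.buffer))
            self.buffer.clear()
```

This makes the trade-off visible: data that never reaches the threshold only becomes durable via the checkpoint, which is exactly the "bursty workload with high latency" price mentioned above.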
> > On Mon, Nov 29, 2021 at 8:52 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>>
>> Hi,
>>
>> @Roman very sorry for the long-delayed response,
>>
>> > Merging artifacts from multiple checkpoints would apparently
>> > require multiple concurrent checkpoints
>>
>> I think it might not need concurrent checkpoints: suppose some
>> operator (like the committer aggregator in option 2) maintains
>> the list of files to merge; it could store that list in its state,
>> and then, after several checkpoints are done and we have enough files,
>> we could merge all the files in the list.
>>
>> > Asynchronous merging in an aggregator would require some resolution
>> > logic on recovery, so that a merged artifact can be used if the
>> > original one was deleted. Otherwise, wouldn't recovery fail because
>> > some artifacts are missing?
>> > We could also defer deletion until the "compacted" checkpoint is
>> > subsumed - but isn't it too late, as it will be deleted anyway once
>> > subsumed?
>>
>> I think logically, in all the options, we could delete the original files
>> once the "compacted" checkpoint (which finishes merging the compacted files
>> and records it in the checkpoint) is completed. If there is a failover
>> before that, we could restart the merging; and if there is a failover after
>> it, the files have already been recorded in the checkpoint.
>>
>> > One way to avoid write-read-merge is by wrapping SinkWriter with
>> > another one, which would buffer input elements in a temporary storage
>> > (e.g. local file) until a threshold is reached; after that, it would
>> > invoke the original SinkWriter. And if a checkpoint barrier comes in
>> > earlier, it would send written data to some aggregator.
>>
>> I think this is essentially a kind of WAL method? Namely, we first
>> write the elements to some WAL logs and persist them on checkpoint
>> (in the snapshot or a remote FS), or we directly write WAL logs to the
>> remote FS eagerly.
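Yun's point that merging across checkpoints does not require concurrent checkpoints can be sketched as follows. This is a toy simulation, not Flink code: `CompactingAggregator`, `merge_fn`, and the callback names are invented for illustration, and a plain Python list stands in for checkpointed operator state:

```python
# Sketch: an aggregator keeps the list of pending small files in its
# (checkpointed) state and merges only once enough have accumulated,
# possibly several checkpoints later. Illustrative names, not Flink API.

class CompactingAggregator:
    def __init__(self, merge_fn, min_files):
        self.merge_fn = merge_fn      # merges a list of files into one
        self.min_files = min_files
        self.pending = []             # part of checkpointed state

    def on_committed_files(self, files):
        # Files committed by earlier checkpoints just accumulate in state.
        self.pending.extend(files)

    def on_checkpoint_complete(self):
        # Merge only when the backlog is big enough; otherwise the list
        # simply survives in state until a later checkpoint. Returns the
        # originals that are now safe to delete (per Yun's reasoning,
        # deletion happens only after the "compacted" checkpoint completes).
        if len(self.pending) >= self.min_files:
            merged = self.merge_fn(self.pending)
            to_delete = self.pending
            self.pending = [merged]
            return to_delete
        return []
```

On failover before the merging checkpoint completes, recovery restores `pending` from state and the merge simply restarts, matching the deletion argument above.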
>>
>> Sorry if I do not understand correctly somewhere.
>>
>> Best,
>> Yun
>>
>>
>> ------------------------------------------------------------------
>> From: Roman Khachatryan <ro...@apache.org>
>> Send Time: 2021 Nov. 9 (Tue.) 22:03
>> To: dev <dev@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support
>> small file compaction
>>
>> Hi everyone,
>>
>> Thanks for the proposal and the discussion, I have some remarks:
>> (I'm not very familiar with the new Sink API, but I thought about the
>> same problem in the context of the changelog state backend.)
>>
>> 1. Merging artifacts from multiple checkpoints would apparently
>> require multiple concurrent checkpoints (otherwise, a new checkpoint
>> won't be started before completing the previous one; and the previous
>> one can't be completed before durably storing the artifacts). However,
>> concurrent checkpoints are currently not supported with unaligned
>> checkpoints (this is besides increasing e2e latency).
>>
>> 2. Asynchronous merging in an aggregator would require some resolution
>> logic on recovery, so that a merged artifact can be used if the
>> original one was deleted. Otherwise, wouldn't recovery fail because
>> some artifacts are missing?
>> We could also defer deletion until the "compacted" checkpoint is
>> subsumed - but isn't it too late, as it will be deleted anyway once
>> subsumed?
>>
>> 3. Writing small files, then reading and merging them for *every*
>> checkpoint seems worse than only reading them on recovery. I guess I'm
>> missing some cases of reading, so to me it would make sense to mention
>> these cases explicitly in the FLIP motivation section.
>>
>> 4. One way to avoid write-read-merge is by wrapping SinkWriter with
>> another one, which would buffer input elements in a temporary storage
>> (e.g. local file) until a threshold is reached; after that, it would
>> invoke the original SinkWriter.
>> And if a checkpoint barrier comes in
>> earlier, it would send written data to some aggregator. It will
>> increase checkpoint delay (async phase) compared to current Flink,
>> but not compared to the write-read-merge solution, IIUC.
>> Then such "BufferingSinkWriters" could aggregate input elements from
>> each other, potentially recursively (I mean something like
>> https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png
>> )
>>
>> 5. Reducing the number of files by reducing aggregator parallelism, as
>> opposed to merging on reaching a size threshold, will likely be less
>> optimal and more difficult to configure. OTOH, thresholds might be more
>> difficult to implement and (with recursive merging) would incur higher
>> latency. Maybe that's also something to decide explicitly, or at least
>> mention in the FLIP.
>>
>> Regards,
>> Roman
>>
>> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei <leinuo...@gmail.com> wrote:
>> >
>> > Hi Fabian,
>> >
>> > Thanks for drafting the FLIP and trying to support small file compaction. I
>> > think this feature is very urgent and valuable for users (at least for me).
>> >
>> > Currently I am trying to support streaming rewrite (compaction) for Iceberg on
>> > PR#3323 <https://github.com/apache/iceberg/pull/3323>. As Steven mentioned,
>> > the Iceberg sink writes and compacts data through the following steps:
>> > Step-1: Some parallel data writers (sinks) write streaming data as files.
>> > Step-2: A single-parallelism data file committer commits the completed
>> > files as soon as possible to make them available.
>> > Step-3: Some parallel file rewriters (compactors) collect committed files
>> > from multiple checkpoints, and rewrite (compact) them together once the
>> > total file size or number of files reaches the threshold.
>> > Step-4: A single-parallelism rewrite (compaction) result committer commits
>> > the rewritten (compacted) files to replace the old files and make them
>> > available.
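The four steps above can be run end to end as a toy simulation. This is purely illustrative: in-memory lists stand in for Iceberg data files and table metadata, and `run_pipeline` and its parameters are invented names (the real flow is in the Iceberg PR referenced above):

```python
# Toy end-to-end run of Steps 1-4: commit files per checkpoint (data is
# available immediately), then compact and atomically replace once the
# backlog reaches a threshold. Not real Iceberg/Flink code.

def run_pipeline(checkpoints, compact_threshold):
    table = []        # committed, readable files
    pending = []      # files collected by the compactor (Step-3)
    for files in checkpoints:
        table.extend(files)       # Step-1/2: write and commit per checkpoint
        pending.extend(files)
        if len(pending) >= compact_threshold:
            compacted = "|".join(pending)     # Step-3: rewrite together
            # Step-4: commit the replacement; old small files disappear
            # from the table, the compacted file takes their place.
            table = [f for f in table if f not in pending] + [compacted]
            pending = []
    return table
```

Note that `table` is never empty of committed data while compaction is pending, which is the "data available ASAP" property the write/compact separation is after.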
>> >
>> >
>> > If Flink wants to support small file compaction, I think some key points
>> > are necessary:
>> >
>> > 1. Compact files from multiple checkpoints.
>> > I totally agree with Jingsong, because the completed file size usually
>> > cannot reach the threshold in a single checkpoint. Especially for a
>> > partitioned table, we need to compact the files of each partition, but
>> > usually the file size of each partition will differ and may not reach the
>> > merge threshold. If we compact these files within a single checkpoint,
>> > regardless of whether the total file size reaches the threshold, then the
>> > value of compacting is diminished and we will still get small files,
>> > because these compacted files do not reach the target size. So we need the
>> > compactor to collect committed files from multiple checkpoints and compact
>> > them once they reach the threshold.
>> >
>> > 2. Separate the write phase and the compact phase.
>> > Users usually hope the data becomes available as soon as possible, and
>> > end-to-end latency is very important. I think we need to separate the
>> > write and compact phases. The write phase includes Step-1 and Step-2: we
>> > sink data as files and commit them per checkpoint, regardless of the file
>> > size. That ensures the data will be available ASAP. The compact phase
>> > includes Step-3 and Step-4: the compactor should collect committed files
>> > from multiple checkpoints and compact them asynchronously once they reach
>> > the threshold, and the compaction committer will commit the compaction
>> > result in the next checkpoint. We compact the committed files
>> > asynchronously because we don't want the compaction to affect the data
>> > sink or the whole pipeline.
>> >
>> > 3. Exactly-once guarantee between the write and compact phases.
>> > Once we separate the write phase and the compact phase, we need to
>> > consider how to guarantee exactly-once semantics between the two phases.
>> > We should not lose any data or files in the compactor (Step-3) in any
>> > case, causing the compaction result to be inconsistent with what came
>> > before. I think Flink should provide an easy-to-use interface to make
>> > that easier.
>> >
>> > 4. Metadata operations and compaction result validation.
>> > In the compact phase, there may be not only file compaction but also a
>> > lot of metadata operations, such as Iceberg needing to read/write
>> > manifests and do MOR. And we need some interface to let users do some
>> > validation of the compaction result. I think these points should be
>> > considered when we design the compaction API.
>> >
>> > Back to FLIP-191, option 1 looks very complicated while option 2 is
>> > relatively simple, but neither of these two solutions separates the write
>> > phase from the compact phase. So I think we should consider the points I
>> > mentioned above. And if you have any other questions, please feel free to
>> > reach out to me!
>> >
>> > BR,
>> > Reo
>> >
>> > On Mon, Nov 8, 2021 at 7:59 PM Fabian Paul <fabianp...@ververica.com> wrote:
>> >
>> > > Hi all,
>> > >
>> > > Thanks for the lively discussions. I am really excited to see so many
>> > > people participating in this thread. It also underlines that many
>> > > people would like to see a solution soon.
>> > >
>> > > I have updated the FLIP and removed the parallelism configuration
>> > > because it is unnecessary: users can configure a constant exchange key
>> > > to send all committables to only one committable aggregator.
>> > >
>> > > 1. Burden for developers w.r.t. batch/stream unification.
>> > >
>> > > @yun @guowei, from a theoretical point of view you are right: by
>> > > exposing the DataStream API in the sink, users have the full power to
>> > > write correct batch and streaming sinks. I think in reality a lot of
>> > > users still struggle to build pipelines with, e.g., the operator
>> > > pipeline that works correctly in both streaming and batch mode. Another
>> > > problem I see with exposing deeper concepts is that we cannot do any
>> > > optimization, because we cannot reason about how sinks will be built in
>> > > the future.
>> > >
>> > > We should also try to steer users towards using only `Functions` to
>> > > give us more flexibility to swap the internal operator representation.
>> > > I agree with @yun that we should try to make the `ProcessFunction` more
>> > > versatile to work towards that goal, but I see this as unrelated to the
>> > > FLIP.
>> > >
>> > > 2. Regarding Commit / Global commit
>> > >
>> > > I envision the global committer to be specific to the data lake
>> > > solution you want to write to. However, it is entirely orthogonal to
>> > > the compaction. Currently, I do not expect any changes w.r.t. the
>> > > Global commit introduced by this FLIP.
>> > >
>> > > 3. Regarding the case of cross-checkpoint merging
>> > >
>> > > @yun, as a user, I would expect that if the committer receives files to
>> > > merge/commit in a checkpoint, these are also finished when the
>> > > checkpoint finishes. I think all sinks rely on this principle
>> > > currently; i.e., KafkaSink needs to commit all open transactions before
>> > > the next checkpoint can happen.
>> > >
>> > > Maybe in the future we can somehow move the Committer#commit call to an
>> > > asynchronous execution, but we should discuss that in a separate thread.
>> > >
>> > > > We probably should first describe the different causes of small files
>> > > > and what problems this proposal is trying to solve. I wrote a data
>> > > > shuffling proposal [1] for the Flink Iceberg sink (shared with the
>> > > > Iceberg community [2]). It can address small-file problems due to
>> > > > skewed data distribution across Iceberg table partitions. Streaming
>> > > > shuffling before the writers (to files) is typically more efficient
>> > > > than post-write file compaction (which involves read-merge-write). It
>> > > > is usually cheaper to prevent a problem (small files) than to fix it.
>> > >
>> > > @steven you are raising a good point, although I think a customizable
>> > > shuffle alone won't address the generation of small files. One
>> > > assumption is that the sink generates at least one file per subtask,
>> > > which can already be too many. Another problem is that with low
>> > > checkpointing intervals, the files do not meet the required size. The
>> > > latter point is probably addressable by changing the checkpoint
>> > > interval, which might be inconvenient for some users.
>> > >
>> > > > The sink coordinator checkpoint problem (mentioned in option 1) would
>> > > > be great if Flink can address it. In the spirit of the source
>> > > > (enumerator-reader) and sink (writer-coordinator) duality, the sink
>> > > > coordinator checkpoint should happen after the writer operator. This
>> > > > would be a natural fit to support the global committer in FLIP-143.
>> > > > It is probably an orthogonal matter to this proposal.
>> > >
>> > > To me, the question here is: what are the benefits of having a
>> > > coordinator in comparison to a global committer/aggregator operator?
>> > >
>> > > > Personally, I am usually in favor of keeping streaming ingestion (to
>> > > > a data lake) relatively simple and stable. Also, sometimes compaction
>> > > > and sorting are performed together in data-rewrite maintenance jobs
>> > > > to improve read performance. In that case, the value of compacting
>> > > > (in Flink streaming ingestion) diminishes.
>> > >
>> > > I agree it is always possible to have scheduled maintenance jobs taking
>> > > care of your data, i.e., doing compaction. Unfortunately, the downside
>> > > is that you have to rewrite your data after it is already available to
>> > > other downstream consumers. I guess this can lead to all kinds of
>> > > visibility problems. I am also surprised that you personally are a fan
>> > > of this approach and, on the other hand, are developing the Iceberg
>> > > sink, which goes somewhat against your stated principle of keeping the
>> > > sink simple.
>> > >
>> > > > Currently, it is unclear from the doc and this thread where the
>> > > > compaction is actually happening. Jingsong's reply described one model:
>> > > > writer (parallel) -> aggregator (single-parallelism compaction planner) ->
>> > > > compactor (parallel) -> global committer (single parallelism)
>> > >
>> > > My idea of the topology is very similar to the one outlined by
>> > > Jingsong. The compaction will happen in the committer operator.
>> > >
>> > > >
>> > > > In the Iceberg community, the following model has been discussed. It
>> > > > is better for Iceberg because it won't delay data availability:
>> > > > writer (parallel) -> global committer for append (single parallelism) ->
>> > > > compactor (parallel) -> global committer for rewrite commit (single
>> > > > parallelism)
>> > >
>> > > From a quick glimpse, it seems that the exact same topology can be
>> > > expressed with the committable aggregator, but this definitely depends
>> > > on the exact setup.
>> > >
>> > > Best,
>> > > Fabian
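To make concrete why the Iceberg-style topology above does not delay data availability, here is a toy model of the two commit kinds. It is illustrative only: `Table`, `commit_append`, and `commit_rewrite` are invented names, not the Iceberg or Flink API, and an in-memory list stands in for table metadata:

```python
# Toy model: the append commit makes files readable immediately; the
# later rewrite commit only swaps already-visible small files for a
# compacted one, so readers see the same rows before and after.

class Table:
    def __init__(self):
        self.files = []

    def commit_append(self, new_files):
        # "Global committer for append": data is readable right away.
        self.files.extend(new_files)

    def commit_rewrite(self, replaced, compacted):
        # "Global committer for rewrite commit": atomically replace the
        # small files with their compacted result.
        self.files = [f for f in self.files if f not in replaced] + [compacted]
```

Because compaction only ever runs behind an already-completed append commit, a slow or failed compactor degrades read performance but never delays when data first becomes visible.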