> > One way to avoid write-read-merge is by wrapping SinkWriter with another one, which would buffer input elements in a temporary storage (e.g. local file) until a threshold is reached; after that, it would invoke the original SinkWriter. And if a checkpoint barrier comes in earlier, it would send written data to some aggregator.
>
> I think this is essentially a kind of WAL method? Namely, we first write the elements to some WAL logs and persist them on checkpoint (in the snapshot or on a remote FS), or we write the WAL logs to the remote FS eagerly.

At some point, Flink will definitely have some WAL adapter that can turn any sink into an exactly-once sink (with some caveats). For now, we keep that as an orthogonal solution, as it has a rather high price (a bursty workload with high latency). Ideally, we can keep the compaction asynchronous...
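To make the buffering idea under discussion concrete, here is a minimal sketch. The Writer interface and every name in it are simplified stand-ins for illustration, not Flink's actual SinkWriter API:

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    /** Simplified stand-in for a sink writer interface (illustrative only). */
    interface Writer {
        void write(String element) throws IOException;
    }

    /** Buffers elements in a local spill file and replays them into the wrapped
     *  writer once a size threshold is reached. If a checkpoint barrier arrives
     *  first, the spill file is handed off to an aggregator instead. */
    final class BufferingWriter implements Writer {
        private final Writer inner;
        private final long thresholdBytes;
        private Path spillFile;
        private BufferedWriter spill;
        private long buffered;

        BufferingWriter(Writer inner, long thresholdBytes) throws IOException {
            this.inner = inner;
            this.thresholdBytes = thresholdBytes;
            openFreshSpillFile();
        }

        @Override
        public void write(String element) throws IOException {
            spill.write(element);
            spill.newLine();
            buffered += element.length() + 1; // rough size accounting for the sketch
            if (buffered >= thresholdBytes) {
                // Enough data for a reasonably sized file: replay into the real writer.
                spill.close();
                for (String line : Files.readAllLines(spillFile)) {
                    inner.write(line);
                }
                Files.delete(spillFile);
                openFreshSpillFile();
            }
        }

        /** Called when a checkpoint barrier arrives before the threshold is reached. */
        Path handOffToAggregator() throws IOException {
            spill.close();
            Path handOff = spillFile; // the aggregator takes ownership and merges later
            openFreshSpillFile();
            return handOff;
        }

        private void openFreshSpillFile() throws IOException {
            spillFile = Files.createTempFile("buffering-writer", ".spill");
            spill = Files.newBufferedWriter(spillFile);
            buffered = 0;
        }
    }

This is also where the latency cost mentioned above shows up: data sits in the spill file until either the threshold or a barrier arrives.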
On Mon, Nov 29, 2021 at 8:52 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:

> Hi,
>
> @Roman, very sorry for the long-delayed response.
>
> > Merging artifacts from multiple checkpoints would apparently require multiple concurrent checkpoints
>
> I think it might not need concurrent checkpoints: suppose some operator (like the committer aggregator in option 2) maintains the list of files to merge; it could store the list of files to merge in its state, and then, after several checkpoints are done and we have enough files, we could merge all the files in the list.
>
> > Asynchronous merging in an aggregator would require some resolution logic on recovery, so that a merged artifact can be used if the original one was deleted. Otherwise, wouldn't recovery fail because some artifacts are missing? We could also defer deletion until the "compacted" checkpoint is subsumed - but isn't it too late, as it will be deleted anyway once subsumed?
>
> I think logically we could delete the original files once the "compacted" checkpoint (the one that finishes merging the compacted files and records it in the checkpoint) is completed, in all the options. If there is a failover before that, we can restart the merging; and if there is a failover after it, the files are already recorded in the checkpoint.
>
> > One way to avoid write-read-merge is by wrapping SinkWriter with another one, which would buffer input elements in a temporary storage (e.g. local file) until a threshold is reached; after that, it would invoke the original SinkWriter. And if a checkpoint barrier comes in earlier, it would send written data to some aggregator.
>
> I think this is essentially a kind of WAL method? Namely, we first write the elements to some WAL logs and persist them on checkpoint (in the snapshot or on a remote FS), or we write the WAL logs to the remote FS eagerly.
>
> Sorry if I have misunderstood something.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From: Roman Khachatryan <ro...@apache.org>
> Send Time: 2021 Nov. 9 (Tue.) 22:03
> To: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction
>
> Hi everyone,
>
> Thanks for the proposal and the discussion. I have some remarks (I'm not very familiar with the new Sink API, but I thought about the same problem in the context of the changelog state backend):
>
> 1. Merging artifacts from multiple checkpoints would apparently require multiple concurrent checkpoints (otherwise, a new checkpoint won't be started before completing the previous one, and the previous one can't be completed before durably storing the artifacts). However, concurrent checkpoints are currently not supported with unaligned checkpoints (this is besides increasing e2e latency).
>
> 2. Asynchronous merging in an aggregator would require some resolution logic on recovery, so that a merged artifact can be used if the original one was deleted. Otherwise, wouldn't recovery fail because some artifacts are missing? We could also defer deletion until the "compacted" checkpoint is subsumed - but isn't it too late, as it will be deleted anyway once subsumed?
>
> 3. Writing small files, then reading and merging them for *every* checkpoint seems worse than only reading them on recovery. I guess I'm missing some cases of reading, so to me it would make sense to mention these cases explicitly in the FLIP motivation section.
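The resolution logic Roman asks about in point 2 could look roughly like the following. All names here are illustrative; none of this is an existing Flink interface:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    /** Sketch of recovery-time resolution: prefer the original artifact, but fall
     *  back to the merged artifact that replaced it. Names are illustrative. */
    final class ArtifactResolver {
        /**
         * @param referenced artifacts referenced by the checkpoint we restore from
         * @param mergedInto original artifact -> merged artifact that contains it
         * @param existing   artifacts actually present in storage
         */
        static List<String> resolve(List<String> referenced,
                                    Map<String, String> mergedInto,
                                    Set<String> existing) {
            List<String> toRead = new ArrayList<>();
            for (String artifact : referenced) {
                if (existing.contains(artifact)) {
                    toRead.add(artifact);            // original still there
                } else if (mergedInto.containsKey(artifact)
                        && existing.contains(mergedInto.get(artifact))) {
                    String merged = mergedInto.get(artifact);
                    if (!toRead.contains(merged)) {  // several originals may map to one merged file
                        toRead.add(merged);
                    }
                } else {
                    throw new IllegalStateException(
                            "Neither original nor merged artifact found for " + artifact);
                }
            }
            return toRead;
        }
    }

If deletion of originals is deferred until the "compacted" checkpoint completes, as Yun suggests above, the fallback branch is only needed for the window between merging and deletion.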
> 4. One way to avoid write-read-merge is by wrapping SinkWriter with another one, which would buffer input elements in a temporary storage (e.g. local file) until a threshold is reached; after that, it would invoke the original SinkWriter. And if a checkpoint barrier comes in earlier, it would send the written data to some aggregator. It would increase the checkpoint delay (async phase) compared to current Flink, but not compared to the write-read-merge solution, IIUC. Then such "BufferingSinkWriters" could aggregate input elements from each other, potentially recursively (I mean something like https://cwiki.apache.org/confluence/download/attachments/173082889/DSTL-DFS-DAG.png).
>
> 5. Reducing the number of files by reducing aggregator parallelism, as opposed to merging on reaching a size threshold, will likely be less optimal and more difficult to configure. OTOH, thresholds might be more difficult to implement and (with recursive merging) would incur higher latency. Maybe that's also something to decide explicitly, or at least mention in the FLIP.
>
> Regards,
> Roman
>
>
> On Tue, Nov 9, 2021 at 5:23 AM Reo Lei <leinuo...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Thanks for drafting the FLIP and trying to support small file compaction. I think this feature is very urgent and valuable for users (at least for me).
> >
> > Currently I am trying to support streaming rewrite (compaction) for Iceberg in PR#3323 <https://github.com/apache/iceberg/pull/3323>. As Steven mentioned, the Iceberg sink writes and compacts data through the following steps:
> > Step-1: Some parallel data writers (sinks) write the streaming data as files.
> > Step-2: A single-parallelism data file committer commits the completed files as soon as possible to make them available.
> > Step-3: Some parallel file rewriters (compactors) collect committed files from multiple checkpoints and rewrite (compact) them together once the total file size or number of files reaches the threshold.
> > Step-4: A single-parallelism rewrite (compaction) result committer commits the rewritten (compacted) files to replace the old files and make them available.
> >
> > If Flink wants to support small file compaction, some key points I think are necessary:
> >
> > 1, Compact files from multiple checkpoints.
> > I totally agree with Jingsong, because the completed file size usually cannot reach the threshold in a single checkpoint. Especially for a partitioned table, we need to compact the files of each partition, but usually the file sizes of the partitions will differ and may not reach the merge threshold. If we compact these files within a single checkpoint, regardless of whether the total file size reaches the threshold, then the value of compacting is diminished and we will still get small files, because these compacted files do not reach the target size. So we need the compactor to collect committed files from multiple checkpoints and compact them once they reach the threshold.
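A sketch of what "collect committed files from multiple checkpoints" (Step-3 / point 1) could look like in operator state. CommittedFile and the threshold handling are invented for illustration; this is neither Iceberg's nor Flink's API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;

    /** Collects committed files across checkpoints and only emits a compaction
     *  unit once the accumulated size reaches the target (illustrative sketch). */
    final class CrossCheckpointCollector {
        record CommittedFile(String path, long sizeBytes) {}

        private final long targetSizeBytes;
        private final List<CommittedFile> pending = new ArrayList<>(); // checkpointed state
        private long pendingBytes;

        CrossCheckpointCollector(long targetSizeBytes) {
            this.targetSizeBytes = targetSizeBytes;
        }

        /** Called for every file committed in a checkpoint, possibly over many checkpoints. */
        Optional<List<CommittedFile>> add(CommittedFile file) {
            pending.add(file);
            pendingBytes += file.sizeBytes();
            if (pendingBytes < targetSizeBytes) {
                return Optional.empty(); // keep the files in state, wait for more checkpoints
            }
            List<CommittedFile> unit = new ArrayList<>(pending);
            pending.clear();
            pendingBytes = 0;
            return Optional.of(unit); // hand to a parallel rewriter (Step-3)
        }
    }

For a partitioned table, one such collector per partition would accumulate until that partition alone crosses the threshold.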
> > 2, Separate the write phase and the compact phase.
> > Users usually hope the data becomes available as soon as possible, and the end-to-end latency is very important. I think we need to separate the write and compact phases. The write phase comprises Step-1 and Step-2: we sink data as files and commit them every checkpoint, regardless of the file size. That ensures the data will be available ASAP. The compact phase comprises Step-3 and Step-4: the compactor should collect committed files from multiple checkpoints and compact them asynchronously once they reach the threshold, and the compact committer will commit the compaction result in the next checkpoint. We compact the committed files asynchronously because we don't want the compaction to affect the data sink or the whole pipeline.
> >
> > 3, Exactly-once guarantee between the write and compact phases.
> > Once we separate the write and compact phases, we need to consider how to guarantee exactly-once semantics between the two phases. We should not, in any case, lose data or files in the compactor (Step-3) or cause the compaction result to be inconsistent with the data as written. I think Flink should provide an easy-to-use interface to make that easier.
> >
> > 4, Metadata operations and compaction result validation.
> > In the compact phase, there may be not only file compaction but also a lot of metadata operations, such as Iceberg needing to read/write manifests and do MOR. And we need some interface that lets users validate the compaction result. I think these points should be considered when we design the compaction API.
> >
> > Back to FLIP-191: option 1 looks very complicated while option 2 is relatively simple, but neither of these two solutions separates the write phase from the compact phase. So I think we should consider the points I mentioned above. And if you have any other questions, feel free to reach out to me!
> >
> > BR,
> > Reo
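One common way to get the exactly-once handoff Reo asks for in point 3 is an idempotent rewrite commit. A sketch, with all interfaces invented for illustration (this is not Iceberg's metadata API):

    import java.util.List;
    import java.util.Set;

    /** Illustrative table-metadata interface; a real data lake format would
     *  provide its own atomic replace operation. */
    interface TableMetadata {
        Set<String> committedCompactions(); // ids of already applied rewrites
        void replaceFiles(String compactionId, List<String> removed, String added);
    }

    final class RewriteCommitter {
        private final TableMetadata metadata;

        RewriteCommitter(TableMetadata metadata) {
            this.metadata = metadata;
        }

        /** The compaction id is derived deterministically from the input files, so a
         *  replay after failover produces the same id and is detected as a duplicate. */
        void commit(String compactionId, List<String> sourceFiles, String compactedFile) {
            if (metadata.committedCompactions().contains(compactionId)) {
                return; // already applied before the failure; applying again would double-commit
            }
            metadata.replaceFiles(compactionId, sourceFiles, compactedFile);
        }
    }

The input files themselves must live in checkpointed state (as in the collector sketch above) so the compactor never loses them across a failover.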
> > Fabian Paul <fabianp...@ververica.com> wrote on Mon, Nov 8, 2021 at 7:59 PM:
> >
> > > Hi all,
> > >
> > > Thanks for the lively discussion. I am really excited to see so many people participating in this thread. It also underlines the need: many people would like to see a solution soon.
> > >
> > > I have updated the FLIP and removed the parallelism configuration, because it is unnecessary: users can configure a constant exchange key to send all committables to only one committable aggregator.
> > >
> > > 1. Burden for developers w.r.t. batch/stream unification.
> > >
> > > @yun @guowei, from a theoretical point of view you are right that by exposing the DataStream API in the sink, users have the full power to write correct batch and streaming sinks. I think in reality, though, a lot of users still struggle to build pipelines (i.e., operator pipelines) that work correctly in both streaming and batch mode. Another problem I see with exposing deeper concepts is that we cannot do any optimizations, because we cannot reason about how sinks will be built in the future.
> > >
> > > We should also try to steer users towards using only `Functions`, to give us more flexibility to swap the internal operator representation. I agree with @yun that we should try to make the `ProcessFunction` more versatile to work towards that goal, but I see this as unrelated to the FLIP.
> > >
> > > 2. Regarding Commit / Global commit
> > >
> > > I envision the global committer to be specific to the data lake solution you want to write to. However, it is entirely orthogonal to the compaction. Currently, I do not expect any changes w.r.t. the global commit introduced by this FLIP.
> > >
> > > 3. Regarding the case of trans-checkpoint merging
> > >
> > > @yun, as a user, I would expect that if the committer receives files to merge/commit in a checkpoint, these are also finished when the checkpoint finishes. I think all sinks rely on this principle currently; i.e., KafkaSink needs to commit all open transactions before the next checkpoint can happen.
> > >
> > > Maybe in the future we can somehow move the Committer#commit call to an asynchronous execution, but we should discuss that in a separate thread.
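Why the "constant exchange key" mentioned at the top of this mail removes the need for a parallelism setting: a keyed exchange routes by hash of the key modulo parallelism, so a constant key sends every committable to the same aggregator subtask. A toy illustration, not the FLIP's actual wiring:

    import java.util.List;

    final class ConstantKeyDemo {
        /** Simplified keyed routing: hash of the key modulo parallelism. */
        static int subtaskFor(Object key, int parallelism) {
            return Math.floorMod(key.hashCode(), parallelism);
        }

        public static void main(String[] args) {
            int parallelism = 4;
            List<String> committables = List.of("file-1", "file-2", "file-3");
            for (String c : committables) {
                // constant key "all": every committable targets the same aggregator subtask
                System.out.println(c + " -> subtask " + subtaskFor("all", parallelism));
            }
        }
    }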
I am also > > > surprised that > > > you personally are a fan of this approach and, on the other hand, are > > > developing > > > the Iceberg sink, which goes somewhat against your mentioned principle > of > > > keeping > > > the sink simple. > > > > > > > Currently, it is unclear from the doc and this thread where the > > > compaction > > > > is actually happening. Jingsong's reply described one model > > > > writer (parallel) -> aggregator (single-parallelism compaction > planner) > > > -> > > > > compactor (parallel) -> global committer (single-parallelism) > > > > > > > > > My idea of the topology is very similar to the one outlined by > Jinsong. The > > > compaction will happen in the committer operator. > > > > > > > > > > > In the Iceberg community, the following model has been discussed. It > is > > > > better for Iceberg because it won't delay the data availability. > > > > writer (parallel) -> global committer for append (single > parallelism) -> > > > > compactor (parallel) -> global committer for rewrite commit (single > > > > parallelism) > > > > > > > > > From a quick glimpse, it seems that the exact same topology is > possible to > > > express with the committable aggregator, but this definitely depends on > > > the exact > > > setup. > > > > > > Best, > > > Fabian > >