Hi team, Do we have a PR for this type of delete compaction?
> Merge: the changes specified in delete files are applied to data files and then overwrite the original data file, e.g. merging delete files to data files.

Yufei

On Wed, Nov 3, 2021 at 8:29 AM Puneet Zaroo <pza...@netflix.com.invalid> wrote:

> Sounds great. I will look at the PRs.
> thanks,
>
> On Tue, Nov 2, 2021 at 11:35 PM Jack Ye <yezhao...@gmail.com> wrote:
>
>> Yes, I am actually arriving at exactly the same conclusion as you just now. I was focusing too much on the immediate removal of delete files when writing the doc and lost the aspect that we don't need to remove the deletes once we have the functionality to preserve sequence numbers.
>>
>> I just published https://github.com/apache/iceberg/pull/3454 to add the option for selecting based on deletes in BinPackStrategy this afternoon; I will add another PR that preserves the sequence number tomorrow.
>>
>> -Jack
>>
>> On Tue, Nov 2, 2021 at 11:23 PM Puneet Zaroo <pza...@netflix.com.invalid> wrote:
>>
>>> Thanks for the further clarifications, and for outlining the detailed steps for the delete or 'MERGE' compaction. It seems this compaction is explicitly geared towards removing delete files. While that may be useful, I feel that for CDC tables, doing the Bin-pack and Sorting compactions and *removing the NEED for reading delete files in downstream queries* quickly, without conflicts with concurrent CDC updates, is more important. This guarantees that downstream query performance is decent soon after the data has landed in the table.
>>> The actual delete file removal can happen in a somewhat delayed manner as well, since it is then just a storage optimization; those delete files are no longer accessed in the query path.
>>>
>>> The above requirements can be achieved if the current sorting and bin-pack actions also set the sequence number of their output data files to the sequence number of the snapshot with which the compaction was started. And of course the file selection criteria have to be extended to also pick files which have more than a threshold number of delete files associated with them, in addition to the criteria already used (incorrect file size for bin-pack or range overlap for sort).
>>>
>>> Thanks,
>>> - Puneet
>>>
>>> On Tue, Nov 2, 2021 at 7:33 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>
>>>> > I think even with the custom sequence numbers on output data files, the position delete files have to be deleted, *since position deletes also apply on data files with the same sequence number*. Also, unless I am missing something, I think the equality delete files cannot be deleted at the end of this compaction, as it is really hard to figure out if all the impacted data files have been rewritten:
>>>>
>>>> The plan is to always remove both position and equality deletes. Given a predicate (e.g. COMPACT table WHERE region='US'), the initial implementation will always compact full partitions by (1) looking for all delete files based on the predicate, (2) getting all impacted partitions, (3) rewriting all data files in those partitions that have deletes, and (4) removing those delete files. The algorithm can be improved to target a smaller subset of files, but currently we mostly rely on Iceberg's scan planning, and as you said it's hard to figure out if a delete file (especially an equality delete) covers any additional data files.
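A rough, illustrative sketch of what the four-step flow above could look like through Iceberg's data file rewrite action. The `delete-file-threshold` option name assumes the delete-based file selection proposed in PR 3454 lands under that name, and the `region = 'US'` filter simply mirrors the example predicate; this is not a confirmed implementation from the thread.

```java
// Illustrative sketch only: compact data files that have deletes applied,
// scoped by a partition predicate, roughly mirroring steps (1)-(4) above.
// The "delete-file-threshold" option name assumes the delete-based file
// selection proposed in PR #3454; the predicate is just an example.
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.actions.SparkActions;

public class MergeCompactionSketch {
  public static void compactDeletes(Table table) {
    RewriteDataFiles.Result result =
        SparkActions.get()
            .rewriteDataFiles(table)
            // steps (1)-(2): plan only over the partitions matched by the predicate
            .filter(Expressions.equal("region", "US"))
            // step (3): select any data file with at least one associated delete file
            .option("delete-file-threshold", "1")
            .execute();

    // Step (4), removing the now-unused delete files, is part of the flow
    // described above rather than something this call alone guarantees.
    System.out.println("Rewritten data files: " + result.rewrittenDataFilesCount());
  }
}
```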
>>>> But we know each delete file only belongs to a single partition, which guarantees the removal is safe. (For partitioned tables, global deletes will be handled separately and not removed unless specifically requested by the user, because that requires a full table compaction, but CDC does not write global deletes anyway.)
>>>>
>>>> -Jack
>>>>
>>>> On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pza...@netflix.com.invalid> wrote:
>>>>
>>>>> Thanks for the clarifications, and thanks for pulling together the documentation for the row-level delete functionality separately; that will be very helpful.
>>>>> I think we are in agreement on most points. I just want to reiterate my understanding of the merge compaction behavior to make sure we are on the same page.
>>>>>
>>>>> The output table of a Flink CDC pipeline will in a lot of cases have small files with unsorted data, so doing the bin-pack and sorting compactions is also important for those tables (and obviously being able to do so without conflicts with incoming data is also important). If the existing bin-pack and sort compaction actions are enhanced with
>>>>>
>>>>> 1) writing the output data files with the sequence number of the snapshot with which the compaction was started, *AND*
>>>>> 2) deleting all position delete files whose data files have been rewritten,
>>>>>
>>>>> then I believe we can run those (bin-pack and sort) compactions as well without fear of conflicting with newer CDC updates. I think even with the custom sequence numbers on output data files, the position delete files have to be deleted, *since position deletes also apply on data files with the same sequence number*. Also, unless I am missing something, I think the equality delete files cannot be deleted at the end of this compaction, as it is really hard to figure out if all the impacted data files have been rewritten.
>>>>>
>>>>> If the bin-pack and sort compactions are enhanced in this manner, then I foresee just running those, so that the same compaction can take care of all relevant optimizations for a table (including delete file removal). At the end, potentially, only some unnecessary equality delete files will remain, which will have to be deleted by some other action.
>>>>>
>>>>> Thanks,
>>>>> - Puneet
>>>>>
>>>>> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>
>>>>>> > why can't this strategy do bin-packing or sorting as well; if that is required; as long as the sequence number is not updated.
>>>>>> > wouldn't subsequent reads re-apply the delete files which were used in the merge as well?
>>>>>>
>>>>>> I think you are right: we can use the sequence number of the snapshot at which we start the compaction as the new sequence number of all rewritten files, because all the deletes with older sequence numbers related to the file have been applied. (Correct me if I missed anything here.)
>>>>>> But from the correctness perspective it's the same either way, because we remove those delete files as a part of the compaction, so we will not really reapply those deletes.
>>>>>> With this improvement, we can now also do bin-packing easily.
>>>>>> (If we preserve all sequence numbers, we can still do bin-packing for files sharing the same sequence number; what I described was just the easiest way to ensure sequence number preservation.)
>>>>>>
>>>>>> > for the tables only being written into by a Flink CDC pipeline, this should not happen as position deletes are only created for in-progress (uncommitted) data files, correct?
>>>>>>
>>>>>> Correct
>>>>>>
>>>>>> > I believe for the CDC use case it is hard to guarantee that partitions will turn cold and can be merged without conflicts, as 'hotness' is a factor of the mutation rate in the source DB
>>>>>>
>>>>>> I agree. When I say hot/cold I am mostly referring to those time-partitioned use cases with a clear hot-cold division of data. But in the end the principle is that the compaction strategy should be determined by the characteristics of the partition. If all the partitions always have high traffic, then I think merge with preserved sequence numbers seems like the way to go for the entire table.
>>>>>>
>>>>>> I have recently summarized all the related concepts in https://github.com/apache/iceberg/pull/3432; it would be great if you could take a look at anything else I missed, thanks!
>>>>>>
>>>>>> Best,
>>>>>> Jack Ye
>>>>>>
>>>>>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo <pza...@netflix.com.invalid> wrote:
>>>>>>
>>>>>>> Another follow-up regarding this: *"Merge strategy that does not do any bin-packing, and only merges the delete files for each data file and writes it back. The new data file will have the same sequence number as the old file before Merge"*; shouldn't the sequence number be set to the highest sequence number of any delete file applied to the data file? If the sequence number of the data file is not changed at all, wouldn't subsequent reads re-apply the delete files which were used in the merge as well?
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pza...@netflix.com> wrote:
>>>>>>>
>>>>>>>> I had a few follow-up points.
>>>>>>>>
>>>>>>>> 1. *"(1) for hot partitions, users can try to only perform Convert and Rewrite to keep delete file sizes and count manageable, until the partition becomes cold and a Merge can be performed safely."*: I believe for the CDC use case it is hard to guarantee that partitions will turn cold and can be merged without conflicts, as 'hotness' is a factor of the mutation rate in the source DB, and perhaps some partitions are always "hot"; so in essence the following seems important: *"Merge strategy that does not do any bin-packing, and only merges the delete files for each data file and writes it back. The new data file will have the same sequence number as the old file before Merge"*. Though as a follow-up I am wondering why this strategy can't do bin-packing or sorting as well, if that is required, as long as the sequence number is not updated.
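To make the sequence-number handling discussed above concrete, here is a hypothetical sketch of how a compaction commit could pin rewritten files to the sequence number of the snapshot it started from. As noted further down the thread, writing files with a custom sequence number was not supported at the time, so the `rewriteFiles` overload taking a sequence number is illustrative of the proposal rather than a confirmed API; the table, file sets, and names are placeholders.

```java
// Hypothetical sketch only: commit rewritten files with the sequence number of
// the snapshot the compaction started from, so deletes committed afterwards
// (with higher sequence numbers) still apply to the new files. The
// rewriteFiles(...) overload taking a sequence number is illustrative; the
// thread notes this capability still needs to be added.
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class SequencePreservingRewriteSketch {
  public static void commitRewrite(Table table,
                                   Set<DataFile> filesToDelete,
                                   Set<DataFile> filesToAdd) {
    // Sequence number of the snapshot the compaction planned against; every
    // delete at or below this number was applied while rewriting.
    long startingSequenceNumber = table.currentSnapshot().sequenceNumber();

    table.newRewrite()
        // hypothetical: assign startingSequenceNumber to the new data files
        // instead of the (higher) sequence number of the commit itself
        .rewriteFiles(filesToDelete, filesToAdd, startingSequenceNumber)
        .commit();
  }
}
```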
>>>>>>>> >>>>>>>> 2 *"During the commit validation phase of a Merge operation, we >>>>>>>> need to verify that for each data file that would be removed, there >>>>>>>> are no >>>>>>>> new position deletes with higher sequence number added."* : Just >>>>>>>> to be clear; for the tables only being written into by a Flink CDC >>>>>>>> pipeline, this should not happen as position deletes are only created >>>>>>>> for >>>>>>>> in-progress (uncommitted) data files, correct ? >>>>>>>> >>>>>>>> Thanks and regards, >>>>>>>> - Puneet >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <yezhao...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Had some offline discussions on Slack and WeChat. >>>>>>>>> >>>>>>>>> For Russell's point, we are reconfirming with related people on >>>>>>>>> Slack, and will post updates once we have an agreement. >>>>>>>>> >>>>>>>>> Regarding point 6, for Flink CDC the data file flushed to disk >>>>>>>>> might be associated with position deletes, but after the flush all >>>>>>>>> deletes >>>>>>>>> will be equality deletes, so 6-2 still works. After all, as long as >>>>>>>>> data >>>>>>>>> files for position deletes are not removed, the process should be >>>>>>>>> able to >>>>>>>>> succeed with optimistic retry. Currently we are missing the following >>>>>>>>> that >>>>>>>>> needs to be worked on to resolve the CDC performance issue: >>>>>>>>> 1. We need to support setting the sequence number for individual >>>>>>>>> content files. >>>>>>>>> 2. During the commit validation phase of a Merge operation, we >>>>>>>>> need to verify that for each data file that would be removed, there >>>>>>>>> are no >>>>>>>>> new position deletes with higher sequence number added. If detected, >>>>>>>>> merge >>>>>>>>> of that file has to be completely retried (we can support incremental >>>>>>>>> progress for this). >>>>>>>>> >>>>>>>>> -Jack >>>>>>>>> >>>>>>>>> >>>>>>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer < >>>>>>>>> russell.spit...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> I think I understood the Rewrite strategy discussion a little >>>>>>>>>> differently >>>>>>>>>> >>>>>>>>>> Binpack Strategy and SortStrategy each get a new flag which lets >>>>>>>>>> you pick files based on their number of delete files. So basically >>>>>>>>>> you can >>>>>>>>>> set a variety of parameters, small files, large files, files with >>>>>>>>>> deletes >>>>>>>>>> etc ... >>>>>>>>>> >>>>>>>>>> A new strategy is added which determines which file to rewrite by >>>>>>>>>> looking for all files currently touched by delete files. Instead of >>>>>>>>>> looking >>>>>>>>>> through files with X deletes, we look up all files affected by >>>>>>>>>> deletes and >>>>>>>>>> rewrite them. Although now as I write this it's basically running >>>>>>>>>> the above >>>>>>>>>> strategies with number of delete files >= 1 and files per group at >>>>>>>>>> 1. So >>>>>>>>>> maybe it doesn't need another strategy? >>>>>>>>>> >>>>>>>>>> But maybe I got that wrong ... >>>>>>>>>> >>>>>>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <yezhao...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks to everyone who came to the meeting. >>>>>>>>>>> >>>>>>>>>>> Here is the full meeting recording I made: >>>>>>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing >>>>>>>>>>> >>>>>>>>>>> Here are some key takeaways: >>>>>>>>>>> >>>>>>>>>>> 1. we generally agreed upon the division of compactions into >>>>>>>>>>> Rewrite, Convert and Merge. >>>>>>>>>>> >>>>>>>>>>> 2. 
>>>>>>>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed in https://github.com/apache/iceberg/pull/3207, but as a new strategy that extends the existing BinPackStrategy instead. For users who would also like to run sort during Merge, we will have another delete strategy that extends the SortStrategy.
>>>>>>>>>>>
>>>>>>>>>>> 3. Merge can have an option that allows users to set the minimum number of delete files to trigger a compaction. However, that would result in very frequent compaction of full partitions if people add many global delete files. A Convert of global equality deletes to partition position deletes while maintaining the same sequence number can be used to solve the issue. Currently there is no way to write files with a custom sequence number; this functionality needs to be added.
>>>>>>>>>>>
>>>>>>>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at https://github.com/apache/iceberg/pull/2841.
>>>>>>>>>>>
>>>>>>>>>>> 5. we had some discussion around the separation of row and partition level filters. The general direction in the meeting is to just have a single filter method. We will sync offline to reach an agreement.
>>>>>>>>>>>
>>>>>>>>>>> 6. people raised the issue that if new delete files are added to a data file while a Merge is going on, then the Merge would fail. That causes huge performance issues for CDC streaming use cases and makes it very hard for a Merge to succeed. There are 2 proposed solutions:
>>>>>>>>>>> (1) for hot partitions, users can try to only perform Convert and Rewrite to keep delete file sizes and count manageable, until the partition becomes cold and a Merge can be performed safely.
>>>>>>>>>>> (2) it looks like we need a Merge strategy that does not do any bin-packing, and only merges the delete files for each data file and writes it back. The new data file will have the same sequence number as the old file before Merge. By doing so, new delete files can still be applied safely and the compaction can succeed without concerns around conflict. The caveat is that this does not work for position deletes, because the row position changes for each file after Merge. But for the CDC streaming use case it is acceptable to only write equality deletes, so this looks like a feasible approach.
>>>>>>>>>>>
>>>>>>>>>>> 7. people raised the concern about the memory consumption issue for the is_deleted metadata column. We ran out of time and will continue the discussion offline on Slack.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jack Ye
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> We are planning to have a meeting to discuss the design of Iceberg delete compaction on Thursday 5-6pm PDT. The meeting link is https://meet.google.com/nxx-nnvj-omx.
>>>>>>>>>>>> We have also created the channel #compaction on Slack, please join the channel for daily discussions if you are interested in the progress.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jack Ye
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As there are more and more people adopting the v2 spec, we are seeing an increasing number of requests for delete compaction support.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here is a document discussing the use cases and basic interface design for it to get the community aligned around what compactions we would offer and how the interfaces would be divided:
>>>>>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jack Ye
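For reference, a self-contained sketch (plain Java, no Iceberg API; all names are illustrative) of the Merge commit validation described earlier in the thread: a rewritten data file may only be removed if no position delete with a sequence number above the compaction's starting sequence number has since been added against it; otherwise the merge of that file is retried.

```java
// Illustrative sketch of the Merge commit validation discussed in the thread:
// reject (and retry) if any data file slated for removal has gained a position
// delete newer than the compaction's starting sequence number.
import java.util.Map;
import java.util.Set;

public class MergeCommitValidationSketch {

  /**
   * dataFilesToRemove: paths of data files the Merge wants to replace.
   * maxPositionDeleteSeqByDataFile: for each data file path, the highest sequence
   * number of any position delete currently referencing it.
   */
  static boolean isSafeToCommit(Set<String> dataFilesToRemove,
                                Map<String, Long> maxPositionDeleteSeqByDataFile,
                                long compactionStartSequenceNumber) {
    for (String dataFile : dataFilesToRemove) {
      Long maxDeleteSeq = maxPositionDeleteSeqByDataFile.get(dataFile);
      if (maxDeleteSeq != null && maxDeleteSeq > compactionStartSequenceNumber) {
        // A position delete added after the compaction started still points at this
        // file; removing the file would drop that delete, so this file's merge has
        // to be retried against the newer table state.
        return false;
      }
    }
    return true;
  }
}
```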