Hi team,

Do we have a PR for this type of delete compaction?

> Merge: the changes specified in delete files are applied to data files
> and then overwrite the original data file, e.g. merging delete files to
> data files.



Yufei



On Wed, Nov 3, 2021 at 8:29 AM Puneet Zaroo <pza...@netflix.com.invalid>
wrote:

> Sounds great. I will look at the PRs.
> thanks,
>
> On Tue, Nov 2, 2021 at 11:35 PM Jack Ye <yezhao...@gmail.com> wrote:
>
>>
>> Yes, I am actually arriving at exactly the same conclusion as you just
>> now. I was focusing too much on the immediate removal of delete files when
>> writing the doc, and lost sight of the fact that we don't need to remove
>> the deletes once we have the functionality to preserve sequence numbers.
>>
>> I just published https://github.com/apache/iceberg/pull/3454 this
>> afternoon to add the option for selecting files based on deletes in
>> BinPackStrategy; I will add another PR that preserves the sequence number
>> tomorrow.
>>
>> -Jack
>>
>> On Tue, Nov 2, 2021 at 11:23 PM Puneet Zaroo <pza...@netflix.com.invalid>
>> wrote:
>>
>>> Thanks for the further clarifications, and for outlining the detailed
>>> steps for the delete or 'MERGE' compaction. It seems this compaction is
>>> explicitly geared towards removing delete files. While that may be useful,
>>> I feel that for CDC tables, running the bin-pack and sort compactions and
>>> *removing the need to read delete files in downstream queries* quickly,
>>> without conflicting with concurrent CDC updates, is more important. This
>>> guarantees that downstream query performance is decent soon after the data
>>> has landed in the table.
>>> The actual delete file removal can happen in a somewhat delayed manner as
>>> well, since at that point it is just a storage optimization: those delete
>>> files are no longer accessed in the query path.
>>>
>>> The above requirements can be achieved if the current sort and bin-pack
>>> actions also set the sequence number of their output files to the sequence
>>> number of the snapshot with which the compaction was started. And of
>>> course the file selection criteria have to be extended to also pick files
>>> that have more than a threshold number of delete files associated with
>>> them, in addition to the criteria already used (incorrect file size for
>>> bin-pack, or range overlap for sort).
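>>>
>>> As a rough sketch of what that selection knob could look like through the
>>> Spark rewrite action (the "delete-file-threshold" option name here is only
>>> illustrative, not an agreed-upon name):
>>>
>>>   import org.apache.iceberg.spark.actions.SparkActions;
>>>
>>>   // `table` is an already-loaded org.apache.iceberg.Table
>>>   SparkActions.get()
>>>       .rewriteDataFiles(table)
>>>       .binPack()
>>>       // size-based selection still applies; additionally pick any data
>>>       // file with at least 2 associated delete files (illustrative name)
>>>       .option("delete-file-threshold", "2")
>>>       .execute();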
>>>
>>> Thanks,
>>> - Puneet
>>>
>>> On Tue, Nov 2, 2021 at 7:33 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>
>>>> > I think even with the custom sequence numbers on output data files,
>>>> the position delete files have to be deleted, *since position deletes
>>>> also apply to data files with the same sequence number*. Also, unless I
>>>> am missing something, I think the equality delete files cannot be deleted
>>>> at the end of this compaction, as it is really hard to figure out if all
>>>> the impacted data files have been rewritten:
>>>>
>>>> The plan is to always remove both position and equality deletes. Given a
>>>> predicate (e.g. COMPACT table WHERE region='US'), the initial
>>>> implementation will always compact full partitions by (1) looking for all
>>>> delete files based on the predicate, (2) getting all impacted partitions,
>>>> (3) rewriting all data files in those partitions that have deletes, and
>>>> (4) removing those delete files. The algorithm can be improved to target
>>>> a smaller subset of files, but currently we mostly rely on Iceberg's scan
>>>> planning, and as you said it's hard to figure out whether a delete file
>>>> (especially an equality delete) covers any additional data files. But we
>>>> know each delete file only belongs to a single partition, which guarantees
>>>> the removal is safe. (For partitioned tables, global deletes will be
>>>> handled separately and not removed unless specifically requested by the
>>>> user, because that requires a full table compaction; but CDC does not
>>>> write global deletes anyway.)
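>>>>
>>>> For illustration, the planning side of steps (1)-(3) can be sketched with
>>>> the existing scan API (just a sketch, assuming `table` is an
>>>> already-loaded Table and with error handling omitted):
>>>>
>>>>   import java.util.HashSet;
>>>>   import java.util.Set;
>>>>   import org.apache.iceberg.FileScanTask;
>>>>   import org.apache.iceberg.StructLike;
>>>>   import org.apache.iceberg.expressions.Expressions;
>>>>   import org.apache.iceberg.io.CloseableIterable;
>>>>
>>>>   // (1) + (2): scan with the predicate and collect the partition of every
>>>>   // data file that currently has delete files attached to it
>>>>   Set<StructLike> partitionsToCompact = new HashSet<>();
>>>>   try (CloseableIterable<FileScanTask> tasks =
>>>>       table.newScan().filter(Expressions.equal("region", "US")).planFiles()) {
>>>>     for (FileScanTask task : tasks) {
>>>>       if (!task.deletes().isEmpty()) {
>>>>         partitionsToCompact.add(task.file().partition());
>>>>       }
>>>>     }
>>>>   }
>>>>   // (3) + (4): the compaction then rewrites all data files in those
>>>>   // partitions and drops the delete files scoped to them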
>>>>
>>>> -Jack
>>>>
>>>>
>>>>
>>>> On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pza...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Thanks for the clarifications, and thanks for pulling together the
>>>>> documentation for the row-level delete functionality separately; that
>>>>> will be very helpful.
>>>>> I think we are in agreement on most points. I just want to reiterate my
>>>>> understanding of the merge compaction behavior to make sure we are on
>>>>> the same page.
>>>>>
>>>>> The output table of a Flink CDC pipeline will in many cases have small
>>>>> files with unsorted data, so doing the bin-pack and sort compactions is
>>>>> also important for those tables (and obviously being able to do so
>>>>> without conflicts with incoming data is also important). If the existing
>>>>> bin-pack and sort compaction actions are enhanced with
>>>>>
>>>>> 1) writing the output data files with the sequence number of the
>>>>> snapshot with which the compaction was started, *AND*
>>>>> 2) deleting all position delete files whose data files have been
>>>>> rewritten,
>>>>>
>>>>> then I believe we can run those (bin-pack and sort) compactions as well
>>>>> without fear of conflicting with newer CDC updates. I think even with
>>>>> the custom sequence numbers on output data files, the position delete
>>>>> files have to be deleted, *since position deletes also apply to data
>>>>> files with the same sequence number*. Also, unless I am missing
>>>>> something, I think the equality delete files cannot be deleted at the
>>>>> end of this compaction, as it is really hard to figure out if all the
>>>>> impacted data files have been rewritten.
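>>>>>
>>>>> As a purely hypothetical sketch of what (1) could look like at commit
>>>>> time, if the rewrite commit accepted an explicit sequence number (that
>>>>> overload does not exist today, and I am assuming the snapshot exposes
>>>>> its sequence number; this is only to illustrate the idea):
>>>>>
>>>>>   // hypothetical: pin the rewritten data files to the sequence number
>>>>>   // of the snapshot the compaction started from
>>>>>   long startingSeq = table.currentSnapshot().sequenceNumber();
>>>>>   table.newRewrite()
>>>>>       // compactedFiles replace rewrittenFiles; the 3-argument overload
>>>>>       // taking a sequence number is hypothetical
>>>>>       .rewriteFiles(rewrittenFiles, compactedFiles, startingSeq)
>>>>>       .commit();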
>>>>>
>>>>> If the bin-pack and sort compactions are enhanced in this manner, then I
>>>>> foresee just running those, so that the same compaction can take care of
>>>>> all relevant optimizations for a table (including delete file removal).
>>>>> At the end, potentially, only some unnecessary equality delete files
>>>>> will remain, and those will have to be removed by some other action.
>>>>>
>>>>> Thanks,
>>>>> - Puneet
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>
>>>>>> > why can't this strategy do bin-packing or sorting as well, if that
>>>>>> is required, as long as the sequence number is not updated.
>>>>>> > wouldn't subsequent reads re-apply the delete files which were used
>>>>>> in the merge as well?
>>>>>>
>>>>>> I think you are right: we can use the sequence number of the snapshot
>>>>>> at which we start the compaction as the new sequence number of all
>>>>>> rewritten files, because all the deletes with older sequence numbers
>>>>>> related to the file have already been applied. (Correct me if I missed
>>>>>> anything here.)
>>>>>> But from the correctness perspective it's the same either way, because
>>>>>> we remove those delete files as part of the compaction, so we will not
>>>>>> really reapply those deletes.
>>>>>> With this improvement, we can now also do bin-packing easily. (If we
>>>>>> preserve all sequence numbers, we can still do bin-packing for files
>>>>>> sharing the same sequence number; what I described was just the easiest
>>>>>> way to ensure sequence number preservation.)
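>>>>>>
>>>>>> Roughly, for the "preserve all sequence numbers" variant, something like
>>>>>> the following, where sequenceNumberOf and binPackAndCommit are made-up
>>>>>> placeholders:
>>>>>>
>>>>>>   // illustrative only: bin-pack within groups of files that share a
>>>>>>   // data sequence number, so every output file can keep that number
>>>>>>   Map<Long, List<FileScanTask>> groups = new HashMap<>();
>>>>>>   for (FileScanTask task : candidateTasks) {
>>>>>>     groups.computeIfAbsent(sequenceNumberOf(task), s -> new ArrayList<>())
>>>>>>         .add(task);
>>>>>>   }
>>>>>>   groups.forEach((seq, group) -> binPackAndCommit(group, seq));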
>>>>>>
>>>>>> > for the tables only being written into by a Flink CDC pipeline, this
>>>>>> should not happen as position deletes are only created for in-progress
>>>>>> (uncommitted) data files, correct?
>>>>>>
>>>>>> Correct
>>>>>>
>>>>>> > I believe for the CDC use case it is hard to guarantee that
>>>>>> partitions will turn cold and can be merged without conflicts, as
>>>>>> 'hotness' is a function of the mutation rate in the source DB
>>>>>>
>>>>>> I agree. When I say hot/cold I am mostly referring to those
>>>>>> time-partitioned use cases with a clear hot-cold division of data. But
>>>>>> in the end the principle is that the compaction strategy should be
>>>>>> determined by the characteristics of the partition. If all the
>>>>>> partitions always have high traffic, then I think merge with preserved
>>>>>> sequence numbers seems like the way to go for the entire table.
>>>>>>
>>>>>> I have recently summarized all the related concepts in
>>>>>> https://github.com/apache/iceberg/pull/3432; it would be great if you
>>>>>> could take a look and point out anything else I missed, thanks!
>>>>>>
>>>>>> Best,
>>>>>> Jack Ye
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo
>>>>>> <pza...@netflix.com.invalid> wrote:
>>>>>>
>>>>>>> Another follow-up regarding this: *"Merge strategy that does not do
>>>>>>> any bin-packing, and only merges the delete files for each data file
>>>>>>> and writes it back. The new data file will have the same sequence
>>>>>>> number as the old file before Merge"*; shouldn't the sequence number
>>>>>>> be set to the highest sequence number of any delete file applied to
>>>>>>> the data file? If the sequence number of the data file is not changed
>>>>>>> at all, wouldn't subsequent reads re-apply the delete files which were
>>>>>>> used in the merge as well?
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pza...@netflix.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I had a few follow-up points.
>>>>>>>>
>>>>>>>> 1. *"(1) for hot partitions, users can try to only perform Convert
>>>>>>>> and Rewrite to keep delete file sizes and count manageable, until the
>>>>>>>> partition becomes cold and a Merge can be performed safely."*: I
>>>>>>>> believe for the CDC use case it is hard to guarantee that partitions
>>>>>>>> will turn cold and can be merged without conflicts, as 'hotness' is a
>>>>>>>> function of the mutation rate in the source DB, and perhaps some
>>>>>>>> partitions are always "hot"; so in essence the following: *"Merge
>>>>>>>> strategy that does not do any bin-packing, and only merges the delete
>>>>>>>> files for each data file and writes it back. The new data file will
>>>>>>>> have the same sequence number as the old file before Merge"* seems
>>>>>>>> important. Though as a follow-up, I am wondering why this strategy
>>>>>>>> can't do bin-packing or sorting as well, if that is required, as long
>>>>>>>> as the sequence number is not updated.
>>>>>>>>
>>>>>>>> 2. *"During the commit validation phase of a Merge operation, we need
>>>>>>>> to verify that for each data file that would be removed, there are no
>>>>>>>> new position deletes with a higher sequence number added."*: Just to
>>>>>>>> be clear, for the tables only being written into by a Flink CDC
>>>>>>>> pipeline, this should not happen as position deletes are only created
>>>>>>>> for in-progress (uncommitted) data files, correct?
>>>>>>>>
>>>>>>>> Thanks and regards,
>>>>>>>> - Puneet
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Had some offline discussions on Slack and WeChat.
>>>>>>>>>
>>>>>>>>> For Russell's point, we are reconfirming with related people on
>>>>>>>>> Slack, and will post updates once we have an agreement.
>>>>>>>>>
>>>>>>>>> Regarding point 6, for Flink CDC the data file flushed to disk might
>>>>>>>>> be associated with position deletes, but after the flush all deletes
>>>>>>>>> will be equality deletes, so 6-2 still works. After all, as long as
>>>>>>>>> the data files referenced by position deletes are not removed, the
>>>>>>>>> process should be able to succeed with optimistic retry. Currently we
>>>>>>>>> are missing the following pieces, which need to be worked on to
>>>>>>>>> resolve the CDC performance issue:
>>>>>>>>> 1. We need to support setting the sequence number for individual
>>>>>>>>> content files.
>>>>>>>>> 2. During the commit validation phase of a Merge operation, we need
>>>>>>>>> to verify that for each data file that would be removed, there are no
>>>>>>>>> new position deletes with a higher sequence number added. If any are
>>>>>>>>> detected, the merge of that file has to be completely retried (we can
>>>>>>>>> support incremental progress for this).
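>>>>>>>>>
>>>>>>>>> For 2, a very rough sketch of the intent (the helpers here are made
>>>>>>>>> up; the real check would live in the commit validation path):
>>>>>>>>>
>>>>>>>>>   // hypothetical sketch: before committing, make sure no position
>>>>>>>>>   // delete newer than the snapshot the merge planned from references
>>>>>>>>>   // a data file the merge is about to remove; otherwise retry later
>>>>>>>>>   for (DataFile file : filesToRemove) {
>>>>>>>>>     if (hasNewerPositionDeletes(table, startSnapshotId, file)) {
>>>>>>>>>       retryInNextPass(file);
>>>>>>>>>     }
>>>>>>>>>   }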
>>>>>>>>>
>>>>>>>>> -Jack
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>>>>>>>> russell.spit...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think I understood the Rewrite strategy discussion a little
>>>>>>>>>> differently
>>>>>>>>>>
>>>>>>>>>> BinPackStrategy and SortStrategy each get a new flag which lets you
>>>>>>>>>> pick files based on their number of delete files. So basically you
>>>>>>>>>> can set a variety of parameters: small files, large files, files
>>>>>>>>>> with deletes, etc.
>>>>>>>>>>
>>>>>>>>>> A new strategy is added which determines which files to rewrite by
>>>>>>>>>> looking for all files currently touched by delete files. Instead of
>>>>>>>>>> looking through files with X deletes, we look up all files affected
>>>>>>>>>> by deletes and rewrite them. Although, now as I write this, it's
>>>>>>>>>> basically running the above strategies with the number of delete
>>>>>>>>>> files >= 1 and files per group at 1. So maybe it doesn't need
>>>>>>>>>> another strategy?
>>>>>>>>>>
>>>>>>>>>> But maybe I got that wrong ...
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks to everyone who came to the meeting.
>>>>>>>>>>>
>>>>>>>>>>> Here is the full meeting recording I made:
>>>>>>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>>>>>>>
>>>>>>>>>>> Here are some key takeaways:
>>>>>>>>>>>
>>>>>>>>>>> 1. we generally agreed upon the division of compactions into
>>>>>>>>>>> Rewrite, Convert and Merge.
>>>>>>>>>>>
>>>>>>>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed
>>>>>>>>>>> in https://github.com/apache/iceberg/pull/3207, but as a new
>>>>>>>>>>> strategy that extends the existing BinPackStrategy instead. For
>>>>>>>>>>> users who would also like to run sort during Merge, we will have
>>>>>>>>>>> another delete strategy that extends the SortStrategy.
>>>>>>>>>>>
>>>>>>>>>>> 3. Merge can have an option that allows users to set the minimum
>>>>>>>>>>> number of delete files needed to trigger a compaction. However,
>>>>>>>>>>> that would result in very frequent compaction of full partitions if
>>>>>>>>>>> people add many global delete files. A Convert of global equality
>>>>>>>>>>> deletes to partition-scoped position deletes, while maintaining the
>>>>>>>>>>> same sequence number, can be used to solve the issue. Currently
>>>>>>>>>>> there is no way to write files with a custom sequence number; this
>>>>>>>>>>> functionality needs to be added.
>>>>>>>>>>>
>>>>>>>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>>>>>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>>>>>>>
>>>>>>>>>>> 5. we had some discussion around the separation of row-level and
>>>>>>>>>>> partition-level filters. The general direction in the meeting is to
>>>>>>>>>>> just have a single filter method. We will sync offline to reach an
>>>>>>>>>>> agreement.
>>>>>>>>>>>
>>>>>>>>>>> 6. people raised the issue that if new delete files are added to a
>>>>>>>>>>> data file while a Merge is going on, then the Merge would fail.
>>>>>>>>>>> That causes huge performance issues for CDC streaming use cases and
>>>>>>>>>>> makes it very hard for a Merge to succeed. There are 2 proposed
>>>>>>>>>>> solutions:
>>>>>>>>>>>   (1) for hot partitions, users can try to only perform Convert and
>>>>>>>>>>> Rewrite to keep delete file sizes and count manageable, until the
>>>>>>>>>>> partition becomes cold and a Merge can be performed safely.
>>>>>>>>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>>>>>>>>> bin-packing, and only merges the delete files for each data file
>>>>>>>>>>> and writes it back. The new data file will have the same sequence
>>>>>>>>>>> number as the old file before Merge. By doing so, new delete files
>>>>>>>>>>> can still be applied safely and the compaction can succeed without
>>>>>>>>>>> concerns around conflicts. The caveat is that this does not work
>>>>>>>>>>> for position deletes, because the row positions change for each
>>>>>>>>>>> file after Merge. But for the CDC streaming use case it is
>>>>>>>>>>> acceptable to only write equality deletes, so this looks like a
>>>>>>>>>>> feasible approach.
>>>>>>>>>>>
>>>>>>>>>>> 7. people raised the concern about the memory consumption issue
>>>>>>>>>>> for the is_deleted metadata column. We ran out of time and will 
>>>>>>>>>>> continue
>>>>>>>>>>> the discussion offline on Slack.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jack Ye
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> We are planning to have a meeting to discuss the design of
>>>>>>>>>>>> Iceberg delete compaction on Thursday 5-6pm PDT. The meeting link 
>>>>>>>>>>>> is
>>>>>>>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>>>>>>>
>>>>>>>>>>>> We have also created the channel #compaction on Slack, please
>>>>>>>>>>>> join the channel for daily discussions if you are interested in the
>>>>>>>>>>>> progress.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jack Ye
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As there are more and more people adopting the v2 spec, we are
>>>>>>>>>>>>> seeing an increasing number of requests for delete compaction 
>>>>>>>>>>>>> support.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here is a document discussing the use cases and basic interface
>>>>>>>>>>>>> design for it, to get the community aligned around what
>>>>>>>>>>>>> compactions we would offer and how the interfaces would be
>>>>>>>>>>>>> divided:
>>>>>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jack Ye
>>>>>>>>>>>>>
>>>>>>>>>>>>
