> I think even with the custom sequence file numbers on output data files;
the position delete files have to be deleted; *since position deletes also
apply on data files with the same sequence number*. Also, unless I am
missing something, I think the equality delete files cannot be deleted at
the end of this compaction, as it is really hard to figure out if all the
impacted data files have been rewritten:

The plan is to always remove both position and equality deletes. Given a
predicate (e.g. COMPACT table WHERE region='US'), the initial
implementation will always compact full partitions by (1) look for all
delete files based on the predicate, (2) get all impacted partitions, (3)
rewrite all data files in those partitions that has deletes, (4) remove
those delete files. The algorithm can be improved to a smaller subset of
files, but currently we mostly rely on Iceberg's scan planning and as you
said it's hard to figure out if a delete file (especially equality delete)
covers any additional data files. But we know each delete file only belongs
to a single partition, which guarantees the removal is safe. (For
partitioned tables, global deletes will be handled separately and not
removed unless specifically requested by the user because it requires a
full table compact, but CDC does not write global deletes anyway)

-Jack



On Tue, Nov 2, 2021 at 4:10 PM Puneet Zaroo <pza...@netflix.com.invalid>
wrote:

> Thanks for the clarifications; and thanks for pulling together the
> documentation for the row-level delete functionality separately; as that
> will be very helpful.
> I think we are in agreement on most points. I just want to reiterate my
> understanding of the merge compaction behavior to make sure we are on the
> same page.
>
> The output table of a Flink CDC pipeline will in a lot of cases have small
> files with unsorted data; so doing the bin-pack and sorting compactions is
> also important for those tables (and obviously being able to do so without
> conflicts with incoming data is also important). If the existing bin-pack
> and sort compaction actions are enhanced with
>
> 1) writing the output data files with the sequence number of the snapshot
> with which the compaction was started with *AND *
> 2) deleting all position delete files whose data files have been
> rewritten;
>
> then I believe we can run those (bin-pack and sort) compactions as well
> without fear of conflicting with newer CDC updates. I think even with the
> custom sequence file numbers on output data files; the position delete
> files have to be deleted; *since position deletes also apply on data
> files with the same sequence number*. Also, unless I am missing
> something, I think the equality delete files cannot be deleted at the end
> of this compaction, as it is really hard to figure out if all the impacted
> data files have been rewritten.
>
> If the bin-pack and sort compactions are enhanced in this manner; then I
> foresee just running those so that the same compaction can take care of all
> relevant optimizations for a table (including delete file removal). At the
> end, potentially, only some unnecessary equality delete files will be
> remaining which will have to be deleted by some other action.
>
> Thanks,
> - Puneet
>
>
>
>
> On Tue, Nov 2, 2021 at 1:17 PM Jack Ye <yezhao...@gmail.com> wrote:
>
>> > why can't this strategy do bin-packing or sorting as well; if that is
>> required; as long as the sequence number is not updated.
>> > wouldn't subsequent reads re-apply the delete files which were used in
>> the merge as well?
>>
>> I think you are right, we can use the sequence number of the snapshot we
>> start compaction as the new sequence number of all rewritten files because
>> all the deletes of older sequence numbers related to the file have been
>> applied. (correct me if I missed anything here)
>> But from the correctness perspective it's the same because we remove
>> those delete files as a part of the compaction so we will not really
>> reapply those deletes.
>> With this improvement, we can now also do bin-packing easily. (If we
>> preserve all sequence numbers, we can still do bin-packing for files
>> sharing the same sequence number, what I described was just the easiest way
>> to ensure sequence number preservation)
>>
>> > for the tables only being written into by a Flink CDC pipeline, this
>> should not happen as position deletes are only created for in-progress
>> (uncommitted) data files, correct ?
>>
>> Correct
>>
>> > I believe for the CDC use case it is hard to guarantee that  that
>> partitions will turn cold and can be merged without conflicts, as 'hotness'
>> is a factor of mutation rate in the source DB
>>
>> I agree. When I say hot/cold I am mostly referring to those
>> time-partitioned use cases with clear hot-cold division of data. But in the
>> end the principle is that the compaction strategy should be determined by
>> the characteristics of the partition. If all the partitions are always with
>> high traffic, then I think merge with preserved sequence number seems like
>> the way to go for the entire table.
>>
>> I have recently summarized all the related concepts in
>> https://github.com/apache/iceberg/pull/3432, it would be great if you
>> can take a look about anything else I missed, thanks!
>>
>> Best,
>> Jack Ye
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Nov 1, 2021 at 2:44 PM Puneet Zaroo <pza...@netflix.com.invalid>
>> wrote:
>>
>>> Another follow-up regarding this : *"Merge strategy that does not do
>>> any bin-packing, and only merges the delete files for each data file and
>>> writes it back. The new data file will have the same sequence number as the
>>> old file before Merge"* ; shouldn't the sequence number be set to the
>>> highest sequence number of any applied delete file to the data file. If the
>>> sequence number of the data file is not changed at-all, wouldn't subsequent
>>> reads re-apply the delete files which were used in the merge as well?
>>>
>>> thanks
>>>
>>> On Mon, Nov 1, 2021 at 2:41 PM Puneet Zaroo <pza...@netflix.com> wrote:
>>>
>>>> I had a few follow-up points.
>>>>
>>>> 1 *"(1) for hot partitions, users can try to only perform Convert and
>>>> Rewrite to keep delete file sizes and count manageable, until the partition
>>>> becomes cold and a Merge can be performed safely.".* : I believe for
>>>> the CDC use case it is hard to guarantee that  that partitions will turn
>>>> cold and can be merged without conflicts, as 'hotness' is a factor of
>>>> mutation rate in the source DB; and perhaps some partitions are always
>>>> "hot"; so in essence the following:  *"Merge strategy that does not do
>>>> any bin-packing, and only merges the delete files for each data file and
>>>> writes it back. The new data file will have the same sequence number as the
>>>> old file before Merge"* seems important. Though as a follow-up I am
>>>> wondering why can't this strategy do bin-packing or sorting as well; if
>>>> that is required; as long as the sequence number is not updated.
>>>>
>>>> 2 *"During the commit validation phase of a Merge operation, we need
>>>> to verify that for each data file that would be removed, there are no new
>>>> position deletes with higher sequence number added."* : Just to be
>>>> clear; for the tables only being written into by a Flink CDC pipeline, this
>>>> should not happen as position deletes are only created for in-progress
>>>> (uncommitted) data files, correct ?
>>>>
>>>> Thanks and regards,
>>>> - Puneet
>>>>
>>>>
>>>>
>>>> On Thu, Oct 21, 2021 at 10:54 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>
>>>>> Had some offline discussions on Slack and WeChat.
>>>>>
>>>>> For Russell's point, we are reconfirming with related people on Slack,
>>>>> and will post updates once we have an agreement.
>>>>>
>>>>> Regarding point 6, for Flink CDC the data file flushed to disk might
>>>>> be associated with position deletes, but after the flush all deletes will
>>>>> be equality deletes, so 6-2 still works. After all, as long as data files
>>>>> for position deletes are not removed, the process should be able to 
>>>>> succeed
>>>>> with optimistic retry. Currently we are missing the following that needs 
>>>>> to
>>>>> be worked on to resolve the CDC performance issue:
>>>>> 1. We need to support setting the sequence number for individual
>>>>> content files.
>>>>> 2. During the commit validation phase of a Merge operation, we need to
>>>>> verify that for each data file that would be removed, there are no new
>>>>> position deletes with higher sequence number added. If detected, merge of
>>>>> that file has to be completely retried (we can support incremental 
>>>>> progress
>>>>> for this).
>>>>>
>>>>> -Jack
>>>>>
>>>>>
>>>>> On Thu, Oct 21, 2021 at 7:58 PM Russell Spitzer <
>>>>> russell.spit...@gmail.com> wrote:
>>>>>
>>>>>> I think I understood the Rewrite strategy discussion a little
>>>>>> differently
>>>>>>
>>>>>> Binpack Strategy and SortStrategy each get a new flag which lets you
>>>>>> pick files based on their number of delete files. So basically you can 
>>>>>> set
>>>>>> a variety of parameters, small files, large files, files with deletes etc
>>>>>> ...
>>>>>>
>>>>>> A new strategy is added which determines which file to rewrite by
>>>>>> looking for all files currently touched by delete files. Instead of 
>>>>>> looking
>>>>>> through files with X deletes, we look up all files affected by deletes 
>>>>>> and
>>>>>> rewrite them. Although now as I write this it's basically running the 
>>>>>> above
>>>>>> strategies with number of delete files >= 1 and files per group at 1. So
>>>>>> maybe it doesn't need another strategy?
>>>>>>
>>>>>> But maybe I got that wrong ...
>>>>>>
>>>>>> On Thu, Oct 21, 2021 at 8:39 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks to everyone who came to the meeting.
>>>>>>>
>>>>>>> Here is the full meeting recording I made:
>>>>>>> https://drive.google.com/file/d/1yuBFlNn9nkMlH9TIut2H8CXmJGLd18Sa/view?usp=sharing
>>>>>>>
>>>>>>> Here are some key takeaways:
>>>>>>>
>>>>>>> 1. we generally agreed upon the division of compactions into
>>>>>>> Rewrite, Convert and Merge.
>>>>>>>
>>>>>>> 2. Merge will be implemented through RewriteDataFiles as proposed in
>>>>>>> https://github.com/apache/iceberg/pull/3207, but instead as a new
>>>>>>> strategy by extending the existing BinPackStrategy. For users who would
>>>>>>> also like to run sort during Merge, we will have another delete strategy
>>>>>>> that extends the SortStrategy.
>>>>>>>
>>>>>>> 3. Merge can have an option that allows users to set the minimum
>>>>>>> numbers of delete files to trigger a compaction. However, that would 
>>>>>>> result
>>>>>>> in very frequent compaction of full partition if people add many global
>>>>>>> delete files. A Convert of global equality deletes to partition
>>>>>>> position deletes while maintaining the same sequence number can be used 
>>>>>>> to
>>>>>>> solve the issue. Currently there is no way to write files with a custom
>>>>>>> sequence number. This functionality needs to be added.
>>>>>>>
>>>>>>> 4. we generally agreed upon the APIs for Rewrite and Convert at
>>>>>>> https://github.com/apache/iceberg/pull/2841.
>>>>>>>
>>>>>>> 5. we had some discussion around the separation of row and partition
>>>>>>> level filters. The general direction in the meeting is to just have a
>>>>>>> single filter method. We will sync offline to reach an agreement.
>>>>>>>
>>>>>>> 6. people raised the issue that if new delete files are added to a
>>>>>>> data file while a Merge is going on, then the Merge would fail. That 
>>>>>>> causes
>>>>>>> huge performance issues for CDC streaming use cases and Merge is very 
>>>>>>> hard
>>>>>>> to succeed. There are 2 proposed solutions:
>>>>>>>   (1) for hot partitions, users can try to only perform Convert and
>>>>>>> Rewrite to keep delete file sizes and count manageable, until the 
>>>>>>> partition
>>>>>>> becomes cold and a Merge can be performed safely.
>>>>>>>   (2) it looks like we need a Merge strategy that does not do any
>>>>>>> bin-packing, and only merges the delete files for each data file and 
>>>>>>> writes
>>>>>>> it back. The new data file will have the same sequence number as the old
>>>>>>> file before Merge. By doing so, new delete files can still be applied
>>>>>>> safely and the compaction can succeed without concerns around conflict. 
>>>>>>> The
>>>>>>> caveat is that this does not work for position deletes because the row
>>>>>>> position changes for each file after Merge. But for the CDC streaming 
>>>>>>> use
>>>>>>> case it is acceptable to only write equality deletes, so this looks 
>>>>>>> like a
>>>>>>> feasible approach.
>>>>>>>
>>>>>>> 7. people raised the concern about the memory consumption issue for
>>>>>>> the is_deleted metadata column. We ran out of time and will continue the
>>>>>>> discussion offline on Slack.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jack Ye
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 18, 2021 at 7:50 PM Jack Ye <yezhao...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> We are planning to have a meeting to discuss the design of Iceberg
>>>>>>>> delete compaction on Thursday 5-6pm PDT. The meeting link is
>>>>>>>> https://meet.google.com/nxx-nnvj-omx.
>>>>>>>>
>>>>>>>> We have also created the channel #compaction on Slack, please join
>>>>>>>> the channel for daily discussions if you are interested in the 
>>>>>>>> progress.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jack Ye
>>>>>>>>
>>>>>>>> On Tue, Sep 28, 2021 at 10:23 PM Jack Ye <yezhao...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> As there are more and more people adopting the v2 spec, we are
>>>>>>>>> seeing an increasing number of requests for delete compaction support.
>>>>>>>>>
>>>>>>>>> Here is a document discussing the use cases and basic interface
>>>>>>>>> design for it to get the community aligned around what compactions we 
>>>>>>>>> would
>>>>>>>>> offer and how the interfaces would be divided:
>>>>>>>>> https://docs.google.com/document/d/1-EyKSfwd_W9iI5jrzAvomVw3w1mb_kayVNT7f2I-SUg
>>>>>>>>>
>>>>>>>>> Any feedback would be appreciated!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jack Ye
>>>>>>>>>
>>>>>>>>

Reply via email to