I am really glad to see progress on this! I was off for a couple of days, so 
could take a detailed look at Erik’s proposal just recently.

I would highlight the following features of Erik’s proposal:
- Performing certain natural key operations without reading existing data.
- Cheap retries if an update/delete modifies rows in files that are being 
compacted (it is about compaction only as two concurrent 
updates/deletes/inserts will require a row-based conflict resolution mechanism, 
so it is the same as in our proposal).

These two features could not be achieved in our approaches since we relied on 
‘fileId’ to scope deletes. The cost on read in Erik’s approach seems to be 
higher as we basically sort the dataset (or at least a part we query) not only 
by the natural key but also by version + delete flag and then iterate through 
those records. It is a bit hard to predict how that will behave from the 
performance perspective. Maybe, having more details will clarify this for me 
(e.g. whether we can avoid shuffles, what is done by query engines/Iceberg, 
etc). In our approaches, we were reading datasets during updates/deletes and 
then generating diffs that would point to specific rows in specific files, so 
the read path was cheap and straightforward but we had to read the data while 
generating diffs and we could face unresolvable conflicts if a compaction was 
performed concurrently to updates/deletes. Potentially, the latter can be 
circumvented. Also, the cost to read data on write might not be that high 
because the data is sorted by natural keys and we can leverage file stats. If 
we are to update 1 natural key, we will need to read 1 data file or even a part 
of it if we are dealing with columnar data.

Overall, Erik seems to focus on use cases when we already know a set of natural 
keys to update/delete instead of supporting UPDATE/DELETE/MERGE INTO statements 
in a broader sense, where reading data before generating diffs is inevitable in 
certain scenarios. Both use cases are perfectly valid but they are 
substantially different. I think we can consider changes to the proposed 
merge-on-read logic to cover more use cases with natural keys. For example, it 
seems impossible to perform an upsert by natural key without reading data in 
the currently proposed logic. One simple way to support this is to always issue 
a delete before an insert but this will result in an unresolved delete error 
right now. Alternatively, we can always pick the last version of each row so 
that every insert is basically an upsert. In theory, we can even have 
general-purpose updates/deletes using that design. However, UPDATE/DELETE/MERGE 
INTO statements might require reading data to generate diffs, killing the main 
benefit and potentially making the overall solution inefficient as we would 
need to do a lot during write and read. As it stands today, this approach seems 
relevant only if we can perform all operations without reading data.

Also, Ryan’s suggestion with sequence numbers looks promising to me as it 
allows us to achieve the same features with minimum changes to the existing 
metadata.

To sum up, it would be great to support both use cases but I am not sure it can 
be done with one approach. So, supporting multiple ways and letting users 
decide which one they need can be a good way out.

Thanks,
Anton

> On 17 Jun 2019, at 18:53, Erik Wright <erik.wri...@shopify.com> wrote:
> 
> Thanks so much for this in-depth response.
> 
> Analytical and incremental reads are the two use cases that I had primarily 
> been considering. I don't think synthetic keys are necessary for those 
> scenarios, although perhaps they offer a level of performance optimization.
> 
> The MERGE INTO scenarios are quite different. I agree with the premise that 
> they already require reading data and, therefore, the resulting output should 
> capture as much value from that effort as possible.
> 
> Because snapshots and versions are basically the same idea, we don’t need 
> both. If we were to add versions, they should replace snapshots.
> 
> I'm a little confused by this. You mention sequence numbers quite a bit, but 
> then say that we should not introduce versions. From my understanding, 
> sequence numbers and versions are essentially identical. What has changed is 
> where they are encoded (in file metadata vs. in snapshot metadata). It would 
> also seem necessary to keep them distinct from snapshots in order to be able 
> to collapse/compact files older than N (oldest valid version) without losing 
> the ability to incrementally apply the changes from N + 1.
> 
> Because I am having a really hard time seeing how things would work without 
> these sequence numbers (versions), and given their prominence in your reply, 
> my remaining comments assume they are present, distinct from snapshots, and 
> monotonically increasing.
> 
> Another way of thinking about it: if versions are stored as data/delete file 
> metadata and not in the table metadata file, then the complete history can be 
> kept.
> 
> Sure. Although version expiry would still be an important process to improve 
> read performance and reduce storage overhead.
> 
> Based on your proposal, it would seem that we can basically collapse any 
> given data file with its corresponding deletions. It's important to consider 
> how this will affect incremental readers. The following make sense to me:
> We should track an oldest-valid-version in the table metadata. No changes 
> newer than this version should be collapsed, as a reader who has consumed up 
> to that version must be able to continue reading.
> Any data file strictly older than the oldest-valid-version is eligible for 
> delete collapsing. During delete collapsing, all deletions up to version N 
> (any version <= the oldest-valid-version) are applied to the file. The newly 
> created file should be assigned a sequence number of N.
> Any delete file that has been completely applied is eligible for expiry. A 
> delete file is completely applied if its sequence number is <= all data files 
> to which it may apply (based on partitioning data).
> Any data file equal to or older than the oldest-valid-version is eligible for 
> compaction. During compaction, some or all data files up to version N (any 
> version <= the oldest-valid version) are selected. There may not be any 
> deletions applicable to any of the selected data files. The selected data 
> files are read, rewritten as desired (partitioning, compaction, sorting, 
> etc.). The new files are included in the new snapshot (with sequence number 
> N) while the old files are dropped.
> Delete collapsing, delete expiry, and compaction may be performed in a single 
> commit. The oldest-valid-version may be advanced during this process as well. 
> The outcome must logically be consistent with applying these steps 
> independently.
> Important to note in the above is that, during collapsing/compaction, new 
> files may be emitted with a version number N that are older than the most 
> recent version. This is important to ensure that all deltas newer than N are 
> appropriately applied, and that incremental readers are able to continue 
> processing the dataset.
> 
> It is also compatible with file-level deletes used to age off old data (e.g. 
> delete where day(ts) < '2019-01-01').
> 
> This particular operation is not compatible with incremental consumption. One 
> must still generate a deletion that is associated with a new sequence number. 
> I could see a reasonable workaround where, in order to delete an entire file 
> (insertion file `x.dat`, sequence number X) one would reference the same file 
> a second time (deletion file `x.dat, sequence number Y > X). Since an 
> insertion file and a deletion file have compatible formats, there is no need 
> to rewrite this file simply to mark each row in it as deleted. A savvy 
> consumer would be able to tell that this is a whole-file deletion while a 
> naïve consumer would be able to apply them using the basic algorithm.
> 
> If it seems like we are roughly on the same page I will take a stab at 
> updating that document to go to the same level of detail that it does now but 
> use the sequence-number approach.
> 
> On Fri, Jun 14, 2019 at 6:30 PM Ryan Blue <rb...@netflix.com 
> <mailto:rb...@netflix.com>> wrote:
> First, I want to clear up some of the confusion with a bit of background on 
> how Iceberg works, but I have inline replies below as well.
> 
> Conceptually, a snapshot is a table version. Each snapshot is the complete 
> table state at some point in time and each snapshot is independent. 
> Independence is a key property of snapshots because it allows us to maintain 
> table data and metadata.
> 
> Snapshots track additions and deletions using per-file status 
> (ADDED/EXISTING/DELETED) and the snapshot ID when each file was added or 
> deleted. This can be efficiently loaded and used to process changes 
> incrementally. Snapshots are also annotated with the operation that created 
> each snapshot. This is used to identify snapshots with no logical changes 
> (replace) and snapshots with no deletions (append) to speed up incremental 
> consumption and operations like snapshot expiration.
> 
> Readers use the current snapshot for isolation from writes. Because readers 
> may be using an old snapshot, Iceberg keeps snapshots around and only 
> physically deletes files when snapshots expire — data files referenced in a 
> valid snapshot cannot be deleted. Independence is important so that snapshots 
> can expire and get cleaned up without any other actions on the table data.
> 
> Consider the alternative: if each snapshot depended on the previous one, then 
> Iceberg cannot expire them and physically delete data. Here’s an example 
> snapshot history:
> 
> S1
> - add file_a.parquet
> - add file_b.parquet
> S2
> - delete file_a.parquet
> S3
> - add file_c.parquet
> To delete file_a.parquet, S1 and S2 need to be rewritten into S2’ that 
> contains only file_b.parquet. This would work, but takes more time than 
> dropping references and deleting files that are no longer used. An expensive 
> metadata rewrite to delete data isn’t ideal, and not taking the time to 
> rewrite makes metadata stack up in the table metadata file, that gets 
> rewritten often.
> 
> Finally, Iceberg relies on regular metadata maintenance — manifest compaction 
> — to reduce both write volume and the number of file reads needed to plan a 
> scan. Snapshots also reuse metadata files from previous snapshots to reduce 
> write volume.
> 
> On Wed, Jun 12, 2019 at 9:12 AM Erik Wright erik.wri...@shopify.com 
> <http://mailto:erik.wri...@shopify.com/> wrote:
> 
> 
> 
> Maintaining separate manifest lists for each version requires keeping 
> versions in separate metadata trees, so we can’t merge manifest files 
> together across versions. We initially had this problem when we used only 
> fast appends for our streaming writes. We ended up with way too many metadata 
> files and job planning took a really long time.
> If I understand correctly, the main concern here is for scenarios where 
> versions are essentially micro-batches. They accumulate rapidly, each one is 
> relatively small, and there may be many valid versions at a time even if 
> expiry happens regularly. The negative impact is many small versions cannot 
> be combined together. Even the manifest list files are many, and all of them 
> need to be read.
> 
> 
> I’m not sure how you define micro-batch, but this happens when commits are on 
> the order of every 10 minutes and consist of large data files. Manifest 
> compaction is required to avoid thousands of small metadata files impacting 
> performance.
> 
> 
> 
> Although snapshots are independent and can be aged off, this requires 
> rewriting the base version to expire versions. That means we can’t delete old 
> data files until versions are rewritten, so the problem affects versions 
> instead of snapshots. We would still have the problem of not being able to 
> delete old data until after a compaction.
> I don't quite understand this point, so forgive me if the following is 
> irrelevant or obvious.
> 
> You are correct that snapshots become less useful as a tool, compared to 
> versions, for expiring data. The process now goes through (1) expire a 
> version and then (2) invalidate the last snapshot containing that version. Of 
> course, fundamentally you need to keep version X around until you are willing 
> to compact delta Y (or more) into it. And this is (I believe) our common 
> purpose in this discussion: permit a trade-off between write amplification 
> and storage/read costs.
> 
> 
> Hopefully the background above is useful. Requiring expensive data-file 
> compaction in order to compact version metadata or delete files is a big 
> problem.
> 
> And you’re right that snapshots are less useful. Because snapshots and 
> versions are basically the same idea, we don’t need both. If we were to add 
> versions, they should replace snapshots. But I don’t think it is a good idea 
> to give up the independence.
> 
> The good news is that we can use snapshots exactly like versions and avoid 
> these problems by adding sequence numbers.
> 
> 
> 
> The value of decoupling versions from snapshots is:
> 
> 1. It makes the process of correctly applying multiple deltas easy to reason 
> about.
> 2. It allows us to issue updates that don't introduce new versions (i.e., 
> those that merely expire old versions, or that perform other optimizations).
> 3. It enables clients to cleanly consume new deltas when those clients are 
> sophisticated enough to incrementally update their own outputs in function of 
> those changes.
> 
> 
> Points 2 and 3 are available already, and 1 can be done with sequence numbers 
> in snapshot metadata.
> 
> 
> 
> Here are some benefits to this approach:
> 
> There is no explicit version that must be expired so compaction is only 
> required to optimize reads — partition or file-level delete can coexist with 
> deltas
> Can you expand on this?
> 
> 
> By adding a sequence number instead of versions, the essential version 
> information is embedded in the existing metadata files. Those files are 
> reused across snapshots so there is no need to compact deltas into data files 
> to reduce the write volume.
> 
> Another way of thinking about it: if versions are stored as data/delete file 
> metadata and not in the table metadata file, then the complete history can be 
> kept.
> 
> It is also compatible with file-level deletes used to age off old data (e.g. 
> delete where day(ts) < '2019-01-01'). When files match a delete, they are 
> removed from metadata and physically deleted when the delete snapshot is 
> expired. This is independent of the delete files that get applied on read. In 
> fact, if a delete file’s sequence number is less than all data files in a 
> partition, the delete itself can be removed and never compacted.
> 
> 
> 
> When a delete’s sequence number is lower than all data files, it can be pruned
> In combination with the previous comment, I have the impression that we might 
> be losing the distinction between snapshots and versions. In particular, what 
> I'm concerned about is that, from a consumer's point of view, it may be 
> difficult to figure out, from one snapshot to the next, what new deltas I 
> need to process to update my view.
> 
> 
> We don’t need both and should go with either snapshots (independent) or 
> versions (dependent). I don’t think it is possible to make dependent versions 
> work.
> 
> But as I noted above, the changes that occurred in a snapshot are available 
> as data file additions and data file deletions. Delete file additions and 
> deletions would be tracked the same way. Consumers would have full access to 
> the changes.
> 
> 
> 
> Compaction can translate a natural key delete to a synthetic key delete by 
> adding a synthetic key delete file and updating the data file’s sequence 
> number
> I don't understand what this means.
> 
> 
> Synthetic keys are better than natural keys in several ways. They are cheaper 
> to apply at read time, they are scoped to a single data file, they compress 
> well, etc. If both are supported, compaction could rewrite a natural key 
> delete as a synthetic key delete to avoid rewriting the entire data file to 
> delete just a few rows.
> 
> In addition, if a natural key delete is applied to an entire table, it would 
> be possible to change sequence numbers to apply this process incrementally. 
> But, this would change history just like rewriting the base version of a 
> table loses the changes in that version. So we would need to be careful about 
> when changing a sequence number is allowed.
> 
> 
> 
> Synthetic and natural key approaches can coexist
> Can you elaborate on what the synthetic key approach would imply, and what 
> use-cases it would serve? 
> 
> 
> Some use cases, like MERGE INTO, already require reading data so we can write 
> synthetic key delete files without additional work. That means Iceberg should 
> support not just natural key deletes, but also synthetic.
> 
> 
> 
> I agree we are getting closer. I am confused by some things, and I suspect 
> that the difference lies in the use-cases I wish to support for incremental 
> consumers of these datasets. I have the impression that most of you folks are 
> primarily considering incremental writes that end up serving analytical use 
> cases (that want to see the full effective dataset as of the most recent 
> snapshot).
> 
> 
> We have both use cases and I want to make sure Iceberg can be used for both.
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix

Reply via email to