Hey Erik,

I was able to parse your proposal quite quickly, since it seems to be pretty 
much exactly the "lazy with natural key" option in our proposed approach.

I must say it is, however, much more concretely expressed.

This natural key approach is in fact our preferred option too, even though it 
wasn't explicitly said in the document. I guess we wanted to get more feedback 
just in case we're missing something. Turns out we weren't :)

In particular, I fully agree with this:

"The effective dataset of any other version is the equivalent of the previous 
version, anti-join the deletes in the current version (on the unique row 
identifier), union the insertions in the current version. Optimal performance 
will be achieved when consistent partition specifications and sorting are used 
from one version to the next (minimizing the number of deletes that need to be 
compared to each insertion)."

Yes, nothing more than sorted natural keys is needed to achieve a good 
implementation that covers all use cases.
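
To check that we read that paragraph the same way, here is a minimal sketch of 
it in Python. Everything in it (the effective_dataset name, rows as dicts, "nk" 
as the natural key column) is made up for illustration and is not Iceberg API. 
It also ignores partitioning and sorting entirely.

```python
from typing import Dict, Iterable, List, Set

Row = Dict[str, object]


def effective_dataset(previous: Iterable[Row],
                      deletes: Set[object],
                      inserts: Iterable[Row],
                      key: str = "nk") -> List[Row]:
    """previous ANTI JOIN deletes (on the natural key) UNION inserts."""
    # Anti-join: keep only the previous rows whose key is not deleted.
    survivors = [row for row in previous if row[key] not in deletes]
    # Union: rows inserted in the current version are appended as-is.
    return survivors + list(inserts)


v1 = [{"nk": 1, "val": "a"}, {"nk": 2, "val": "b"}, {"nk": 3, "val": "c"}]
# An upsert of key 2 is modelled as a delete of key 2 plus an insert.
v2 = effective_dataset(v1, deletes={2, 3}, inserts=[{"nk": 2, "val": "B"}])
print(v2)
```

In this toy form an upsert is just a delete plus an insert in the same 
version, which is how I understand your proposal.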

One thing though: guaranteeing uniqueness doesn't seem possible with large 
datasets. In fact, no real big data system does that (we posted some examples 
in the comment thread on our proposal doc).

It's not clear from your doc how enforcing uniqueness would work?

The good news, however, is that we don't need uniqueness for all of this to 
work. It works just the same even if there are duplicates, although the 
implementation needs a bit more care to handle some corner cases consistently.
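
As a rough illustration of the duplicate case, here is a sketch of the 
anti-join as a single merge pass over key-sorted inputs. The names are 
hypothetical, and it assumes both the data rows and the delete keys are 
already sorted by natural key. A delete simply removes every row carrying 
that key.

```python
from typing import Iterable, Iterator, Tuple

Row = Tuple[int, str]  # (natural key, payload); illustrative layout only


def sorted_anti_join(rows: Iterable[Row],
                     delete_keys: Iterable[int]) -> Iterator[Row]:
    """Yield rows whose key is not deleted; both inputs sorted by key."""
    deletes = iter(delete_keys)
    current = next(deletes, None)
    for key, payload in rows:
        # Advance the delete cursor until it catches up with this row.
        while current is not None and current < key:
            current = next(deletes, None)
        # Do not advance on a match, so every duplicate of the key is dropped.
        if current is not None and current == key:
            continue
        yield (key, payload)


rows = [(1, "a"), (1, "a-dup"), (2, "b"), (3, "c"), (3, "c-dup"), (4, "d")]
remaining = list(sorted_anti_join(rows, delete_keys=[1, 1, 4]))
print(remaining)
```

When the keys are in fact unique, the loop degenerates to a plain sorted 
merge, so tolerating duplicates costs nothing extra in that case.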

Let me know if I got the gist of your proposal right.

Thanks,
Cristian


> On 7 Jun 2019, at 19:47, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> 
> Thanks, Erik! Great to see progress here. I'll set aside some time to look 
> this over in detail.
> 
> On Fri, Jun 7, 2019 at 11:46 AM Erik Wright <erik.wri...@shopify.com 
> <mailto:erik.wri...@shopify.com>> wrote:
> I apologize for the delay, but I have finally put together a document 
> describing an alternative approach to supporting updates in Iceberg while 
> minimizing write amplification.
> 
> Proposal: Iceberg Merge-on-Read 
> <https://docs.google.com/document/d/1KuOMeS8Hw_yuE5IXtII8EClJlEMtqFECfWbg-gN5lsQ/edit?usp=sharing>
> 
> Thank you, Anton and Miguel, for starting this conversation, and everyone 
> else as well for the ongoing dialogue. I'm looking forward to continuing to 
> discuss this and hopefully finding an approach that can meet our different 
> needs and that we can work together on.
> 
> Cheers,
> 
> Erik
> 
> On Wed, May 29, 2019 at 7:26 PM Jacques Nadeau <jacq...@dremio.com 
> <mailto:jacq...@dremio.com>> wrote:
> Yeah, I totally forgot to record our discussion. Will do so next time, sorry.
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
> 
> 
> On Wed, May 29, 2019 at 4:24 PM Ryan Blue <rb...@netflix.com 
> <mailto:rb...@netflix.com>> wrote:
> It wasn't recorded, but I can summarize what we talked about. Sorry I haven't 
> sent this out earlier.
> 
> We talked about the options and some of the background in Iceberg -- 
> basically that it isn't possible to determine the order of commits before you 
> commit so you can't rely on some monotonically increasing value from a 
> snapshot to know which deltas to apply to a file. The result is that we can't 
> apply diffs to data files using a rule like "files older than X" because we 
> can't identify those files without the snapshot history.
> 
> That gives us basically 2 options for scoping delete diffs: either identify 
> the files to apply a diff to when writing the diff, or log changes applied to 
> a snapshot and keep the snapshot history around (which is how we know the 
> order of snapshots). The first option is not good if you want to write 
> without reading data to determine where the deleted records are. The second 
> prevents cleaning up snapshot history.
> 
> We also talked about whether we should encode IDs in data files. Jacques 
> pointed out that retrying a commit is easier if you don't need to re-read the 
> original data to reconcile changes. For example, if a data file was compacted 
> in a concurrent write, how do we reconcile a delete for it? We discussed 
> other options, like rolling back the compaction for delete events. I think 
> that's a promising option.
> 
> For action items, Jacques was going to think about whether we need to encode 
> IDs in data files or if we could use positions to identify rows and write up 
> a summary/proposal. Erik was going to take on planning how identifying rows 
> without reading data would work and similarly write up a summary/proposal.
> 
> That's from memory, so if I've missed anything, I hope that other attendees 
> will fill in the details!
> 
> rb
> 
> On Wed, May 29, 2019 at 3:34 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu 
> <mailto:vsowr...@asu.edu>> wrote:
> Hi Ryan,
> 
> I couldn't attend the meeting. Just curious, if this is recorded by any 
> chance.
> 
> Regards
> Venkata krishnan
> 
> 
> On Fri, May 24, 2019 at 8:49 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
> Yes, I agree. I'll talk a little about a couple of the constraints of this as 
> well.
> 
> On Fri, May 24, 2019 at 5:52 AM Anton Okolnychyi <aokolnyc...@apple.com 
> <mailto:aokolnyc...@apple.com>> wrote:
> The agenda looks good to me. I think it would also make sense to clarify the 
> responsibilities of query engines and Iceberg. Not only in terms of 
> uniqueness, but also in terms of applying diffs on read, for example. 
> 
>> On 23 May 2019, at 01:59, Ryan Blue <rb...@netflix.com.INVALID 
>> <mailto:rb...@netflix.com.INVALID>> wrote:
>> 
>> Here’s a rough agenda:
>> 
>> - Use cases: everyone come with a use case that you’d like to have supported. 
>>   We’ll go around and introduce ourselves and our use cases.
>> - Main topic: How should Iceberg identify rows that are deleted?
>> - Side topics from my initial email, if we have time: should we use insert 
>>   diffs, should we support dense and sparse formats, etc.
>> 
>> The main topic I think we should discuss is: How should Iceberg identify 
>> rows that are deleted?
>> 
>> I’m phrasing it this way to avoid where I think we’re talking past one 
>> another because we are making assumptions. The important thing is that there 
>> are two main options:
>> 
>> - Filename and position, vs
>> - Specific values of (few) columns in the data
>> 
>> This phrasing also avoids discussing uniqueness constraints. Once we get 
>> down to behavior, I think we agree. For example, I think we all agree that 
>> uniqueness cannot be enforced in Iceberg.
>> 
>> If uniqueness can’t be enforced in Iceberg, the main choice comes down to 
>> how we identify rows that are deleted. If we use (filename, position) then 
>> we know that there is only one row. On the other hand, if we use data values 
>> to identify rows then a delete may identify more than one row because there 
>> are no uniqueness guarantees. I think we also agree that if there is more 
>> than one row identified, all of them should be deleted.
>> 
>> At that point, there are trade-offs between the approaches:
>> 
>> - When identifying deleted rows by data values, situations like the one that 
>>   Anton pointed out are possible.
>> - Jacques also had a good point about concurrency. If at all possible, we want 
>>   to be able to reconcile changes between concurrent commits without 
>>   re-running an operation.
>> 
>> Sound like a reasonable amount to talk through?
>> 
>> rb
>> 
>> 
>> On Wed, May 22, 2019 at 1:17 PM Erik Wright <erik.wri...@shopify.com 
>> <mailto:erik.wri...@shopify.com>> wrote:
>> 
>> 
>> On Wed, May 22, 2019 at 4:04 PM Cristian Opris <cop...@apple.com.invalid 
>> <mailto:cop...@apple.com.invalid>> wrote:
>> Agreed with Erik here, we're certainly not looking to build the equivalent 
>> of a relational database, and for that matter not even that of a local disk 
>> storage analytics database (like Vertica). Those are very 
>> different designs with very different trade-offs and optimizations.
>> 
>> We're looking to automate and optimize specific types of file manipulation 
>> for large files on remote storage, while presenting that to the user under 
>> the common SQL API for bulk data manipulation (MERGE INTO)
>> 
>> What I would encourage is to decouple the storage model from the 
>> implementation of that API. If Iceberg has support for merge-on-read of 
>> upserts and deletes, in addition to its powerful support for partitioning, 
>> it will be easy for a higher-level application to implement those APIs given 
>> certain other constraints (that might not be appropriate to all 
>> applications).
>> 
>> Miguel and I are out on Friday, but Anton should be able to handle the 
>> discussion on our side.
>> 
>> 
>> Thanks,
>> Cristian
>> 
>> 
>>> On 22 May 2019, at 17:51, Erik Wright <erik.wri...@shopify.com.INVALID 
>>> <mailto:erik.wri...@shopify.com.INVALID>> wrote:
>>> 
>>> We have two rows with the same natural key and we use that natural key in 
>>> diff files:
>>> nk | col1 | col2
>>> 1 | 1 | 1
>>> 1 | 2 | 2
>>> Then we have a delete statement:
>>> DELETE FROM t WHERE col1 = 1
>>> 
>>> I think this example cuts to the point of the differences of understanding. 
>>> Does Iceberg want to be approaching the utility of a relational database, 
>>> against which I can execute complex update queries? This is not what I 
>>> would have imagined.
>>> 
>>> I would have, instead, imagined that it was up to the client to identify, 
>>> through whatever means, that they want to update or delete a row with a 
>>> given ID. If there are multiple (distinct) rows with the same ID, _too 
>>> bad_. Any user should _expect_ that they could potentially see any one or 
>>> more of those rows at read time. And that an upsert/delete would affect 
>>> any/all of them (I would argue for all).
>>> 
>>> In summary: Instead of trying to come up with a consistent, logical 
>>> handling for complex queries that are best suited for a relational 
>>> database, leave such handling up to the client and concentrate on problems 
>>> that can be solved simply and more generally.
>>> 
>>> On Wed, May 22, 2019 at 12:11 PM Ryan Blue <rb...@netflix.com.invalid 
>>> <mailto:rb...@netflix.com.invalid>> wrote:
>>> Yes, I think we should. I was going to propose one after catching up on the 
>>> rest of this thread today.
>>> 
>>> On Wed, May 22, 2019 at 9:08 AM Anton Okolnychyi <aokolnyc...@apple.com 
>>> <mailto:aokolnyc...@apple.com>> wrote:
>>> Thanks! Would it make sense to discuss the agenda in advance?
>>> 
>>>> On 22 May 2019, at 17:04, Ryan Blue <rb...@netflix.com.INVALID 
>>>> <mailto:rb...@netflix.com.INVALID>> wrote:
>>>> 
>>>> I sent out an invite and included everyone on this thread. If anyone else 
>>>> would like to join, please join the Zoom meeting. If you'd like to be 
>>>> added to the calendar invite, just let me know and I'll add you.
>>>> 
>>>> On Wed, May 22, 2019 at 8:57 AM Jacques Nadeau <jacq...@dremio.com 
>>>> <mailto:jacq...@dremio.com>> wrote:
>>>> works for me.
>>>> 
>>>> To make things easier, we can use my zoom meeting if people like:
>>>> 
>>>> Join Zoom Meeting
>>>> https://zoom.us/j/4157302092 <https://zoom.us/j/4157302092>
>>>> 
>>>> One tap mobile
>>>> +16465588656,,4157302092# US (New York)
>>>> +16699006833,,4157302092# US (San Jose)
>>>> 
>>>> Dial by your location
>>>>         +1 646 558 8656 US (New York)
>>>>         +1 669 900 6833 US (San Jose)
>>>>         877 853 5257 US Toll-free
>>>>         888 475 4499 US Toll-free
>>>> Meeting ID: 415 730 2092
>>>> Find your local number: https://zoom.us/u/aH9XYBfm 
>>>> <https://zoom.us/u/aH9XYBfm>
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Jacques Nadeau
>>>> CTO and Co-Founder, Dremio
>>>> 
>>>> 
>>>> On Wed, May 22, 2019 at 8:54 AM Ryan Blue <rb...@netflix.com.invalid 
>>>> <mailto:rb...@netflix.com.invalid>> wrote:
>>>> 9AM on Friday works best for me. How about then?
>>>> 
>>>> On Wed, May 22, 2019 at 5:05 AM Anton Okolnychyi <aokolnyc...@apple.com 
>>>> <mailto:aokolnyc...@apple.com>> wrote:
>>>> What about this Friday? One hour slot from 9:00 to 10:00 am or 10:00 to 
>>>> 11:00 am PST? Some folks are based in London, so meeting later than this 
>>>> is hard. If Friday doesn’t work, we can consider Tuesday or Wednesday next 
>>>> week.
>>>> 
>>>>> On 22 May 2019, at 00:54, Jacques Nadeau <jacq...@dremio.com 
>>>>> <mailto:jacq...@dremio.com>> wrote:
>>>>> 
>>>>> I agree with Anton that we should probably spend some time on hangouts 
>>>>> further discussing things. Definitely differing expectations here and we 
>>>>> seem to be talking a bit past each other.
>>>>> --
>>>>> Jacques Nadeau
>>>>> CTO and Co-Founder, Dremio
>>>>> 
>>>>> 
>>>>> On Tue, May 21, 2019 at 3:44 PM Cristian Opris <cop...@apple.com.invalid 
>>>>> <mailto:cop...@apple.com.invalid>> wrote:
>>>>> I love a good flame war :P
>>>>> 
>>>>>> On 21 May 2019, at 22:57, Jacques Nadeau <jacq...@dremio.com 
>>>>>> <mailto:jacq...@dremio.com>> wrote:
>>>>>> 
>>>>>> 
>>>>>> That's my point: truly independent writers (two Spark jobs, or a Spark 
>>>>>> job and Dremio job) mean a distributed transaction. It would need yet 
>>>>>> another external transaction coordinator on top of both Spark and 
>>>>>> Dremio; Iceberg by itself cannot solve this.
>>>>>>  
>>>>>> I'm not ready to accept this. Iceberg already supports a set of 
>>>>>> semantics around multiple writers committing simultaneously and how 
>>>>>> conflict resolution is done. The same can be done here.
>>>>> 
>>>>> 
>>>>> 
>>>>> MVCC (which is what Iceberg tries to implement) requires a total ordering 
>>>>> of snapshots. Also the snapshots need to be non-conflicting. I really 
>>>>> don't see how any metadata data structures can solve this without an 
>>>>> outside coordinator.
>>>>> 
>>>>> Consider this:
>>>>> 
>>>>> Snapshot 0: (K,A) = 1
>>>>> Job X: UPDATE K SET A=A+1
>>>>> Job Y: UPDATE K SET A=10
>>>>> 
>>>>> What should the final value of A be, and who decides?
>>>>> 
>>>>>>  
>>>>>> By single writer, I don't mean single process, I mean multiple 
>>>>>> coordinated processes like Spark executors coordinated by Spark driver. 
>>>>>> The coordinator ensures that the data is pre-partitioned on 
>>>>>> each executor, and the coordinator commits the snapshot. 
>>>>>> 
>>>>>> Note however that single writer job/multiple concurrent reader jobs is 
>>>>>> perfectly feasible, i.e. it shouldn't be a problem to write from a Spark 
>>>>>> job and read from multiple Dremio queries concurrently (for example)
>>>>>> 
>>>>>> :D This is still "single process" from my perspective. That process may 
>>>>>> be coordinating other processes to do distributed work but ultimately it 
>>>>>> is a single process. 
>>>>> 
>>>>> Fair enough
>>>>> 
>>>>>>  
>>>>>> I'm not sure what you mean exactly. If we can't enforce uniqueness we 
>>>>>> shouldn't assume it.
>>>>>>  
>>>>>> I disagree. We can specify that as a requirement and state that you'll 
>>>>>> get unintended consequences if you provide your own keys and don't 
>>>>>> maintain this.
>>>>> 
>>>>> There's no need for unintended consequences, we can specify consistent 
>>>>> behaviour (and I believe the document says what that is)
>>>>> 
>>>>> 
>>>>>>  
>>>>>> We do expect that most of the time the natural key is unique, but the 
>>>>>> eager and lazy with natural key designs can handle duplicates 
>>>>>> consistently. Basically it's not a problem to have duplicate natural 
>>>>>> keys, everything works fine.
>>>>>> 
>>>>>> That heavily depends on how things are implemented. For example, we may 
>>>>>> write a bunch of code that generates internal data structures based on 
>>>>>> this expectation. If we have to support duplicate matches, all of a sudden 
>>>>>> we can no longer size various data structures to improve performance and 
>>>>>> may be unable to preallocate memory associated with a guaranteed 
>>>>>> completion.
>>>>> 
>>>>> Again, we need to operate on the assumption that this is a large scale 
>>>>> distributed compute/remote storage scenario. Key matching is done with 
>>>>> shuffles with data movement across the network, so such optimizations 
>>>>> would really have little impact on overall performance. Not to mention 
>>>>> that most query engines would already optimize the shuffle as much as it 
>>>>> can be optimized.
>>>>> 
>>>>> It is true that actual duplicate keys would make the key matching join 
>>>>> (anti-join) somewhat more expensive; however, it can be done in such a way 
>>>>> that if the keys are in practice unique the join is as efficient as it 
>>>>> can be.
>>>>> 
>>>>> 
>>>>>> 
>>>>>> Let me try and clarify each point:
>>>>>> 
>>>>>> - lookup for query or update on a non-(partition/bucket/sort) key 
>>>>>> predicate implies scanning large amounts of data - because these are the 
>>>>>> only data structures that can narrow down the lookup, right? One could 
>>>>>> argue that the min/max index (file skipping) can be applied to any 
>>>>>> column, but in reality if that column is not sorted the min/max 
>>>>>> intervals can have huge overlaps so it may be next to useless.
>>>>>> - remote storage - this is a critical architecture decision - 
>>>>>> implementations on local storage imply a vastly different design for the 
>>>>>> entire system, storage and compute. 
>>>>>> - deleting single records per snapshot is unfeasible in eager but also 
>>>>>> particularly in the lazy design: each deletion creates a very small 
>>>>>> snapshot. Deleting 1 million records one at a time would create 1 
>>>>>> million small files, and 1 million RPC calls.
>>>>>> 
>>>>>> Why is this unfeasible? If I have a dataset of 100mm files including 1mm 
>>>>>> small files, is that a major problem? It seems like your usecase isn't 
>>>>>> one where you want to support single record deletes but it is definitely 
>>>>>> something important to many people.
>>>>> 
>>>>> 100 mm total files or 1 mm files per dataset is definitely a problem on 
>>>>> HDFS, and I believe on S3 too. Single key delete would work just fine, 
>>>>> but it's simply not optimal to do that on remote storage. This is a very 
>>>>> well known problem with HDFS, and one of the very reasons to have 
>>>>> something like Iceberg in the first place.
>>>>> 
>>>>> Basically, users would be able to do single-key mutations, but it's not 
>>>>> the use case we should be optimizing for, and it's really not advisable.
>>>>> 
>>>>> 
>>>>>>  
>>>>>> Eager is conceptually just lazy + compaction done, well, eagerly. The 
>>>>>> logic for both is exactly the same, the trade-off is just that with 
>>>>>> eager you implicitly compact every time so that you don't do any work on 
>>>>>> read, while with lazy 
>>>>>> you want to amortize the cost of compaction over multiple snapshots.
>>>>>> 
>>>>>> Basically there should be no difference between the two conceptually, or 
>>>>>> with regard to keys, etc. The only difference is some mechanics in 
>>>>>> implementation.
>>>>>> 
>>>>>> I think you have deconstructed the problem too much to say these are the 
>>>>>> same (or at least that is what I'm starting to think given this thread). 
>>>>>> It seems like real world implementation decisions (per our discussion 
>>>>>> here) are in conflict. For example, you just argued against having 1mm 
>>>>>> arbitrary mutations but I think that is because you aren't thinking 
>>>>>> about things over time with a delta implementation. Having 10,000 
>>>>>> mutations a day where we do delta compaction once a week 
>>>>>> and local file mappings (key to offset sparse bitmaps) seems like it 
>>>>>> could result in very good performance in a case where we're mutating 
>>>>>> small amounts of data. In this scenario, you may not do major compaction 
>>>>>> ever unless you get to a high enough percentage of records that have 
>>>>>> been deleted in the original dataset. That drives a very different set 
>>>>>> of implementation decisions from a situation where you're trying to 
>>>>>> restate an entire partition at once.
>>>>> 
>>>>> 
>>>>> We operate on 1 billion mutations per day at least. This is the problem 
>>>>> Iceberg wants to solve, I believe it's stated upfront. 10000/day is not a 
>>>>> big data problem. It can be done fairly trivially and it would be 
>>>>> supported, but there's not much point in extra optimizing for this use 
>>>>> case I believe.
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
