Yeah, I totally forgot to record our discussion. Will do so next time, sorry. -- Jacques Nadeau CTO and Co-Founder, Dremio
On Wed, May 29, 2019 at 4:24 PM Ryan Blue <rb...@netflix.com> wrote: > It wasn't recorded, but I can summarize what we talked about. Sorry I > haven't sent this out earlier. > > We talked about the options and some of the background in Iceberg -- > basically that it isn't possible to determine the order of commits before > you commit so you can't rely on some monotonically increasing value from a > snapshot to know which deltas to apply to a file. The result is that we > can't apply diffs to data files using a rule like "files older than X" > because we can't identify those files without the snapshot history. > > That gives us basically 2 options for scoping delete diffs: either > identify the files to apply a diff to when writing the diff, or log changes > applied to a snapshot and keep the snapshot history around (which is how we > know the order of snapshots). The first option is not good if you want to > write without reading data to determine where the deleted records are. The > second prevents cleaning up snapshot history. > > We also talked about whether we should encode IDs in data files. Jacques > pointed out that retrying a commit is easier if you don't need to re-read > the original data to reconcile changes. For example, if a data file was > compacted in a concurrent write, how do we reconcile a delete for it? We > discussed other options, like rolling back the compaction for delete > events. I think that's a promising option. > > For action items, Jacques was going to think about whether we need to > encode IDs in data files or if we could use positions to identify rows and > write up a summary/proposal. Erik was going to take on planning how > identifying rows without reading data would work and similarly write up a > summary/proposal. > > That's from memory, so if I've missed anything, I hope that other > attendees will fill in the details! > > rb > > On Wed, May 29, 2019 at 3:34 PM Venkatakrishnan Sowrirajan < > vsowr...@asu.edu> wrote: > >> Hi Ryan, >> >> I couldn't attend the meeting. Just curious, if this is recorded by any >> chance. >> >> Regards >> Venkata krishnan >> >> >> On Fri, May 24, 2019 at 8:49 AM Ryan Blue <rb...@netflix.com.invalid> >> wrote: >> >>> Yes, I agree. I'll talk a little about a couple of the constraints of >>> this as well. >>> >>> On Fri, May 24, 2019 at 5:52 AM Anton Okolnychyi <aokolnyc...@apple.com> >>> wrote: >>> >>>> The agenda looks good to me. I think it would also make sense to >>>> clarify the responsibilities of query engines and Iceberg. Not only in >>>> terms of uniqueness, but also in terms of applying diffs on read, for >>>> example. >>>> >>>> On 23 May 2019, at 01:59, Ryan Blue <rb...@netflix.com.INVALID> wrote: >>>> >>>> Here’s a rough agenda: >>>> >>>> - Use cases: everyone come with a use case that you’d like to have >>>> supported. We’ll go around and introduce ourselves and our use cases. >>>> - Main topic: How should Iceberg identify rows that are deleted? >>>> - Side topics from my initial email, if we have time: should we use >>>> insert diffs, should we support dense and sparse formats, etc. >>>> >>>> The main topic I think we should discuss is: *How should Iceberg >>>> identify rows that are deleted?* >>>> >>>> I’m phrasing it this way to avoid where I think we’re talking past one >>>> another because we are making assumptions. 
The important thing is that >>>> there are two main options: >>>> >>>> - Filename and position, vs >>>> - Specific values of (few) columns in the data >>>> >>>> This phrasing also avoids discussing uniqueness constraints. Once we >>>> get down to behavior, I think we agree. For example, I think we all agree >>>> that uniqueness cannot be enforced in Iceberg. >>>> >>>> If uniqueness can’t be enforced in Iceberg, the main choice comes down >>>> to how we identify rows that are deleted. If we use (filename, position) >>>> then we know that there is only one row. On the other hand, if we use data >>>> values to identify rows then a delete may identify more than one row >>>> because there are no uniqueness guarantees. I think we also agree that if >>>> there is more than one row identified, all of them should be deleted. >>>> >>>> At that point, there are trade-offs between the approaches: >>>> >>>> - When identifying deleted rows by data values, situations like the >>>> one that Anton pointed out are possible. >>>> - Jacques also had a good point about concurrency. If at all >>>> possible, we want to be able to reconcile changes between concurrent >>>> commits without re-running an operation. >>>> >>>> Sound like a reasonable amount to talk through? >>>> >>>> rb >>>> >>>> On Wed, May 22, 2019 at 1:17 PM Erik Wright <erik.wri...@shopify.com> >>>> wrote: >>>> >>>>> >>>>> >>>>> On Wed, May 22, 2019 at 4:04 PM Cristian Opris < >>>>> cop...@apple.com.invalid> wrote: >>>>> >>>>>> Agreed with Erik here, we're certainly not looking to build the >>>>>> equivalent of a relational database, and for that matter not even that >>>>>> of a >>>>>> local disk storage analytics database (like Vertica). Those are very >>>>>> different designs with very different trade-offs and optimizations. >>>>>> >>>>>> We're looking to automate and optimize specific types of file >>>>>> manipulation for large files on remote storage, while presenting that to >>>>>> the user under the common SQL API for *bulk* data manipulation >>>>>> (MERGE INTO) >>>>>> >>>>> >>>>> What I would encourage is to decouple the storage model from the >>>>> implementation of that API. If Iceberg has support for merge-on-read of >>>>> upserts and deletes, in addition to its powerful support for partitioning, >>>>> it will be easy for a higher-level application to implement those APIs >>>>> given certain other constraints (that might not be appropriate to all >>>>> applications). >>>>> >>>>> Myself and Miguel are out on Friday, but Anton should be able to >>>>>> handle the discussion on our side. >>>>>> >>>>>> >>>>>> Thanks, >>>>>> Cristian >>>>>> >>>>>> >>>>>> On 22 May 2019, at 17:51, Erik Wright < >>>>>> erik.wri...@shopify.com.INVALID> wrote: >>>>>> >>>>>> We have two rows with the same natural key and we use that natural >>>>>>> key in diff files: >>>>>>> nk | col1 | col2 >>>>>>> 1 | 1 | 1 >>>>>>> 1 | 2 | 2 >>>>>>> Then we have a delete statement: >>>>>>> DELETE FROM t WHERE col1 = 1 >>>>>> >>>>>> >>>>>> I think this example cuts to the point of the differences of >>>>>> understanding. Does Iceberg want to be approaching the utility of a >>>>>> relational database, against which I can execute complex update queries? >>>>>> This is not what I would have imagined. >>>>>> >>>>>> I would have, instead, imagined that it was up to the client to >>>>>> identify, through whatever means, that they want to update or delete a >>>>>> row >>>>>> with a given ID. If there are multiple (distinct) rows with the same ID, >>>>>> _too bad_. 
Any user should _expect_ that they could potentially see any >>>>>> one >>>>>> or more of those rows at read time. And that an upsert/delete would >>>>>> affect >>>>>> any/all of them (I would argue for all). >>>>>> >>>>>> *In summary:* Instead of trying to come up with a consistent, >>>>>> logical handling for complex queries that are best suited for a >>>>>> relational >>>>>> database, leave such handling up to the client and concentrate on >>>>>> problems >>>>>> that can be solved simply and more generally. >>>>>> >>>>>> On Wed, May 22, 2019 at 12:11 PM Ryan Blue <rb...@netflix.com.invalid> >>>>>> wrote: >>>>>> >>>>>>> Yes, I think we should. I was going to propose one after catching up >>>>>>> on the rest of this thread today. >>>>>>> >>>>>>> On Wed, May 22, 2019 at 9:08 AM Anton Okolnychyi < >>>>>>> aokolnyc...@apple.com> wrote: >>>>>>> >>>>>>>> Thanks! Would it make sense to discuss the agenda in advance? >>>>>>>> >>>>>>>> On 22 May 2019, at 17:04, Ryan Blue <rb...@netflix.com.INVALID> >>>>>>>> wrote: >>>>>>>> >>>>>>>> I sent out an invite and included everyone on this thread. If >>>>>>>> anyone else would like to join, please join the Zoom meeting. If you'd >>>>>>>> like >>>>>>>> to be added to the calendar invite, just let me know and I'll add you. >>>>>>>> >>>>>>>> On Wed, May 22, 2019 at 8:57 AM Jacques Nadeau <jacq...@dremio.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> works for me. >>>>>>>>> >>>>>>>>> To make things easier, we can use my zoom meeting if people like: >>>>>>>>> >>>>>>>>> Join Zoom Meeting >>>>>>>>> https://zoom.us/j/4157302092 >>>>>>>>> >>>>>>>>> One tap mobile >>>>>>>>> +16465588656,,4157302092# US (New York) >>>>>>>>> +16699006833,,4157302092# US (San Jose) >>>>>>>>> >>>>>>>>> Dial by your location >>>>>>>>> +1 646 558 8656 US (New York) >>>>>>>>> +1 669 900 6833 US (San Jose) >>>>>>>>> 877 853 5257 US Toll-free >>>>>>>>> 888 475 4499 US Toll-free >>>>>>>>> Meeting ID: 415 730 2092 >>>>>>>>> Find your local number: https://zoom.us/u/aH9XYBfm >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Jacques Nadeau >>>>>>>>> CTO and Co-Founder, Dremio >>>>>>>>> >>>>>>>>> >>>>>>>>> On Wed, May 22, 2019 at 8:54 AM Ryan Blue < >>>>>>>>> rb...@netflix.com.invalid> wrote: >>>>>>>>> >>>>>>>>>> 9AM on Friday works best for me. How about then? >>>>>>>>>> >>>>>>>>>> On Wed, May 22, 2019 at 5:05 AM Anton Okolnychyi < >>>>>>>>>> aokolnyc...@apple.com> wrote: >>>>>>>>>> >>>>>>>>>>> What about this Friday? One hour slot from 9:00 to 10:00 am or >>>>>>>>>>> 10:00 to 11:00 am PST? Some folks are based in London, so meeting >>>>>>>>>>> later >>>>>>>>>>> than this is hard. If Friday doesn’t work, we can consider Tuesday >>>>>>>>>>> or >>>>>>>>>>> Wednesday next week. >>>>>>>>>>> >>>>>>>>>>> On 22 May 2019, at 00:54, Jacques Nadeau <jacq...@dremio.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> I agree with Anton that we should probably spend some time on >>>>>>>>>>> hangouts further discussing things. Definitely differing >>>>>>>>>>> expectations here >>>>>>>>>>> and we seem to be talking a bit past each other. 
>>>>>>>>>>> -- >>>>>>>>>>> Jacques Nadeau >>>>>>>>>>> CTO and Co-Founder, Dremio >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Tue, May 21, 2019 at 3:44 PM Cristian Opris < >>>>>>>>>>> cop...@apple.com.invalid> wrote: >>>>>>>>>>> >>>>>>>>>>>> I love a good flame war :P >>>>>>>>>>>> >>>>>>>>>>>> On 21 May 2019, at 22:57, Jacques Nadeau <jacq...@dremio.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> That's my point, truly independent writers (two Spark jobs, or >>>>>>>>>>>>> a Spark job and Dremio job) means a distributed transaction. It >>>>>>>>>>>>> would need >>>>>>>>>>>>> yet another external transaction coordinator on top of both Spark >>>>>>>>>>>>> and >>>>>>>>>>>>> Dremio, Iceberg by itself >>>>>>>>>>>>> cannot solve this. >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I'm not ready to accept this. Iceberg already supports a set of >>>>>>>>>>>> semantics around multiple writers committing simultaneously and how >>>>>>>>>>>> conflict resolution is done. The same can be done here. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> MVCC (which is what Iceberg tries to implement) requires a >>>>>>>>>>>> total ordering of snapshots. Also the snapshots need to be >>>>>>>>>>>> non-conflicting. >>>>>>>>>>>> I really don't see how any metadata data structures can solve this >>>>>>>>>>>> without >>>>>>>>>>>> an outside coordinator. >>>>>>>>>>>> >>>>>>>>>>>> Consider this: >>>>>>>>>>>> >>>>>>>>>>>> Snapshot 0: (K,A) = 1 >>>>>>>>>>>> Job X: UPDATE K SET A=A+1 >>>>>>>>>>>> Job Y: UPDATE K SET A=10 >>>>>>>>>>>> >>>>>>>>>>>> What should the final value of A be and who decides ? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> By single writer, I don't mean single process, I mean multiple >>>>>>>>>>>>> coordinated processes like Spark executors coordinated by Spark >>>>>>>>>>>>> driver. The >>>>>>>>>>>>> coordinator ensures that the data is pre-partitioned on >>>>>>>>>>>>> each executor, and the coordinator commits the snapshot. >>>>>>>>>>>>> >>>>>>>>>>>>> Note however that single writer job/multiple concurrent reader >>>>>>>>>>>>> jobs is perfectly feasible, i.e. it shouldn't be a problem to >>>>>>>>>>>>> write from a >>>>>>>>>>>>> Spark job and read from multiple Dremio queries concurrently (for >>>>>>>>>>>>> example) >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> :D This is still "single process" from my perspective. That >>>>>>>>>>>> process may be coordinating other processes to do distributed work >>>>>>>>>>>> but >>>>>>>>>>>> ultimately it is a single process. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Fair enough >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> I'm not sure what you mean exactly. If we can't enforce >>>>>>>>>>>>> uniqueness we shouldn't assume it. >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I disagree. We can specify that as a requirement and state that >>>>>>>>>>>> you'll get unintended consequences if you provide your own keys >>>>>>>>>>>> and don't >>>>>>>>>>>> maintain this. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> There's no need for unintended consequences, we can specify >>>>>>>>>>>> consistent behaviour (and I believe the document says what that is) >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> We do expect that most of the time the natural key is unique, >>>>>>>>>>>>> but the eager and lazy with natural key designs can handle >>>>>>>>>>>>> duplicates >>>>>>>>>>>>> consistently. Basically it's not a problem to have duplicate >>>>>>>>>>>>> natural keys, everything works fine. 
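To make the natural-key behaviour above concrete, here is a rough SQL sketch of the lazy (merge-on-read) semantics being discussed. The table names (t_base, t_key_deletes) and layout are hypothetical illustrations, not an Iceberg API; the point is only that a key-based delete diff removes every row carrying a matching key, duplicates included:

    -- Base data (the natural key nk is not enforced unique):
    --   nk | col1 | col2
    --   1  | 1    | 1
    --   1  | 2    | 2
    --
    -- Delete diff written as natural-key tombstones:
    --   nk
    --   1
    --
    -- Read-time view: anti-join the base rows against the diff, so every
    -- row whose nk appears in the diff is dropped, however many duplicates exist.
    SELECT b.*
    FROM t_base b
    WHERE NOT EXISTS (
        SELECT 1
        FROM t_key_deletes d
        WHERE d.nk = b.nk
    );

Whether the diff is applied at read time (lazy) or folded into rewritten data files (eager), the matching rule is the same.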
>>>>>>>>>>>>> >>>>>>>>>>>> That heavily depends on how things are implemented. For >>>>>>>>>>>> example, we may write a bunch of code that generates internal data >>>>>>>>>>>> structures based on this expectation. If we have to support >>>>>>>>>>>> duplicate >>>>>>>>>>>> matches, all of a sudden we can no longer size various data >>>>>>>>>>>> structures to >>>>>>>>>>>> improve performance and may be unable to preallocate memory >>>>>>>>>>>> associated with >>>>>>>>>>>> a guaranteed completion. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Again we need to operate on the assumption that this is a large >>>>>>>>>>>> scale distributed compute/remote storage scenario. Key matching is >>>>>>>>>>>> done >>>>>>>>>>>> with shuffles with data movement across the network, so such >>>>>>>>>>>> optimizations >>>>>>>>>>>> would really have little impact on overall performance. Not to >>>>>>>>>>>> mention that >>>>>>>>>>>> most query engines already optimize the shuffle as >>>>>>>>>>>> much as it >>>>>>>>>>>> can be optimized. >>>>>>>>>>>> >>>>>>>>>>>> It is true that actual duplicate keys would make the key >>>>>>>>>>>> matching join (anti-join) somewhat more expensive; however, it can >>>>>>>>>>>> be done >>>>>>>>>>>> in such a way that if the keys are in practice unique, the join is >>>>>>>>>>>> as >>>>>>>>>>>> efficient as it can be. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Let me try and clarify each point: >>>>>>>>>>>>> >>>>>>>>>>>>> - lookup for query or update on a non-(partition/bucket/sort) >>>>>>>>>>>>> key predicate implies scanning large amounts of data - because >>>>>>>>>>>>> these are >>>>>>>>>>>>> the only data structures that can narrow down the lookup, right? >>>>>>>>>>>>> One could >>>>>>>>>>>>> argue that the min/max index (file skipping) can be applied to >>>>>>>>>>>>> any column, >>>>>>>>>>>>> but in reality if that column is not sorted the min/max intervals >>>>>>>>>>>>> can have >>>>>>>>>>>>> huge overlaps so it may be next to useless. >>>>>>>>>>>>> - remote storage - this is a critical architecture decision - >>>>>>>>>>>>> implementations on local storage imply a vastly different design >>>>>>>>>>>>> for the >>>>>>>>>>>>> entire system, storage and compute. >>>>>>>>>>>>> - deleting single records per snapshot is unfeasible in eager >>>>>>>>>>>>> but also particularly in the lazy design: each deletion creates a >>>>>>>>>>>>> very >>>>>>>>>>>>> small snapshot. Deleting 1 million records one at a time would >>>>>>>>>>>>> create 1 >>>>>>>>>>>>> million small files, and 1 million RPC calls. >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Why is this unfeasible? If I have a dataset of 100mm files >>>>>>>>>>>> including 1mm small files, is that a major problem? It seems like >>>>>>>>>>>> your >>>>>>>>>>>> use case isn't one where you want to support single record deletes >>>>>>>>>>>> but it is >>>>>>>>>>>> definitely something important to many people. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> 100 mm total files or 1 mm files per dataset is definitely a >>>>>>>>>>>> problem on HDFS, and I believe on S3 too. Single key delete would >>>>>>>>>>>> work just >>>>>>>>>>>> fine, but it's simply not optimal to do that on remote storage. >>>>>>>>>>>> This is a >>>>>>>>>>>> very well-known problem with HDFS, and one of the very reasons to >>>>>>>>>>>> have >>>>>>>>>>>> something like Iceberg in the first place. 
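For contrast with the key-matching join (anti-join) described above, here is an equally rough SQL sketch of the other option from the agenda: identifying deleted rows by (filename, position). The table names and the file_name/row_pos columns are hypothetical illustrations, not an existing Iceberg feature; the point is that each delete entry names exactly one physical row and needs no value-based matching:

    -- Base data rows, with the physical location the reader already knows:
    --   file_name          | row_pos | nk | col1 | col2
    --   part-00000.parquet | 0       | 1  | 1    | 1
    --   part-00000.parquet | 1       | 1  | 2    | 2
    --
    -- Positional delete diff: each entry identifies exactly one row.
    --   file_name          | row_pos
    --   part-00000.parquet | 0
    --
    -- Read-time view: drop rows whose (file_name, row_pos) appears in the diff.
    SELECT b.*
    FROM t_base b
    WHERE NOT EXISTS (
        SELECT 1
        FROM t_pos_deletes d
        WHERE d.file_name = b.file_name
          AND d.row_pos = b.row_pos
    );

The trade-off raised earlier in the thread shows up directly: the writer of a positional diff must already know which file and position each deleted row occupies (so it has to read, or remember, the data), while a natural-key diff can be written blindly but must be reconciled against concurrent rewrites such as compaction.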
>>>>>>>>>>>> >>>>>>>>>>>> Basically, users would be able to do single-key mutations, >>>>>>>>>>>> but it's not the use case we should be optimizing for, and it's >>>>>>>>>>>> really not >>>>>>>>>>>> advisable. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> Eager is conceptually just lazy + compaction done, well, >>>>>>>>>>>>> eagerly. The logic for both is exactly the same, the trade-off is >>>>>>>>>>>>> just that >>>>>>>>>>>>> with eager you implicitly compact every time so that you don't do >>>>>>>>>>>>> any work >>>>>>>>>>>>> on read, while with lazy >>>>>>>>>>>>> you want to amortize the cost of compaction over multiple >>>>>>>>>>>>> snapshots. >>>>>>>>>>>>> >>>>>>>>>>>>> Basically there should be no difference between the two >>>>>>>>>>>>> conceptually, or with regard to keys, etc. The only difference is >>>>>>>>>>>>> some >>>>>>>>>>>>> mechanics in implementation. >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I think you have deconstructed the problem too much to say these >>>>>>>>>>>> are the same (or at least that is what I'm starting to think given >>>>>>>>>>>> this >>>>>>>>>>>> thread). It seems like real-world implementation decisions (per our >>>>>>>>>>>> discussion here) are in conflict. For example, you just argued >>>>>>>>>>>> against >>>>>>>>>>>> having 1mm arbitrary mutations, but I think that is because you >>>>>>>>>>>> aren't >>>>>>>>>>>> thinking about things over time with a delta implementation. >>>>>>>>>>>> Having 10,000 >>>>>>>>>>>> mutations a day, where we do delta compaction once a week >>>>>>>>>>>> >>>>>>>>>>>> and keep local file mappings (key-to-offset sparse bitmaps), seems >>>>>>>>>>>> like it could result in very good performance in a case where we're >>>>>>>>>>>> mutating small amounts of data. In this scenario, you may not do >>>>>>>>>>>> major >>>>>>>>>>>> compaction ever unless you get to a high enough percentage of >>>>>>>>>>>> records that >>>>>>>>>>>> have been deleted in the original dataset. That drives a very >>>>>>>>>>>> different set >>>>>>>>>>>> of implementation decisions from a situation where you're trying >>>>>>>>>>>> to restate >>>>>>>>>>>> an entire partition at once. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> We operate on 1 billion mutations per day at least. This is the >>>>>>>>>>>> problem Iceberg wants to solve, I believe it's stated upfront. >>>>>>>>>>>> 10,000/day is >>>>>>>>>>>> not a big data problem. It can be done fairly trivially and it >>>>>>>>>>>> would be >>>>>>>>>>>> supported, but there's not much point in over-optimizing for this >>>>>>>>>>>> use case, >>>>>>>>>>>> I believe. >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Ryan Blue >>>>>>>>>> Software Engineer >>>>>>>>>> Netflix >>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Ryan Blue >>>>>>>> Software Engineer >>>>>>>> Netflix >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Ryan Blue >>>>>>> Software Engineer >>>>>>> Netflix >>>>>> >>>>>> >>>> >>>> -- >>>> Ryan Blue >>>> Software Engineer >>>> Netflix >>>> >>>> >>> >>> -- >>> Ryan Blue >>> Software Engineer >>> Netflix >> > > -- > Ryan Blue > Software Engineer > Netflix >
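As a footnote to the eager-vs-lazy point above ("eager is conceptually just lazy + compaction done eagerly"): compaction is essentially the read-time merge materialized into new data files. A rough SQL sketch, again with hypothetical table names rather than any Iceberg API:

    -- Delta compaction: fold the accumulated delete diff into a rewritten
    -- base, after which the diff can be retired and reads no longer pay
    -- the merge cost for those deletes.
    CREATE TABLE t_base_rewritten AS
    SELECT b.*
    FROM t_base b
    WHERE NOT EXISTS (
        SELECT 1
        FROM t_key_deletes d
        WHERE d.nk = b.nk
    );

Eager behaves as if this ran on every commit; lazy amortizes it across many snapshots, which is exactly the trade-off described in the thread.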