Yeah, I totally forgot to record our discussion. Will do so next time,
sorry.
--
Jacques Nadeau
CTO and Co-Founder, Dremio


On Wed, May 29, 2019 at 4:24 PM Ryan Blue <rb...@netflix.com> wrote:

> It wasn't recorded, but I can summarize what we talked about. Sorry I
> haven't sent this out earlier.
>
> We talked about the options and some of the background in Iceberg --
> basically that it isn't possible to determine the order of commits before
> you commit so you can't rely on some monotonically increasing value from a
> snapshot to know which deltas to apply to a file. The result is that we
> can't apply diffs to data files using a rule like "files older than X"
> because we can't identify those files without the snapshot history.
>
> That gives us basically 2 options for scoping delete diffs: either
> identify the files to apply a diff to when writing the diff, or log changes
> applied to a snapshot and keep the snapshot history around (which is how we
> know the order of snapshots). The first option is not good if you want to
> write without reading data to determine where the deleted records are. The
> second prevents cleaning up snapshot history.
>
> We also talked about whether we should encode IDs in data files. Jacques
> pointed out that retrying a commit is easier if you don't need to re-read
> the original data to reconcile changes. For example, if a data file was
> compacted in a concurrent write, how do we reconcile a delete for it? We
> discussed other options, like rolling back the compaction for delete
> events. I think that's a promising option.
>
> For action items, Jacques was going to think about whether we need to
> encode IDs in data files or if we could use positions to identify rows and
> write up a summary/proposal. Erik was going to take on planning how
> identifying rows without reading data would work and similarly write up a
> summary/proposal.
>
> That's from memory, so if I've missed anything, I hope that other
> attendees will fill in the details!
>
> rb
>
> On Wed, May 29, 2019 at 3:34 PM Venkatakrishnan Sowrirajan <
> vsowr...@asu.edu> wrote:
>
>> Hi Ryan,
>>
>> I couldn't attend the meeting. Just curious, if this is recorded by any
>> chance.
>>
>> Regards
>> Venkata krishnan
>>
>>
>> On Fri, May 24, 2019 at 8:49 AM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> Yes, I agree. I'll talk a little about a couple of the constraints of
>>> this as well.
>>>
>>> On Fri, May 24, 2019 at 5:52 AM Anton Okolnychyi <aokolnyc...@apple.com>
>>> wrote:
>>>
>>>> The agenda looks good to me. I think it would also make sense to
>>>> clarify the responsibilities of query engines and Iceberg. Not only in
>>>> terms of uniqueness, but also in terms of applying diffs on read, for
>>>> example.
>>>>
>>>> On 23 May 2019, at 01:59, Ryan Blue <rb...@netflix.com.INVALID> wrote:
>>>>
>>>> Here’s a rough agenda:
>>>>
>>>>    - Use cases: everyone come with a use case that you’d like to have
>>>>    supported. We’ll go around and introduce ourselves and our use cases.
>>>>    - Main topic: How should Iceberg identify rows that are deleted?
>>>>    - Side topics from my initial email, if we have time: should we use
>>>>    insert diffs, should we support dense and sparse formats, etc.
>>>>
>>>> The main topic I think we should discuss is: *How should Iceberg
>>>> identify rows that are deleted?*
>>>>
>>>> I’m phrasing it this way to avoid where I think we’re talking past one
>>>> another because we are making assumptions. The important thing is that
>>>> there are two main options:
>>>>
>>>>    - Filename and position, vs
>>>>    - Specific values of (few) columns in the data
>>>>
>>>> This phrasing also avoids discussing uniqueness constraints. Once we
>>>> get down to behavior, I think we agree. For example, I think we all agree
>>>> that uniqueness cannot be enforced in Iceberg.
>>>>
>>>> If uniqueness can’t be enforced in Iceberg, the main choice comes down
>>>> to how we identify rows that are deleted. If we use (filename, position)
>>>> then we know that there is only one row. On the other hand, if we use data
>>>> values to identify rows then a delete may identify more than one row
>>>> because there are no uniqueness guarantees. I think we also agree that if
>>>> there is more than one row identified, all of them should be deleted.
>>>>
>>>> At that point, there are trade-offs between the approaches:
>>>>
>>>>    - When identifying deleted rows by data values, situations like the
>>>>    one that Anton pointed out are possible.
>>>>    - Jacques also had a good point about concurrency. If at all
>>>>    possible, we want to be able to reconcile changes between concurrent
>>>>    commits without re-running an operation.
>>>>
>>>> Sound like a reasonable amount to talk through?
>>>>
>>>> rb
>>>>
>>>> On Wed, May 22, 2019 at 1:17 PM Erik Wright <erik.wri...@shopify.com>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, May 22, 2019 at 4:04 PM Cristian Opris <
>>>>> cop...@apple.com.invalid> wrote:
>>>>>
>>>>>> Agreed with Erik here, we're certainly not looking to build the
>>>>>> equivalent of a relational database, and for that matter not even that 
>>>>>> of a
>>>>>> local disk storage analytics database (like Vertica). Those are very
>>>>>> different designs with very different trade-offs and optimizations.
>>>>>>
>>>>>> We're looking to automate and optimize specific types of file
>>>>>> manipulation for large files on remote storage, while presenting that to
>>>>>> the user under the common SQL API for *bulk* data manipulation
>>>>>> (MERGE INTO)
>>>>>>
>>>>>
>>>>> What I would encourage is to decouple the storage model from the
>>>>> implementation of that API. If Iceberg has support for merge-on-read of
>>>>> upserts and deletes, in addition to its powerful support for partitioning,
>>>>> it will be easy for a higher-level application to implement those APIs
>>>>> given certain other constraints (that might not be appropriate to all
>>>>> applications).
>>>>>
>>>>> Myself and Miguel are out on Friday, but Anton should be able to
>>>>>> handle the discussion on our side.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Cristian
>>>>>>
>>>>>>
>>>>>> On 22 May 2019, at 17:51, Erik Wright <
>>>>>> erik.wri...@shopify.com.INVALID> wrote:
>>>>>>
>>>>>> We have two rows with the same natural key and we use that natural
>>>>>>> key in diff files:
>>>>>>> nk | col1 | col2
>>>>>>> 1 | 1 | 1
>>>>>>> 1 | 2 | 2
>>>>>>> Then we have a delete statement:
>>>>>>> DELETE FROM t WHERE col1 = 1
>>>>>>
>>>>>>
>>>>>> I think this example cuts to the point of the differences of
>>>>>> understanding. Does Iceberg want to be approaching the utility of a
>>>>>> relational database, against which I can execute complex update queries?
>>>>>> This is not what I would have imagined.
>>>>>>
>>>>>> I would have, instead, imagined that it was up to the client to
>>>>>> identify, through whatever means, that they want to update or delete a 
>>>>>> row
>>>>>> with a given ID. If there are multiple (distinct) rows with the same ID,
>>>>>> _too bad_. Any user should _expect_ that they could potentially see any 
>>>>>> one
>>>>>> or more of those rows at read time. And that an upsert/delete would 
>>>>>> affect
>>>>>> any/all of them (I would argue for all).
>>>>>>
>>>>>> *In summary:* Instead of trying to come up with a consistent,
>>>>>> logical handling for complex queries that are best suited for a 
>>>>>> relational
>>>>>> database, leave such handling up to the client and concentrate on 
>>>>>> problems
>>>>>> that can be solved simply and more generally.
>>>>>>
>>>>>> On Wed, May 22, 2019 at 12:11 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, I think we should. I was going to propose one after catching up
>>>>>>> on the rest of this thread today.
>>>>>>>
>>>>>>> On Wed, May 22, 2019 at 9:08 AM Anton Okolnychyi <
>>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>>
>>>>>>>> Thanks! Would it make sense to discuss the agenda in advance?
>>>>>>>>
>>>>>>>> On 22 May 2019, at 17:04, Ryan Blue <rb...@netflix.com.INVALID>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> I sent out an invite and included everyone on this thread. If
>>>>>>>> anyone else would like to join, please join the Zoom meeting. If you'd 
>>>>>>>> like
>>>>>>>> to be added to the calendar invite, just let me know and I'll add you.
>>>>>>>>
>>>>>>>> On Wed, May 22, 2019 at 8:57 AM Jacques Nadeau <jacq...@dremio.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> works for me.
>>>>>>>>>
>>>>>>>>> To make things easier, we can use my zoom meeting if people like:
>>>>>>>>>
>>>>>>>>> Join Zoom Meeting
>>>>>>>>> https://zoom.us/j/4157302092
>>>>>>>>>
>>>>>>>>> One tap mobile
>>>>>>>>> +16465588656,,4157302092# US (New York)
>>>>>>>>> +16699006833,,4157302092# US (San Jose)
>>>>>>>>>
>>>>>>>>> Dial by your location
>>>>>>>>>         +1 646 558 8656 US (New York)
>>>>>>>>>         +1 669 900 6833 US (San Jose)
>>>>>>>>>         877 853 5257 US Toll-free
>>>>>>>>>         888 475 4499 US Toll-free
>>>>>>>>> Meeting ID: 415 730 2092
>>>>>>>>> Find your local number: https://zoom.us/u/aH9XYBfm
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jacques Nadeau
>>>>>>>>> CTO and Co-Founder, Dremio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 22, 2019 at 8:54 AM Ryan Blue <
>>>>>>>>> rb...@netflix.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> 9AM on Friday works best for me. How about then?
>>>>>>>>>>
>>>>>>>>>> On Wed, May 22, 2019 at 5:05 AM Anton Okolnychyi <
>>>>>>>>>> aokolnyc...@apple.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What about this Friday? One hour slot from 9:00 to 10:00 am or
>>>>>>>>>>> 10:00 to 11:00 am PST? Some folks are based in London, so meeting 
>>>>>>>>>>> later
>>>>>>>>>>> than this is hard. If Friday doesn’t work, we can consider Tuesday 
>>>>>>>>>>> or
>>>>>>>>>>> Wednesday next week.
>>>>>>>>>>>
>>>>>>>>>>> On 22 May 2019, at 00:54, Jacques Nadeau <jacq...@dremio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I agree with Anton that we should probably spend some time on
>>>>>>>>>>> hangouts further discussing things. Definitely differing 
>>>>>>>>>>> expectations here
>>>>>>>>>>> and we seem to be talking a bit past each other.
>>>>>>>>>>> --
>>>>>>>>>>> Jacques Nadeau
>>>>>>>>>>> CTO and Co-Founder, Dremio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, May 21, 2019 at 3:44 PM Cristian Opris <
>>>>>>>>>>> cop...@apple.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I love a good flame war :P
>>>>>>>>>>>>
>>>>>>>>>>>> On 21 May 2019, at 22:57, Jacques Nadeau <jacq...@dremio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> That's my point, truly independent writers (two Spark jobs, or
>>>>>>>>>>>>> a Spark job and Dremio job) means a distributed transaction. It 
>>>>>>>>>>>>> would need
>>>>>>>>>>>>> yet another external transaction coordinator on top of both Spark 
>>>>>>>>>>>>> and
>>>>>>>>>>>>> Dremio, Iceberg by itself
>>>>>>>>>>>>> cannot solve this.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I'm not ready to accept this. Iceberg already supports a set of
>>>>>>>>>>>> semantics around multiple writers committing simultaneously and how
>>>>>>>>>>>> conflict resolution is done. The same can be done here.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> MVCC (which is what Iceberg tries to implement) requires a
>>>>>>>>>>>> total ordering of snapshots. Also the snapshots need to be 
>>>>>>>>>>>> non-conflicting.
>>>>>>>>>>>> I really don't see how any metadata data structures can solve this 
>>>>>>>>>>>> without
>>>>>>>>>>>> an outside coordinator.
>>>>>>>>>>>>
>>>>>>>>>>>> Consider this:
>>>>>>>>>>>>
>>>>>>>>>>>> Snapshot 0: (K,A) = 1
>>>>>>>>>>>> Job X: UPDATE K SET A=A+1
>>>>>>>>>>>> Job Y: UPDATE K SET A=10
>>>>>>>>>>>>
>>>>>>>>>>>> What should the final value of A be and who decides ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> By single writer, I don't mean single process, I mean multiple
>>>>>>>>>>>>> coordinated processes like Spark executors coordinated by Spark 
>>>>>>>>>>>>> driver. The
>>>>>>>>>>>>> coordinator ensures that the data is pre-partitioned on
>>>>>>>>>>>>> each executor, and the coordinator commits the snapshot.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note however that single writer job/multiple concurrent reader
>>>>>>>>>>>>> jobs is perfectly feasible, i.e. it shouldn't be a problem to 
>>>>>>>>>>>>> write from a
>>>>>>>>>>>>> Spark job and read from multiple Dremio queries concurrently (for 
>>>>>>>>>>>>> example)
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> :D This is still "single process" from my perspective. That
>>>>>>>>>>>> process may be coordinating other processes to do distributed work 
>>>>>>>>>>>> but
>>>>>>>>>>>> ultimately it is a single process.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Fair enough
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm not sure what you mean exactly. If we can't enforce
>>>>>>>>>>>>> uniqueness we shouldn't assume it.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I disagree. We can specify that as a requirement and state that
>>>>>>>>>>>> you'll get unintended consequences if you provide your own keys 
>>>>>>>>>>>> and don't
>>>>>>>>>>>> maintain this.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> There's no need for unintended consequences, we can specify
>>>>>>>>>>>> consistent behaviour (and I believe the document says what that is)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> We do expect that most of the time the natural key is unique,
>>>>>>>>>>>>> but the eager and lazy with natural key designs can handle 
>>>>>>>>>>>>> duplicates
>>>>>>>>>>>>> consistently. Basically it's not a problem to have duplicate
>>>>>>>>>>>>> natural keys, everything works fine.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> That heavily depends on how things are implemented. For
>>>>>>>>>>>> example, we may write a bunch of code that generates internal data
>>>>>>>>>>>> structures based on this expectation. If we have to support 
>>>>>>>>>>>> duplicate
>>>>>>>>>>>> matches, all of sudden we can no longer size various data 
>>>>>>>>>>>> structures to
>>>>>>>>>>>> improve performance and may be unable to preallocate memory 
>>>>>>>>>>>> associated with
>>>>>>>>>>>> a guaranteed completion.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Again we need to operate on the assumption that this is a large
>>>>>>>>>>>> scale distributed compute/remote storage scenario. Key matching is 
>>>>>>>>>>>> done
>>>>>>>>>>>> with shuffles with data movement across the network, such 
>>>>>>>>>>>> optimizations
>>>>>>>>>>>> would really have little impact on overall performance. Not to 
>>>>>>>>>>>> mention that
>>>>>>>>>>>> most query engines would already optimize the shuffle already as 
>>>>>>>>>>>> much as it
>>>>>>>>>>>> can be optimized.
>>>>>>>>>>>>
>>>>>>>>>>>> It is true that if actual duplicate keys would make the key
>>>>>>>>>>>> matching join (anti-join) somewhat more expensive, however it can 
>>>>>>>>>>>> be done
>>>>>>>>>>>> in such a way that if the keys are in practice unique the join is 
>>>>>>>>>>>> as
>>>>>>>>>>>> efficient as it can be.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Let me try and clarify each point:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - lookup for query or update on a non-(partition/bucket/sort)
>>>>>>>>>>>>> key predicate implies scanning large amounts of data - because 
>>>>>>>>>>>>> these are
>>>>>>>>>>>>> the only data structures that can narrow down the lookup, right ? 
>>>>>>>>>>>>> One could
>>>>>>>>>>>>> argue that the min/max index (file skipping) can be applied to 
>>>>>>>>>>>>> any column,
>>>>>>>>>>>>> but in reality if that column is not sorted the min/max intervals 
>>>>>>>>>>>>> can have
>>>>>>>>>>>>> huge overlaps so it may be next to useless.
>>>>>>>>>>>>> - remote storage - this is a critical architecture decision -
>>>>>>>>>>>>> implementations on local storage imply a vastly different design 
>>>>>>>>>>>>> for the
>>>>>>>>>>>>> entire system, storage and compute.
>>>>>>>>>>>>> - deleting single records per snapshot is unfeasible in eager
>>>>>>>>>>>>> but also particularly in the lazy design: each deletion creates a 
>>>>>>>>>>>>> very
>>>>>>>>>>>>> small snapshot. Deleting 1 million records one at a time would 
>>>>>>>>>>>>> create 1
>>>>>>>>>>>>> million small files, and 1 million RPC calls.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Why is this unfeasible? If I have a dataset of 100mm files
>>>>>>>>>>>> including 1mm small files, is that a major problem? It seems like 
>>>>>>>>>>>> your
>>>>>>>>>>>> usecase isn't one where you want to support single record deletes 
>>>>>>>>>>>> but it is
>>>>>>>>>>>> definitely something important to many people.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 100 mm total files or 1 mm files per dataset is definitely a
>>>>>>>>>>>> problem on HDFS, and I believe on S3 too. Single key delete would 
>>>>>>>>>>>> work just
>>>>>>>>>>>> fine, but it's simply not optimal to do that on remote storage. 
>>>>>>>>>>>> This is a
>>>>>>>>>>>> very well known problem with HDFS, and one of the very reasons to 
>>>>>>>>>>>> have
>>>>>>>>>>>> something like Iceberg in the first place.
>>>>>>>>>>>>
>>>>>>>>>>>> Basically the users would be able to do single key mutation,
>>>>>>>>>>>> but it's not the use case we should be optimizing for, but it's 
>>>>>>>>>>>> really not
>>>>>>>>>>>> advisable.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> Eager is conceptually just lazy + compaction done, well,
>>>>>>>>>>>>> eagerly. The logic for both is exactly the same, the trade-off is 
>>>>>>>>>>>>> just that
>>>>>>>>>>>>> with eager you implicitly compact every time so that you don't do 
>>>>>>>>>>>>> any work
>>>>>>>>>>>>> on read, while with lazy
>>>>>>>>>>>>> you want to amortize the cost of compaction over multiple
>>>>>>>>>>>>> snapshots.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Basically there should be no difference between the two
>>>>>>>>>>>>> conceptually, or with regard to keys, etc. The only difference is 
>>>>>>>>>>>>> some
>>>>>>>>>>>>> mechanics in implementation.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I think you have deconstruct the problem too much to say these
>>>>>>>>>>>> are the same (or at least that is what I'm starting to think given 
>>>>>>>>>>>> this
>>>>>>>>>>>> thread). It seems like real world implementation decisions (per our
>>>>>>>>>>>> discussion here) are in conflict. For example, you just argued 
>>>>>>>>>>>> against
>>>>>>>>>>>> having a 1mm arbitrary mutations but I think that is because you 
>>>>>>>>>>>> aren't
>>>>>>>>>>>> thinking about things over time with a delta implementation. 
>>>>>>>>>>>> Having 10,000
>>>>>>>>>>>> mutations a day where we do delta compaction once a week
>>>>>>>>>>>>
>>>>>>>>>>>> and local file mappings (key to offset sparse bitmaps) seems
>>>>>>>>>>>> like it could result in very good performance in a case where we're
>>>>>>>>>>>> mutating small amounts of data. In this scenario, you may not do 
>>>>>>>>>>>> major
>>>>>>>>>>>> compaction ever unless you get to a high enough percentage of 
>>>>>>>>>>>> records that
>>>>>>>>>>>> have been deleted in the original dataset. That drives a very 
>>>>>>>>>>>> different set
>>>>>>>>>>>> of implementation decisions from a situation where you're trying 
>>>>>>>>>>>> to restate
>>>>>>>>>>>> an entire partition at once.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> We operate on 1 billion mutations per day at least. This is the
>>>>>>>>>>>> problem Iceberg wants to solve, I believe it's stated upfront. 
>>>>>>>>>>>> 10000/day is
>>>>>>>>>>>> not a big data problem. It can be done fairly trivially and it 
>>>>>>>>>>>> would be
>>>>>>>>>>>> supported, but there's not much point in extra optimizing for this 
>>>>>>>>>>>> use case
>>>>>>>>>>>> I believe.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Software Engineer
>>>>>>>> Netflix
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Reply via email to