Thanks for the feedback, Ryan. We're sorting through a few ideas for how to move forward, and this helps our thinking.
On Fri, Nov 30, 2018 at 10:10 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> I think the community would welcome these contributions. I've talked with a couple of companies about this sort of thing already, so there are other people that could collaborate on it.
>
> On Fri, Nov 30, 2018 at 6:23 PM Erik Wright <erik.wri...@shopify.com.invalid> wrote:
>
> > Hi Ryan, Owen,
> >
> > Just following up on this question. Implemented properly, do you see any reason that a series of PRs to implement merge-on-read support wouldn't be welcomed?
> >
> > Thanks,
> >
> > Erik
> >
> > On Wed., Nov. 28, 2018, 5:25 p.m. Erik Wright <erik.wri...@shopify.com> wrote:
> >
> > > On Wed, Nov 28, 2018 at 4:32 PM Owen O'Malley <owen.omal...@gmail.com> wrote:
> > >
> > > > For Hive's ACID, we started with deltas that had three options per row: insert, delete, edit. Since that didn't enable predicate pushdown in the common case where there is a large number of inserts, we went to the model of just using inserts and deletes in separate files. Queries that modify tables delete the old row and insert a new one. That allowed us to get good performance for reads, where it is most critical. There are some important optimizations; for example, for a small number of deletes, you can read all of the deletes into memory and close that file.
> > >
> > > Presumably predicate pushdown can still be supported if the deltas are partitioned similarly to the base dataset? Or is the issue about predicates on fields that might change between two versions of a row?
> > >
> > > If I understand correctly, we already do what you ended up with: when a row is updated in a way that moves it between partitions, we record a delete for the partition that it was removed from and an insertion in the partition it was inserted into.
> > >
> > > I personally favour inserts/deletes in separate files because it allows the schema of your insert files to be consistent with the dataset schema (with respect to nullability).
> > >
> > > The delete optimization sounds clever.
> > >
> > > > .. Owen
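
To keep the read side concrete for our own planning, here is a minimal, Iceberg-agnostic sketch (plain Python over CSV files) of the separate insert/delete-file model described above, including the small-delete optimization of holding the deleted keys in memory. The file handling and the `id` key column are invented for illustration, not a proposed API:

    import csv

    def read_rows(path):
        # Yield rows from a CSV file; a stand-in for reading a columnar data file.
        with open(path, newline="") as f:
            yield from csv.DictReader(f)

    def merge_on_read(insert_files, delete_files, key_column="id"):
        # Small-delete optimization: load every deleted key into an in-memory
        # set up front (closing those files), then stream the insert files and
        # drop any row whose key appears in the set.
        deleted = {row[key_column] for path in delete_files for row in read_rows(path)}
        for path in insert_files:
            for row in read_rows(path):
                if row[key_column] not in deleted:
                    yield row

    # Example: live = list(merge_on_read(["base-0.csv", "insert-1.csv"], ["delete-2.csv"]))
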
> > > > On Wed, Nov 28, 2018 at 1:14 PM Erik Wright <erik.wri...@shopify.com.invalid> wrote:
> > > >
> > > > > Those are both really neat use cases, but the one I had in mind was what Ryan mentioned. It's something that Hoodie apparently supports or is building support for, and it's an important use case for the systems that my colleagues and I are building.
> > > > >
> > > > > There are three scenarios:
> > > > >
> > > > > - An Extract system that is receiving updates/deletes from a source system. We wish to capture them as quickly as possible and make them available to users without having to restate the affected data files. The update patterns are not anything that can be addressed with partitioning.
> > > > > - A Transform platform that is running a graph of jobs. For some jobs that are rebuilt from scratch, we would like to compress the output without losing the history.
> > > > > - A Transform / Load system that is building tables on GCS and registering them in Hive for querying by Presto. This system is incrementally updating views, and while some of those views are event-oriented (with most updates clustered in recent history), some of them are not, and in those cases there is no partitioning scheme that will prevent us from updating virtually all partitions in every update.
> > > > >
> > > > > We have one example of an internal solution but would prefer something less bespoke. That system works as follows:
> > > > >
> > > > > 1. For each dataset, unique key columns are defined.
> > > > > 2. Datasets are partitioned (not necessarily by anything in the key).
> > > > > 3. Upserts/deletes are captured in a mutation set.
> > > > > 4. The mutation set is used to update affected partitions:
> > > > >    1. Identify the previous/new partition for each upserted/deleted row.
> > > > >    2. Open the affected partitions and drop all rows matching an upserted/deleted key.
> > > > >    3. Append all upserts.
> > > > >    4. Write out the result.
> > > > > 5. We maintain an index (effectively an Iceberg snapshot) that says which partitions come from where (we keep the ones that are unaffected from the previous dataset version and add in the updated ones).
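
For anyone skimming the quoted list, step 4 boils down to a per-partition rewrite. A rough sketch of that merge, assuming in-memory lists of row dicts and a single hypothetical `id` key (the real system operates on partition files, not lists):

    def apply_mutation_set(partition_rows, upserts, deletes, key="id"):
        # Drop every row whose key is being upserted or deleted (step 4.2),
        # then append the upserted rows (step 4.3).
        touched = {row[key] for row in upserts} | set(deletes)
        kept = [row for row in partition_rows if row[key] not in touched]
        return kept + list(upserts)

    # Example: upsert id=2 and delete id=3 within a single partition.
    partition = [{"id": "1", "v": "a"}, {"id": "2", "v": "b"}, {"id": "3", "v": "c"}]
    rewritten = apply_mutation_set(partition, upserts=[{"id": "2", "v": "b2"}], deletes=["3"])
    # rewritten == [{"id": "1", "v": "a"}, {"id": "2", "v": "b2"}]
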
> > > > > This data is loaded into Presto, and our current plan is to update it by registering a view in Presto that applies recent mutation sets to the latest merged version on the fly.
> > > > >
> > > > > So to build this in Iceberg we would likely need to extend the Table spec with:
> > > > >
> > > > > - An optional unique key specification, possibly composite, naming one or more columns for which there is expected to be at most one row per unique value.
> > > > > - The ability to indicate in the snapshot that a certain set of manifests are "base" data while other manifests are "diffs".
> > > > > - The ability in a "diff" manifest to indicate files that contain "deleted" keys (or else the ability in a given row to have a special column that indicates that the row is a "delete" and not an "upsert").
> > > > > - "Diff" manifests would need to be ordered in the snapshot (as multiple "diff" manifests could affect a single row and only the latest of those takes effect).
> > > > >
> > > > > Obviously readers would need to be updated to correctly interpret this data. And there is all kinds of supporting work that would be required in order to maintain these (periodically collapsing diffs into the base, etc.).
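
To illustrate the read-time semantics we have in mind for the base/diff split, a reader would effectively perform a last-diff-wins merge in snapshot order. The sketch below only pins down those semantics; the `(row, is_delete)` encoding and the `id` key are invented for the example and are not a proposed format:

    def resolve_snapshot(base_rows, ordered_diffs, key="id"):
        # Merge base rows with an ordered list of diff sets. Each diff entry is
        # a (row, is_delete) pair. Diffs are applied oldest to newest, so when
        # several diffs touch the same key, only the latest change takes effect;
        # a delete removes the row entirely.
        current = {row[key]: row for row in base_rows}
        for diff in ordered_diffs:
            for row, is_delete in diff:
                if is_delete:
                    current.pop(row[key], None)
                else:
                    current[row[key]] = row
        return list(current.values())

    # Example: a later delete of id=2 overrides an earlier upsert of the same key.
    base = [{"id": "1", "v": "a"}, {"id": "2", "v": "b"}]
    diffs = [
        [({"id": "2", "v": "b2"}, False)],
        [({"id": "2"}, True), ({"id": "3", "v": "c"}, False)],
    ]
    # resolve_snapshot(base, diffs) -> rows for id=1 and id=3 only

Keying the working state by the unique key is what makes only the latest diff per key matter; periodically collapsing diffs into the base would amount to materializing this merge and rewriting the data files.
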
> > > > > Is this something for which PRs would be accepted, assuming all of the necessary steps are taken to make sure the direction is compatible with Iceberg's other use cases?
> > > > >
> > > > > On Wed, Nov 28, 2018 at 1:14 PM Owen O'Malley <owen.omal...@gmail.com> wrote:
> > > > >
> > > > > > I’m not sure what use case Erik is looking for, but I’ve had users that want to do the equivalent of HBase’s column families. They want some of the columns to be stored separately and then merged together on read. The requirement would be that there is a 1:1 mapping between rows in the matching files and stripes.
> > > > > >
> > > > > > It would look like:
> > > > > >
> > > > > > file1.orc: struct<name:string,email:string>
> > > > > > file2.orc: struct<lastAccess:timestamp>
> > > > > >
> > > > > > It would let them leave the stable information alone and only re-write the second column family when the information in the mutable column family changes. It would also support use cases where you add data enrichment columns after the data has been ingested.
> > > > > >
> > > > > > From there it is easy to imagine having a replace operator where file2’s version of a column replaces file1’s version.
> > > > > >
> > > > > > .. Owen
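
Owen's column-family layout is essentially a positional join: each file carries a disjoint set of columns, and row N of one file lines up with row N of the other. A toy sketch of that merge, with lists of dicts standing in for the two ORC files (the names and values are made up):

    def merge_column_families(file1_rows, file2_rows):
        # Zip two column families positionally; relies on a strict 1:1 row mapping.
        if len(file1_rows) != len(file2_rows):
            raise ValueError("column families must have the same row count")
        return [{**left, **right} for left, right in zip(file1_rows, file2_rows)]

    # file1.orc-style rows: struct<name:string,email:string>
    names = [{"name": "Ada", "email": "ada@example.com"}]
    # file2.orc-style rows: struct<lastAccess:timestamp>
    access = [{"lastAccess": "2018-11-28T09:44:00"}]
    # merge_column_families(names, access)
    #   -> [{"name": "Ada", "email": "ada@example.com", "lastAccess": "2018-11-28T09:44:00"}]
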
> > > > > > On Nov 28, 2018, at 9:44 AM, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> > > > > >
> > > > > > > What do you mean by merge on read?
> > > > > > >
> > > > > > > A few people I've talked to are interested in building delete and upsert features. Those would create files that track the changes, which would be merged at read time to apply them. Is that what you mean?
> > > > > > >
> > > > > > > rb
> > > > > > >
> > > > > > > On Tue, Nov 27, 2018 at 12:26 PM Erik Wright <erik.wri...@shopify.com.invalid> wrote:
> > > > > > >
> > > > > > > > Has any consideration been given to the possibility of eventual merge-on-read support in the Iceberg table spec?
> > > > > > >
> > > > > > > --
> > > > > > > Ryan Blue
> > > > > > > Software Engineer
> > > > > > > Netflix
>
> --
> Ryan Blue
> Software Engineer
> Netflix