I think the community would welcome these contributions. I've talked with a
couple of companies about this sort of thing already, so there are other
people who could collaborate on it.
On Fri, Nov 30, 2018 at 6:23 PM Erik Wright <erik.wri...@shopify.com.invalid> wrote:

> Hi Ryan, Owen,
>
> Just following up on this question. Implemented properly, do you see any
> reason that a series of PRs to implement merge-on-read support wouldn't be
> welcomed?
>
> Thanks,
>
> Erik
>
> On Wed., Nov. 28, 2018, 5:25 p.m. Erik Wright <erik.wri...@shopify.com> wrote:
>
> > On Wed, Nov 28, 2018 at 4:32 PM Owen O'Malley <owen.omal...@gmail.com> wrote:
> >
> > > For Hive's ACID, we started with deltas that had three options per row:
> > > insert, delete, edit. Since that didn't enable predicate pushdown in
> > > the common case where there are a large number of inserts, we went to
> > > the model of just using inserts and deletes in separate files. Queries
> > > that modify tables delete the old row and insert a new one. That
> > > allowed us to get good performance for read, where it is most critical.
> > > There are some important optimizations; for example, for a small number
> > > of deletes, you can read all of the deletes into memory and close that
> > > file.
> >
> > Presumably predicate pushdown can still be supported if the deltas are
> > partitioned similarly to the base dataset? Or is the issue about
> > predicates on fields that might change between two versions of a row?
> >
> > If I understand correctly, we already do what you ended up with: when a
> > row is updated in a way that moves it between partitions, we record a
> > delete for the partition that it was removed from and an insertion in
> > the partition it was inserted into.
> >
> > I personally favour inserts/deletes in separate files because it allows
> > the schema of your insert files to be consistent with the dataset schema
> > (with respect to nullability).
> >
> > The delete optimization sounds clever.
> >
> > > .. Owen
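The read path Owen describes can be sketched in a few lines. This is a
minimal illustration only, assuming rows are dicts carrying a unique "key"
column; the names below are made up for the example and are not Hive's or
Iceberg's actual API.

    def read_with_deletes(insert_rows, delete_keys):
        """Merge-on-read with separate insert and delete files.

        The small-delete optimization: read every deleted key into an
        in-memory set up front (after which the delete file can be
        closed), then stream the insert files and drop any row whose
        key appears in the set.
        """
        deleted = set(delete_keys)
        for row in insert_rows:
            if row["key"] not in deleted:
                yield row

    inserts = [{"key": 1, "v": "a"}, {"key": 2, "v": "b"}, {"key": 3, "v": "c"}]
    print(list(read_with_deletes(inserts, delete_keys=[2])))
    # [{'key': 1, 'v': 'a'}, {'key': 3, 'v': 'c'}]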
> > > On Wed, Nov 28, 2018 at 1:14 PM Erik Wright <erik.wri...@shopify.com.invalid> wrote:
> > >
> > > > Those are both really neat use cases, but the one I had in mind was
> > > > what Ryan mentioned. It's something that Hoodie apparently supports
> > > > or is building support for, and it's an important use case for the
> > > > systems that my colleagues and I are building.
> > > >
> > > > There are three scenarios:
> > > >
> > > > - An Extract system that is receiving updates/deletes from a source
> > > >   system. We wish to capture them as quickly as possible and make
> > > >   them available to users without having to restate the affected data
> > > >   files. The update patterns are not anything that can be addressed
> > > >   with partitioning.
> > > > - A Transform platform that is running a graph of jobs. For some jobs
> > > >   that are rebuilt from scratch, we would like to compress the output
> > > >   without losing the history.
> > > > - A Transform / Load system that is building tables on GCS and
> > > >   registering them in Hive for querying by Presto. This system is
> > > >   incrementally updating views, and while some of those views are
> > > >   event-oriented (with most updates clustered in recent history),
> > > >   some of them are not, and in those cases there is no partitioning
> > > >   algorithm that will prevent us from updating virtually all
> > > >   partitions in every update.
> > > >
> > > > We have one example of an internal solution but would prefer
> > > > something less bespoke. That system works as follows:
> > > >
> > > > 1. For each dataset, unique key columns are defined.
> > > > 2. Datasets are partitioned (not necessarily by anything in the key).
> > > > 3. Upserts/deletes are captured in a mutation set.
> > > > 4. The mutation set is used to update affected partitions:
> > > >    1. Identify the previous/new partition for each upserted/deleted
> > > >       row.
> > > >    2. Open the affected partitions, drop all rows matching an
> > > >       upserted/deleted key.
> > > >    3. Append all upserts.
> > > >    4. Write out the result.
> > > > 5. We maintain an index (effectively an Iceberg snapshot) that says
> > > >    which partitions come from where (we keep the ones that are
> > > >    unaffected from the previous dataset version and add in the
> > > >    updated ones).
> > > >
> > > > This data is loaded into Presto, and our current plan is to update it
> > > > by registering a view in Presto that applies recent mutation sets to
> > > > the latest merged version on the fly.
> > > >
> > > > So to build this in Iceberg we would likely need to extend the Table
> > > > spec with:
> > > >
> > > > - An optional unique key specification, possibly composite, naming
> > > >   one or more columns for which there is expected to be at most one
> > > >   row per unique value.
> > > > - The ability to indicate in the snapshot that a certain set of
> > > >   manifests are "base" data while other manifests are "diffs".
> > > > - The ability in a "diff" manifest to indicate files that contain
> > > >   "deleted" keys (or else the ability in a given row to have a
> > > >   special column that indicates that the row is a "delete" and not an
> > > >   "upsert").
> > > > - "diff" manifests would need to be ordered in the snapshot (as
> > > >   multiple "diff" manifests could affect a single row and only the
> > > >   latest of those takes effect).
> > > >
> > > > Obviously readers would need to be updated to correctly interpret
> > > > this data. And there are all kinds of supporting work items that
> > > > would be required in order to maintain these (periodically collapsing
> > > > diffs into the base, etc.).
> > > >
> > > > Is this something for which PRs would be accepted, assuming all of
> > > > the necessary steps to make sure the direction is compatible with
> > > > Iceberg's other use-cases?
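To make the proposal concrete, here is a rough sketch of how a reader might
apply ordered "diff" manifests over "base" data, assuming a single unique
"key" column and diffs listed oldest-first. This is illustrative code over
plain dicts, not the Iceberg API.

    def merge_on_read(base_rows, ordered_diffs):
        """Apply ordered upsert/delete diffs to base rows at read time.

        base_rows: iterable of row dicts, at most one row per unique key.
        ordered_diffs: list of (kind, rows) pairs, oldest first, where
        kind is "upsert" or "delete"; the latest diff touching a given
        key takes effect, which is why ordering matters.
        """
        merged = {row["key"]: row for row in base_rows}
        for kind, rows in ordered_diffs:
            for row in rows:
                if kind == "delete":
                    merged.pop(row["key"], None)
                else:  # upsert: insert or replace the row for this key
                    merged[row["key"]] = row
        return list(merged.values())

    base = [{"key": 1, "v": "a"}, {"key": 2, "v": "b"}]
    diffs = [("upsert", [{"key": 2, "v": "b2"}]), ("delete", [{"key": 1}])]
    print(merge_on_read(base, diffs))
    # [{'key': 2, 'v': 'b2'}]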
> > > > On Wed, Nov 28, 2018 at 1:14 PM Owen O'Malley <owen.omal...@gmail.com> wrote:
> > > >
> > > > > I’m not sure what use case Erik is looking for, but I’ve had users
> > > > > that want to do the equivalent of HBase’s column families. They
> > > > > want some of the columns to be stored separately and then merged
> > > > > together on read. The requirement would be that there is a 1:1
> > > > > mapping between rows in the matching files and stripes.
> > > > >
> > > > > It would look like:
> > > > >
> > > > >   file1.orc: struct<name:string,email:string>
> > > > >   file2.orc: struct<lastAccess:timestamp>
> > > > >
> > > > > It would let them leave the stable information alone and only
> > > > > re-write the second column family when the information in the
> > > > > mutable column family changes. It would also support use cases
> > > > > where you add data enrichment columns after the data has been
> > > > > ingested.
> > > > >
> > > > > From there it is easy to imagine having a replace operator where
> > > > > file2’s version of a column replaces file1’s version.
> > > > >
> > > > > .. Owen
> > > > >
> > > > > On Nov 28, 2018, at 9:44 AM, Ryan Blue <rb...@netflix.com.INVALID> wrote:
> > > > >
> > > > > > What do you mean by merge on read?
> > > > > >
> > > > > > A few people I've talked to are interested in building delete and
> > > > > > upsert features. Those would create files that track the changes,
> > > > > > which would be merged at read time to apply them. Is that what
> > > > > > you mean?
> > > > > >
> > > > > > rb
> > > > > >
> > > > > > On Tue, Nov 27, 2018 at 12:26 PM Erik Wright
> > > > > > <erik.wri...@shopify.com.invalid> wrote:
> > > > > >
> > > > > > > Has any consideration been given to the possibility of eventual
> > > > > > > merge-on-read support in the Iceberg table spec?
> > > > > >
> > > > > > --
> > > > > > Ryan Blue
> > > > > > Software Engineer
> > > > > > Netflix
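Owen's column-family layout above is essentially a positional zip of two
files with identical row counts and ordering. A minimal sketch, assuming
both inputs yield row dicts in file order (readers and field names are
illustrative, not ORC's API):

    def merge_column_families(file1_rows, file2_rows):
        """Stitch two column families back into full rows on read.

        Relies on the 1:1 mapping between rows in the matching files:
        row i of file2 holds the extra columns for row i of file1.
        """
        for base_row, extra_row in zip(file1_rows, file2_rows):
            merged = dict(base_row)   # e.g. {"name": ..., "email": ...}
            merged.update(extra_row)  # e.g. {"lastAccess": ...}
            yield merged

    file1 = [{"name": "ada", "email": "a@x"}, {"name": "bob", "email": "b@x"}]
    file2 = [{"lastAccess": "2018-11-28"}, {"lastAccess": "2018-11-27"}]
    print(list(merge_column_families(file1, file2)))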
This system is > > > incrementally updating views, and while some of those views are > > > event-oriented (with most updates clustered in recent history) some > of > > > them > > > are not and in those cases there is not partitioning algorithm that > > will > > > prevent us from updating virtually all partitions in every update. > > > > > > We have one example of an internal solution but would prefer something > > less > > > bespoke. That system works as follows: > > > > > > 1. For each dataset, unique key columns are defined. > > > 2. Datasets are partitioned (not necessarily by anything in the > key). > > > 3. Upserts/deletes are captured in a mutation set. > > > 4. The mutation set is used to update affected partitions: > > > 1. Identify the previous/new partition for each upserted/deleted > > row. > > > 2. Open the affected partitions, drop all rows matching an > > > upserted/deleted key. > > > 3. Append all upserts. > > > 4. Write out the result. > > > 5. We maintain an index (effectively an Iceberg snapshot) that says > > > which partitions come from where (we keep the ones that are > unaffected > > > from > > > the previous dataset version and add in the updated ones). > > > > > > This data is loaded into Presto and our current plan is to update it by > > > registering a view in Presto that applies recent mutation sets to the > > > latest merged version on the fly. > > > > > > So to build this in Iceberg we would likely need to extend the Table > spec > > > with: > > > > > > - An optional unique key specification, possibly composite, naming > one > > > or more columns for which there is expected to be at most one row > per > > > unique value. > > > - The ability to indicate in the snapshot that a certain set of > > > manifests are "base" data while other manifests are "diffs". > > > - The ability in a "diff" manifest to indicate files that contain > > > "deleted" keys (or else the ability in a given row to have a special > > > column > > > that indicates that the row is a "delete" and not an "upsert") > > > - "diff" manifests would need to be ordered in the snapshot (as > > multiple > > > "diff" manifests could affect a single row and only the latest of > > those > > > takes effect). > > > > > > Obviously readers would need to be updated to correctly interpret this > > > data. And there is all kinds of supporting work that would be required > in > > > order to maintain these (periodically collapsing diffs into the base, > > > etc.). > > > > > > Is this something for which PRs would be accepted, assuming all of the > > > necessary steps to make sure the direction is compatible with Iceberg's > > > other use-cases? > > > > > > On Wed, Nov 28, 2018 at 1:14 PM Owen O'Malley <owen.omal...@gmail.com> > > > wrote: > > > > > > > I’m not sure what use case Erik is looking for, but I’ve had users > that > > > > want to do the equivalent of HBase’s column families. They want some > of > > > the > > > > columns to be stored separately and the merged together on read. The > > > > requirements would be that there is a 1:1 mapping between rows in the > > > > matching files and stripes. > > > > > > > > It would look like: > > > > > > > > file1.orc: struct<name:string,email:string> file2.orc: > > > > struct<lastAccess:timestamp> > > > > > > > > It would let them leave the stable information and only re-write the > > > > second column family when the information in the mutable column > family > > > > changes. 
It would also support use cases where you add data > enrichment > > > > columns after the data has been ingested. > > > > > > > > From there it is easy to imagine having a replace operator where > > file2’s > > > > version of a column replaces file1’s version. > > > > > > > > .. Owen > > > > > > > > > On Nov 28, 2018, at 9:44 AM, Ryan Blue <rb...@netflix.com.INVALID> > > > > wrote: > > > > > > > > > > What do you mean by merge on read? > > > > > > > > > > A few people I've talked to are interested in building delete and > > > upsert > > > > > features. Those would create files that track the changes, which > > would > > > be > > > > > merged at read time to apply them. Is that what you mean? > > > > > > > > > > rb > > > > > > > > > > On Tue, Nov 27, 2018 at 12:26 PM Erik Wright > > > > > <erik.wri...@shopify.com.invalid> wrote: > > > > > > > > > >> Has any consideration been given to the possibility of eventual > > > > >> merge-on-read support in the Iceberg table spec? > > > > >> > > > > > > > > > > > > > > > -- > > > > > Ryan Blue > > > > > Software Engineer > > > > > Netflix > > > > > > > > > > > > > > -- Ryan Blue Software Engineer Netflix