I think the community would welcome these contributions. I've talked with a
couple of companies about this sort of thing already, so there are other
people who could collaborate on it.

On Fri, Nov 30, 2018 at 6:23 PM Erik Wright <erik.wri...@shopify.com.invalid>
wrote:

> Hi Ryan, Owen,
>
> Just following up on this question. Do you see any reason that a properly
> implemented series of PRs adding merge-on-read support wouldn't be
> welcomed?
>
> Thanks,
>
> Erik
>
> On Wed., Nov. 28, 2018, 5:25 p.m. Erik Wright <erik.wri...@shopify.com>
> wrote:
>
> >
> >
> > On Wed, Nov 28, 2018 at 4:32 PM Owen O'Malley <owen.omal...@gmail.com>
> > wrote:
> >
> >> For Hive's ACID, we started with deltas that had three options per row:
> >> insert, delete, edit. Since that didn't enable predicate push down in
> >> the common case where there is a large number of inserts, we went to the
> >> model of just using inserts and deletes in separate files. Queries that
> >> modify tables delete the old row and insert a new one. That allowed us
> >> to get good performance for read, where it is most critical. There are
> >> some important optimizations: for a small number of deletes, you can
> >> read all of the deletes into memory and close that file.
> >>
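> >> To make that concrete, here is a minimal sketch of applying a small
> >> in-memory delete set at read time. This is not Hive's actual code; the
> >> Row type and key() accessor are hypothetical.
> >>
> >>   import java.util.HashSet;
> >>   import java.util.Iterator;
> >>   import java.util.Set;
> >>
> >>   // Sketch: load a small delete delta fully into memory, close the
> >>   // delete file, then filter the base rows against the key set.
> >>   class DeleteSetReader {
> >>     interface Row { long key(); }  // hypothetical row with a unique key
> >>
> >>     static Iterator<Row> merge(Iterator<Row> base, Iterator<Row> deletes) {
> >>       Set<Long> deleted = new HashSet<>();
> >>       while (deletes.hasNext()) {
> >>         deleted.add(deletes.next().key());  // small enough for memory
> >>       }
> >>       // the delete file can be closed here; only the key set is kept
> >>       return new Iterator<Row>() {
> >>         private Row next = advance();
> >>
> >>         private Row advance() {
> >>           while (base.hasNext()) {
> >>             Row r = base.next();
> >>             if (!deleted.contains(r.key())) {
> >>               return r;  // row survives the merge
> >>             }
> >>           }
> >>           return null;
> >>         }
> >>
> >>         public boolean hasNext() { return next != null; }
> >>
> >>         public Row next() {
> >>           Row r = next;
> >>           next = advance();
> >>           return r;
> >>         }
> >>       };
> >>     }
> >>   }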
> >
> > Presumably predicate pushdown can still be supported if the deltas are
> > partitioned similarly to the base dataset? Or is the issue about
> > predicates on fields that might change between two versions of a row?
> >
> > If I understand correctly, we already do what you ended up with: when a
> > row is updated in a way that moves it between partitions, we record a
> > delete for the partition that it was removed from and an insertion in
> > the partition it was inserted into.
> >
> > I personally favour inserts/deletes in separate files because it allows
> > the schema of your insert files to be consistent with the dataset schema
> > (with respect to nullability).
> >
> > The delete optimization sounds clever.
> >
> >> .. Owen
> >>
> >> On Wed, Nov 28, 2018 at 1:14 PM Erik Wright
> >> <erik.wri...@shopify.com.invalid> wrote:
> >>
> >> > Those are both really neat use cases, but the one I had in mind was
> >> > what Ryan mentioned. It's something that Hoodie apparently supports or
> >> > is building support for, and it's an important use case for the
> >> > systems that my colleagues and I are building.
> >> >
> >> > There are three scenarios:
> >> >
> >> >    - An Extract system that is receiving updates/deletes from a source
> >> >    system. We wish to capture them as quickly as possible and make
> >> >    them available to users without having to restate the affected data
> >> >    files. The update patterns are not anything that can be addressed
> >> >    with partitioning.
> >> >    - A Transform platform that is running a graph of jobs. For some
> >> >    jobs that are rebuilt from scratch, we would like to compress the
> >> >    output without losing the history.
> >> >    - A Transform / Load system that is building tables on GCS and
> >> >    registering them in Hive for querying by Presto. This system is
> >> >    incrementally updating views, and while some of those views are
> >> >    event-oriented (with most updates clustered in recent history),
> >> >    some of them are not, and in those cases there is no partitioning
> >> >    algorithm that will prevent us from updating virtually all
> >> >    partitions in every update.
> >> >
> >> > We have one example of an internal solution but would prefer something
> >> > less bespoke. That system works as follows (a code sketch of step 4
> >> > follows the list):
> >> >
> >> >    1. For each dataset, unique key columns are defined.
> >> >    2. Datasets are partitioned (not necessarily by anything in the
> >> >    key).
> >> >    3. Upserts/deletes are captured in a mutation set.
> >> >    4. The mutation set is used to update affected partitions:
> >> >       1. Identify the previous/new partition for each upserted/deleted
> >> >       row.
> >> >       2. Open the affected partitions, drop all rows matching an
> >> >       upserted/deleted key.
> >> >       3. Append all upserts.
> >> >       4. Write out the result.
> >> >    5. We maintain an index (effectively an Iceberg snapshot) that says
> >> >    which partitions come from where (we keep the ones that are
> >> >    unaffected from the previous dataset version and add in the updated
> >> >    ones).
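> >> > Here is a minimal code sketch of step 4, with all names hypothetical
> >> > and the unique key simplified to a single long column:
> >> >
> >> >   import java.util.ArrayList;
> >> >   import java.util.HashMap;
> >> >   import java.util.HashSet;
> >> >   import java.util.List;
> >> >   import java.util.Map;
> >> >   import java.util.Set;
> >> >
> >> >   // Hypothetical sketch of applying a mutation set to the affected
> >> >   // partitions; a real system would only open touched partitions.
> >> >   class MutationSetApplier {
> >> >     record Row(long key, String partition) {}
> >> >     record Mutation(long key, boolean isDelete, Row upsert) {}
> >> >
> >> >     // partitions maps partition name -> rows of the current version
> >> >     static Map<String, List<Row>> apply(Map<String, List<Row>> partitions,
> >> >                                         List<Mutation> mutations) {
> >> >       // 4.1: collect every upserted/deleted key
> >> >       Set<Long> mutatedKeys = new HashSet<>();
> >> >       for (Mutation m : mutations) {
> >> >         mutatedKeys.add(m.key());
> >> >       }
> >> >       // 4.2: drop all rows matching a mutated key
> >> >       Map<String, List<Row>> result = new HashMap<>();
> >> >       for (Map.Entry<String, List<Row>> e : partitions.entrySet()) {
> >> >         List<Row> kept = new ArrayList<>();
> >> >         for (Row r : e.getValue()) {
> >> >           if (!mutatedKeys.contains(r.key())) {
> >> >             kept.add(r);
> >> >           }
> >> >         }
> >> >         result.put(e.getKey(), kept);
> >> >       }
> >> >       // 4.3: append all upserts to their target partitions
> >> >       for (Mutation m : mutations) {
> >> >         if (!m.isDelete()) {
> >> >           result.computeIfAbsent(m.upsert().partition(),
> >> >                                  p -> new ArrayList<>()).add(m.upsert());
> >> >         }
> >> >       }
> >> >       // 4.4: writing the result out is left to the storage layer
> >> >       return result;
> >> >     }
> >> >   }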
> >> >
> >> > This data is loaded into Presto, and our current plan is to update it
> >> > by registering a view in Presto that applies recent mutation sets to
> >> > the latest merged version on the fly.
> >> >
> >> > So to build this in Iceberg we would likely need to extend the Table
> >> > spec with (a rough sketch follows the list):
> >> >
> >> >    - An optional unique key specification, possibly composite, naming
> >> >    one or more columns for which there is expected to be at most one
> >> >    row per unique value.
> >> >    - The ability to indicate in the snapshot that a certain set of
> >> >    manifests are "base" data while other manifests are "diffs".
> >> >    - The ability in a "diff" manifest to indicate files that contain
> >> >    "deleted" keys (or else the ability in a given row to have a
> >> >    special column that indicates that the row is a "delete" and not an
> >> >    "upsert").
> >> >    - "diff" manifests would need to be ordered in the snapshot (as
> >> >    multiple "diff" manifests could affect a single row and only the
> >> >    latest of those takes effect).
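> >> > A purely hypothetical sketch of what that extended metadata might
> >> > look like (none of these types exist in the Iceberg spec today):
> >> >
> >> >   import java.util.List;
> >> >
> >> >   // Hypothetical shapes for the proposed spec extensions.
> >> >   class MergeOnReadMetadata {
> >> >     enum ManifestKind { BASE, DIFF }
> >> >
> >> >     // possibly composite: one or more columns, at most one row per value
> >> >     record UniqueKeySpec(List<String> columns) {}
> >> >
> >> >     record ManifestEntry(
> >> >         String path,
> >> >         ManifestKind kind,        // "base" data vs. "diff" data
> >> >         long sequenceNumber,      // orders diffs; the latest wins
> >> >         boolean containsDeletes   // diff files may hold deleted keys
> >> >     ) {}
> >> >
> >> >     record Snapshot(UniqueKeySpec uniqueKey,
> >> >                     List<ManifestEntry> manifests) {}
> >> >   }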
> >> >
> >> > Obviously readers would need to be updated to correctly interpret
> >> > this data. And all kinds of supporting work would be required in order
> >> > to maintain these (periodically collapsing diffs into the base,
> >> > etc.).
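> >> > For example, collapsing ordered diffs into a new base might look
> >> > roughly like this (hypothetical in-memory model; real code would
> >> > stream through files):
> >> >
> >> >   import java.util.HashMap;
> >> >   import java.util.List;
> >> >   import java.util.Map;
> >> >
> >> >   // Hypothetical compaction of ordered diffs into a new base.
> >> >   class DiffCompactor {
> >> >     record Row(long key, String payload) {}
> >> >     record Diff(List<Long> deletedKeys, List<Row> upserts) {}
> >> >
> >> >     // base maps unique key -> row; diffs are ordered oldest to newest
> >> >     static Map<Long, Row> collapse(Map<Long, Row> base, List<Diff> diffs) {
> >> >       Map<Long, Row> merged = new HashMap<>(base);
> >> >       for (Diff d : diffs) {            // later diffs win per key
> >> >         for (long key : d.deletedKeys()) {
> >> >           merged.remove(key);           // apply deletes
> >> >         }
> >> >         for (Row r : d.upserts()) {
> >> >           merged.put(r.key(), r);       // apply upserts
> >> >         }
> >> >       }
> >> >       return merged;
> >> >     }
> >> >   }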
> >> >
> >> > Is this something for which PRs would be accepted, assuming all of
> >> > the necessary steps are taken to make sure the direction is compatible
> >> > with Iceberg's other use-cases?
> >> >
> >> > On Wed, Nov 28, 2018 at 1:14 PM Owen O'Malley <owen.omal...@gmail.com>
> >> > wrote:
> >> >
> >> > > I’m not sure what use case Erik is looking for, but I’ve had users
> >> > > that want to do the equivalent of HBase’s column families. They want
> >> > > some of the columns to be stored separately and then merged together
> >> > > on read. The requirement would be that there is a 1:1 mapping
> >> > > between rows in the matching files and stripes.
> >> > >
> >> > > It would look like:
> >> > >
> >> > > file1.orc: struct<name:string,email:string>
> >> > > file2.orc: struct<lastAccess:timestamp>
> >> > >
> >> > > It would let them leave the stable information in place and only
> >> > > re-write the second column family when the information in the
> >> > > mutable column family changes. It would also support use cases where
> >> > > you add data enrichment columns after the data has been ingested.
> >> > >
> >> > > From there it is easy to imagine having a replace operator where
> >> > > file2’s version of a column replaces file1’s version.
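> >> > > A minimal sketch of that kind of read-side zip, assuming a 1:1 row
> >> > > mapping (the reader and row types are hypothetical, not ORC's API):
> >> > >
> >> > >   import java.util.Iterator;
> >> > >
> >> > >   // Sketch: merge two column families on read by zipping two
> >> > >   // readers that are guaranteed to yield rows in the same order.
> >> > >   class ColumnFamilyMerger {
> >> > >     record Identity(String name, String email) {}    // file1.orc
> >> > >     record Access(long lastAccess) {}                // file2.orc
> >> > >     record MergedRow(String name, String email, long lastAccess) {}
> >> > >
> >> > >     static Iterator<MergedRow> merge(Iterator<Identity> file1,
> >> > >                                      Iterator<Access> file2) {
> >> > >       return new Iterator<MergedRow>() {
> >> > >         public boolean hasNext() {
> >> > >           return file1.hasNext() && file2.hasNext();  // 1:1 mapping
> >> > >         }
> >> > >         public MergedRow next() {
> >> > >           Identity id = file1.next();   // row i of the first family
> >> > >           Access ac = file2.next();     // pairs with row i of second
> >> > >           return new MergedRow(id.name(), id.email(), ac.lastAccess());
> >> > >         }
> >> > >       };
> >> > >     }
> >> > >   }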
> >> > >
> >> > > .. Owen
> >> > >
> >> > > > On Nov 28, 2018, at 9:44 AM, Ryan Blue
> >> > > > <rb...@netflix.com.INVALID> wrote:
> >> > > >
> >> > > > What do you mean by merge on read?
> >> > > >
> >> > > > A few people I've talked to are interested in building delete and
> >> > > > upsert features. Those would create files that track the changes,
> >> > > > which would be merged at read time to apply them. Is that what you
> >> > > > mean?
> >> > > >
> >> > > > rb
> >> > > >
> >> > > > On Tue, Nov 27, 2018 at 12:26 PM Erik Wright
> >> > > > <erik.wri...@shopify.com.invalid> wrote:
> >> > > >
> >> > > >> Has any consideration been given to the possibility of eventual
> >> > > >> merge-on-read support in the Iceberg table spec?
> >> > > >>
> >> > > >
> >> > > >
> >> > > > --
> >> > > > Ryan Blue
> >> > > > Software Engineer
> >> > > > Netflix
> >> > >
> >> > >
> >> >
> >>
> >


-- 
Ryan Blue
Software Engineer
Netflix
