Thanks for the feedback, Ryan. We're sorting through a few ideas for how to
move forward and this helps in our thinking.

On Fri, Nov 30, 2018 at 10:10 PM Ryan Blue <rb...@netflix.com.invalid>
wrote:

> I think the community would welcome these contributions. I've talked with a
> couple of companies about this sort of thing already, so there are other
> people that could collaborate on it.
>
> On Fri, Nov 30, 2018 at 6:23 PM Erik Wright <erik.wri...@shopify.com.invalid>
> wrote:
>
> > Hi Ryan, Owen,
> >
> > Just following up on this question. Implemented properly, do you see any
> > reason that a series of PRs to implement merge-on-read support wouldn't be
> > welcomed?
> >
> > Thanks,
> >
> > Erik
> >
> > On Wed., Nov. 28, 2018, 5:25 p.m. Erik Wright <erik.wri...@shopify.com
> > wrote:
> >
> > >
> > >
> > > On Wed, Nov 28, 2018 at 4:32 PM Owen O'Malley <owen.omal...@gmail.com>
> > > wrote:
> > >
> > >> For Hive's ACID, we started with deltas that had three options per row:
> > >> insert, delete, edit. Since that didn't enable predicate push down in the
> > >> common case where there are a large number of inserts, we went to the
> > >> model of just using inserts and deletes in separate files. Queries that
> > >> modify tables delete the old row and insert a new one. That allowed us to
> > >> get good performance for read, where it is most critical. There are some
> > >> important optimizations: for example, with a small number of deletes, you
> > >> can read all of the deletes into memory and close that file.
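The in-memory delete optimization described above can be sketched roughly as follows. This is a minimal illustration and not Hive's implementation: the `Row` shape, the `read_with_deletes` function, and the assumption that every row carries a unique row identifier (so an updated row is re-inserted under a new identifier) are all made up for the example.

```python
from typing import Iterable, Iterator, Tuple

# Hypothetical row shape for illustration: (unique row id, payload).
Row = Tuple[int, str]


def read_with_deletes(inserts: Iterable[Row], deletes: Iterable[int]) -> Iterator[Row]:
    """Yield inserted rows whose row id does not appear in the delete file."""
    # Load the (small) delete file fully into memory once, so it can be
    # closed before the insert files are scanned.
    deleted_ids = set(deletes)
    for row_id, payload in inserts:
        if row_id not in deleted_ids:
            yield row_id, payload


insert_file = [(1, "a"), (2, "b"), (3, "c")]
delete_file = [2]
live_rows = list(read_with_deletes(insert_file, delete_file))
```

Because the insert files only ever contain plain rows, predicates can be evaluated against them directly; the delete set is applied as a cheap membership check afterwards.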
> > >>
> > >
> > > Presumably predicate pushdown can still be supported if the deltas are
> > > partitioned similarly to the base dataset? Or is the issue about
> > > predicates on fields that might change between two versions of a row?
> > >
> > > If I understand correctly, we already do what you ended up with: when a
> > > row is updated in a way that moves it between partitions, we record a
> > > delete for the partition that it was removed from and an insertion in the
> > > partition it was inserted into.
> > >
> > > I personally favour inserts/deletes in separate files because it allows
> > > the schema of your insert files to be consistent with the dataset schema
> > > (with respect to nullability).
> > >
> > > The delete optimization sounds clever.
> > >
> > >> .. Owen
> > >>
> > >> On Wed, Nov 28, 2018 at 1:14 PM Erik Wright <erik.wri...@shopify.com.invalid>
> > >> wrote:
> > >>
> > >> > Those are both really neat use cases, but the one I had in mind was what
> > >> > Ryan mentioned. It's something that Hoodie apparently supports or is
> > >> > building support for, and it's an important use case for the systems
> > >> > that my colleagues and I are building.
> > >> >
> > >> > There are three scenarios:
> > >> >
> > >> >    - An Extract system that is receiving updates/deletes from a source
> > >> >    system. We wish to capture them as quickly as possible and make them
> > >> >    available to users without having to restate the affected data files.
> > >> >    The update patterns are not anything that can be addressed with
> > >> >    partitioning.
> > >> >    - A Transform platform that is running a graph of jobs. For some jobs
> > >> >    that are rebuilt from scratch, we would like to compress the output
> > >> >    without losing the history.
> > >> >    - A Transform / Load system that is building tables on GCS and
> > >> >    registering them in Hive for querying by Presto. This system is
> > >> >    incrementally updating views, and while some of those views are
> > >> >    event-oriented (with most updates clustered in recent history), some
> > >> >    of them are not, and in those cases there is no partitioning
> > >> >    algorithm that will prevent us from updating virtually all partitions
> > >> >    in every update.
> > >> >
> > >> > We have one example of an internal solution but would prefer something
> > >> > less bespoke. That system works as follows:
> > >> >
> > >> >    1. For each dataset, unique key columns are defined.
> > >> >    2. Datasets are partitioned (not necessarily by anything in the key).
> > >> >    3. Upserts/deletes are captured in a mutation set.
> > >> >    4. The mutation set is used to update affected partitions:
> > >> >       1. Identify the previous/new partition for each upserted/deleted
> > >> >       row.
> > >> >       2. Open the affected partitions, drop all rows matching an
> > >> >       upserted/deleted key.
> > >> >       3. Append all upserts.
> > >> >       4. Write out the result.
> > >> >    5. We maintain an index (effectively an Iceberg snapshot) that says
> > >> >    which partitions come from where (we keep the ones that are
> > >> >    unaffected from the previous dataset version and add in the updated
> > >> >    ones).
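The numbered steps above could be sketched along these lines. This is only an in-memory illustration under stated assumptions: partitions are modelled as a dict of row lists rather than files, and `apply_mutations`, `key_of`, and `partition_of` are hypothetical names, not part of any existing system. Finding the previous partition by scanning every partition is a simplification; a real system would presumably use an index.

```python
from typing import Callable, Dict, Hashable, List

Row = Dict[str, object]
Partitions = Dict[Hashable, List[Row]]


def apply_mutations(
    partitions: Partitions,
    upserts: List[Row],
    deleted_keys: List[Hashable],
    key_of: Callable[[Row], Hashable],
    partition_of: Callable[[Row], Hashable],
) -> Partitions:
    """Return a new partition index; unaffected partitions are reused as-is."""
    mutated = {key_of(r) for r in upserts} | set(deleted_keys)
    # Step 4.1: new partitions (targets of upserts) and previous partitions
    # (those currently holding a row with a mutated key).
    affected = {partition_of(r) for r in upserts}
    for pid, rows in partitions.items():
        if any(key_of(r) in mutated for r in rows):
            affected.add(pid)
    # Step 5: start from the previous index so unaffected partitions carry over.
    result: Partitions = dict(partitions)
    for pid in affected:
        # Step 4.2: drop all rows matching an upserted/deleted key.
        kept = [r for r in partitions.get(pid, []) if key_of(r) not in mutated]
        # Step 4.3: append the upserts that belong in this partition.
        kept += [r for r in upserts if partition_of(r) == pid]
        # Step 4.4: "write out" the rewritten partition.
        result[pid] = kept
    return result


base = {
    "2018-11": [
        {"id": 1, "month": "2018-11", "v": "old"},
        {"id": 2, "month": "2018-11", "v": "x"},
    ],
    "2018-12": [{"id": 3, "month": "2018-12", "v": "y"}],
}
updated = apply_mutations(
    base,
    upserts=[{"id": 1, "month": "2018-12", "v": "new"}],
    deleted_keys=[2],
    key_of=lambda r: r["id"],
    partition_of=lambda r: r["month"],
)
```

In the example, row 1 moves from partition "2018-11" to "2018-12" and row 2 is deleted, so both partitions are rewritten while `base` itself is left untouched.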
> > >> >
> > >> > This data is loaded into Presto and our current plan is to update it by
> > >> > registering a view in Presto that applies recent mutation sets to the
> > >> > latest merged version on the fly.
> > >> >
> > >> > So to build this in Iceberg we would likely need to extend the Table
> > >> > spec with:
> > >> >
> > >> >    - An optional unique key specification, possibly composite, naming
> > >> >    one or more columns for which there is expected to be at most one row
> > >> >    per unique value.
> > >> >    - The ability to indicate in the snapshot that a certain set of
> > >> >    manifests are "base" data while other manifests are "diffs".
> > >> >    - The ability in a "diff" manifest to indicate files that contain
> > >> >    "deleted" keys (or else the ability in a given row to have a special
> > >> >    column that indicates that the row is a "delete" and not an "upsert").
> > >> >    - "diff" manifests would need to be ordered in the snapshot (as
> > >> >    multiple "diff" manifests could affect a single row and only the
> > >> >    latest of those takes effect).
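A reader for the "base" plus ordered "diffs" layout proposed above might behave roughly like this. It is a sketch under assumptions, not a proposed Iceberg API: `merge_on_read`, `DiffEntry`, and the use of `None` as the delete marker are invented for the example, and manifests are modelled as plain in-memory lists.

```python
from typing import Dict, Hashable, Iterable, List, Optional, Tuple

Row = Dict[str, object]
# One diff entry: (key, row) is an upsert; (key, None) marks the key as deleted.
DiffEntry = Tuple[Hashable, Optional[Row]]


def merge_on_read(base: Iterable[Row], diffs: List[List[DiffEntry]], key_col: str) -> List[Row]:
    """Apply diffs, oldest first, on top of the base rows; the latest diff wins."""
    # The unique key guarantees at most one base row per key value.
    merged: Dict[Hashable, Row] = {row[key_col]: row for row in base}
    for diff in diffs:  # order matters: later diffs override earlier ones
        for key, row in diff:
            if row is None:
                merged.pop(key, None)  # delete marker
            else:
                merged[key] = row  # upsert
    return list(merged.values())


base_rows = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
ordered_diffs = [
    [(1, {"id": 1, "v": "a2"}), (3, {"id": 3, "v": "c"})],
    [(2, None), (1, {"id": 1, "v": "a3"})],
]
current = merge_on_read(base_rows, ordered_diffs, key_col="id")
```

The example shows why ordering is required: key 1 is upserted in both diffs, and only the value from the later diff survives.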
> > >> >
> > >> > Obviously readers would need to be updated to correctly interpret this
> > >> > data. And there are all kinds of supporting work that would be required
> > >> > in order to maintain these (periodically collapsing diffs into the base,
> > >> > etc.).
> > >> >
> > >> > Is this something for which PRs would be accepted, assuming all of the
> > >> > necessary steps to make sure the direction is compatible with Iceberg's
> > >> > other use-cases?
> > >> >
> > >> > On Wed, Nov 28, 2018 at 1:14 PM Owen O'Malley <owen.omal...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > I’m not sure what use case Erik is looking for, but I’ve had users
> > >> > > that want to do the equivalent of HBase’s column families. They want
> > >> > > some of the columns to be stored separately and then merged together
> > >> > > on read. The requirements would be that there is a 1:1 mapping between
> > >> > > rows in the matching files and stripes.
> > >> > >
> > >> > > It would look like:
> > >> > >
> > >> > > file1.orc: struct<name:string,email:string>
> > >> > > file2.orc: struct<lastAccess:timestamp>
> > >> > >
> > >> > > It would let them leave the stable information and only re-write the
> > >> > > second column family when the information in the mutable column family
> > >> > > changes. It would also support use cases where you add data enrichment
> > >> > > columns after the data has been ingested.
> > >> > >
> > >> > > From there it is easy to imagine having a replace operator where
> > >> > > file2’s version of a column replaces file1’s version.
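The column-family idea above, including the replace operator, can be illustrated with a positional merge. This is a toy sketch assuming each file is already decoded into a list of dicts in matching row order; `merge_column_families` is a hypothetical helper, not an ORC API, and the sample values are made up.

```python
from itertools import zip_longest
from typing import Dict, Iterable, Iterator

Row = Dict[str, object]
_MISSING = object()  # sentinel used to detect files of different lengths


def merge_column_families(stable: Iterable[Row], mutable: Iterable[Row]) -> Iterator[Row]:
    """Zip two row streams positionally; mutable columns replace same-named stable ones."""
    for left, right in zip_longest(stable, mutable, fillvalue=_MISSING):
        if left is _MISSING or right is _MISSING:
            # The 1:1 row mapping between the two files is violated.
            raise ValueError("column-family files must have the same number of rows")
        # Later keys win, so a column present in both files takes the
        # second file's version (the "replace operator").
        yield {**left, **right}


file1 = [
    {"name": "Ann", "email": "ann@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
]
file2 = [
    {"lastAccess": "2018-11-28T09:44:00"},
    {"lastAccess": "2018-11-27T12:26:00"},
]
rows = list(merge_column_families(file1, file2))
```

Only `file2` needs to be rewritten when `lastAccess` changes; `file1` stays as it is.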
> > >> > >
> > >> > > .. Owen
> > >> > >
> > >> > > > On Nov 28, 2018, at 9:44 AM, Ryan Blue <rb...@netflix.com.INVALID>
> > >> > > > wrote:
> > >> > > >
> > >> > > > What do you mean by merge on read?
> > >> > > >
> > >> > > > A few people I've talked to are interested in building delete and
> > >> > > > upsert features. Those would create files that track the changes,
> > >> > > > which would be merged at read time to apply them. Is that what you
> > >> > > > mean?
> > >> > > >
> > >> > > > rb
> > >> > > >
> > >> > > > On Tue, Nov 27, 2018 at 12:26 PM Erik Wright
> > >> > > > <erik.wri...@shopify.com.invalid> wrote:
> > >> > > >
> > >> > > >> Has any consideration been given to the possibility of eventual
> > >> > > >> merge-on-read support in the Iceberg table spec?
> > >> > > >>
> > >> > > >
> > >> > > >
> > >> > > > --
> > >> > > > Ryan Blue
> > >> > > > Software Engineer
> > >> > > > Netflix
> > >> > >
> > >> > >
> > >> >
> > >>
> > >
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
