I think the question is what we mean by doing this synchronously. For instance, I have doubts it would be a good idea to do this on each commit attempt unless we can prove with a benchmark that the overhead is negligible. We risk failing a job for the sake of updating partition stats. I can see the need to update the stats immediately after an operation. However, why not commit first and then do a post-commit action to update the stats? If it succeeds, good. If not, we will try again next time.
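To make the commit-first, best-effort post-commit pattern above concrete, here is a minimal sketch. All names are hypothetical (this is not the actual Iceberg API); the key point is that a stats failure never fails the data commit, and stale stats are simply retried on the next attempt:

```python
def commit_with_stats(table, commit_fn, compute_stats_fn):
    """Commit the data first; update partition stats best-effort afterwards."""
    # The data commit must never be blocked or failed by stats maintenance.
    snapshot_id = commit_fn(table)
    try:
        stats_file = compute_stats_fn(table, snapshot_id)
        table["stats"] = {"snapshot_id": snapshot_id, "file": stats_file}
    except Exception:
        # Stats update failed: keep the previous stats in place and retry
        # next time. Readers fall back to the last snapshot id for which
        # stats were successfully computed.
        pass
    return snapshot_id
```

If the post-commit step fails, the table still advances to the new snapshot and the stats file simply lags behind by one or more snapshots until the next successful update.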
On 2023/10/11 17:14:12 Ajantha Bhat wrote: > Hi All, > > As per the above proposal, I have worked on a POC ( > https://github.com/apache/iceberg/pull/8488). > > *But to move things forward, first we need to merge the spec PR > (https://github.com/apache/iceberg/pull/7105). *I don't see any blocker > for the spec. Please review and approve if it is ok. > > One topic that came up during the review is whether to write stats > synchronously or asynchronously. > My suggestion is that we need to support both. I think we can first have an > async writing implementation. > But we also need to support sync stats writing with writes (controlled by a > table property). > > Some engines, like Trino and Dremio, can make use of the sync writing of the > stats. > Currently, Puffin stats also support sync writing from Trino. > > Thanks, > Ajantha > > > On Mon, May 22, 2023 at 10:15 PM Ryan Blue <b...@tabular.io> wrote: > > > Thanks, Ajantha. I think it's safe to say that we should continue assuming > > that we will have one partition stats file. I agree that it should be small > > and we don't want to block the progress here. > > > > On Mon, May 22, 2023 at 5:07 AM Ajantha Bhat <ajanthab...@gmail.com> > > wrote: > > > >> Hi Anton and Ryan, > >> > >> The Partition stats spec PR <https://github.com/apache/iceberg/pull/7105> > >> didn't > >> move forward, as Anton wanted to conduct some experiments to conclude > >> whether single-file or multi-file writing is better. > >> I conducted the experiments myself and attached some numbers to the PR. > >> > >> I would like to take this forward. > >> Please let me know what you think (you can comment on the PR). > >> > >> As the output file is very small and initially the stats are computed > >> asynchronously, > >> I think writing them as a single file should be good enough. 
> >> In the future, if we need faster stats writing (along with each write > >> operation), we can also implement multiple stats files. > >> > >> Just like how copy-on-write and merge-on-read serve their use cases > >> in Iceberg, > >> we might have to support both single-file writing and multiple-file > >> writing in the long run. > >> > >> Thanks, > >> Ajantha > >> > >> On Wed, May 17, 2023 at 1:38 AM Mayur Srivastava < > >> mayur.srivast...@twosigma.com> wrote: > >> > >>> I agree, it totally depends on the way “last modified time” per > >>> partition is implemented. > >>> > >>> I’m concerned about the performance of computing partition stats (and > >>> storage + the size of table metadata files) if the implementation requires > >>> users to keep around all snapshots. (I described one of my use cases in this > >>> thread earlier.) > >>> > >>> > >>> > >>> *From:* Pucheng Yang <py...@pinterest.com.INVALID> > >>> *Sent:* Monday, May 15, 2023 11:46 AM > >>> *To:* dev@iceberg.apache.org > >>> *Subject:* Re: [Proposal] Partition stats in Iceberg > >>> > >>> > >>> > >>> Hi Mayur, can you elaborate on your concern? I don't know how this is going > >>> to be implemented, so I am not sure where the performance issue is. > >>> > >>> > >>> > >>> On Mon, May 15, 2023 at 7:55 AM Mayur Srivastava < > >>> mayur.srivast...@twosigma.com> wrote: > >>> > >>> Thanks Ryan. > >>> > >>> For most partition stats, I’m ok with compaction and keeping fewer > >>> snapshots. My concern was about supporting last modified time. I guess, if we > >>> need to keep all snapshots to support last modified time, it could have an > >>> impact on metadata access performance. 
> >>> > >>> > >>> Thanks, > >>> > >>> Mayur > >>> > >>> > >>> > >>> *From:* Ryan Blue <b...@tabular.io> > >>> *Sent:* Wednesday, May 3, 2023 2:00 PM > >>> *To:* dev@iceberg.apache.org > >>> *Subject:* Re: [Proposal] Partition stats in Iceberg > >>> > >>> > >>> > >>> Mayur, your use case may require a lot of snapshots, but we generally > >>> recommend expiring them after a few days. You can tag snapshots to keep > >>> them around longer than that. > >>> > >>> > >>> > >>> On Tue, May 2, 2023 at 4:52 PM Mayur Srivastava < > >>> mayur.p.srivast...@gmail.com> wrote: > >>> > >>> Thanks for the response. > >>> > >>> One of the use cases that we have is where one business day of data is > >>> added at a time to a DAY partitioned table. With 25 years of this data, > >>> there will be ~6250 partitions and as many snapshots. Is it recommended > >>> to keep this many snapshots around? > >>> > >>> > >>> > >>> On Tue, May 2, 2023 at 7:45 PM Szehon Ho <szehon.apa...@gmail.com> > >>> wrote: > >>> > >>> > >>> > >>> Does snapshot expiration need to be disabled for this to work? Thanks, > >>> Mayur > >>> > >>> > >>> Yes, the snapshot that last updated the partition needs to be around for > >>> this to work. > >>> > >>> > >>> > >>> Szehon, the query you shared requires a SparkSQL job to be run, which > >>> means latency will be high. However, I am glad you are also thinking of > >>> adding these directly to the partition table and it seems we share the > >>> same > >>> interests. > >>> > >>> > >>> Yea the partitions table currently still goes through SparkSQL, so it > >>> will be the same. Maybe you mean add this to partition stats? We do need > >>> to reconcile the partitions table and partition stats at some point, though. > >>> Not > >>> sure if it was designed/discussed yet; I think there were some thoughts on > >>> short-circuiting the Partitions table to read from partition stats, if stats > >>> exist for the current snapshot. 
> >>> > >>> > >>> Thanks > >>> > >>> Szehon > >>> > >>> > >>> > >>> On Tue, May 2, 2023 at 4:34 PM Pucheng Yang <py...@pinterest.com.invalid> > >>> wrote: > >>> > >>> Thanks Ryan and Szehon! > >>> > >>> > >>> > >>> Szehon, the query you shared requires a SparkSQL job to be run, which > >>> means latency will be high. However, I am glad you are also thinking of > >>> adding these directly to the partition table and it seems we share the > >>> same > >>> interests. I am looking forward to the work in the phase 2 implementation. > >>> Let me know if I can help, thanks. > >>> > >>> > >>> > >>> On Tue, May 2, 2023 at 4:28 PM Szehon Ho <szehon.apa...@gmail.com> > >>> wrote: > >>> > >>> Yea I agree, I had a handy query for the last update time of a partition. > >>> > >>> > >>> > >>> SELECT > >>> > >>> e.data_file.partition, > >>> > >>> MAX(s.committed_at) AS last_modified_time > >>> > >>> FROM db.table.snapshots s > >>> > >>> JOIN db.table.entries e > >>> > >>> WHERE s.snapshot_id = e.snapshot_id > >>> > >>> GROUP BY e.data_file.partition > >>> > >>> > >>> > >>> It's a bit lengthy currently. > >>> > >>> > >>> > >>> I have indeed been thinking about adding these fields to the > >>> Partitions table directly, after Ajantha's pending changes to add delete > >>> files to this table. > >>> > >>> > >>> > >>> Thanks > >>> > >>> Szehon > >>> > >>> > >>> > >>> On Tue, May 2, 2023 at 4:08 PM Ryan Blue <b...@tabular.io> wrote: > >>> > >>> Pucheng, > >>> > >>> > >>> > >>> Rather than using the changelog, I'd just look at the metadata tables. > >>> You should be able to query the all_entries metadata table to see file > >>> additions or deletions for a given snapshot. Then from there you can join > >>> to the snapshots table for timestamps and aggregate to the partition > >>> level. 
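The aggregation both Szehon's query and Ryan's suggestion describe (join the entries to the snapshots by snapshot id, then take the max commit timestamp per partition) can also be sketched outside SQL. This is a minimal illustration over plain records, not the actual metadata-table API:

```python
from collections import defaultdict

def last_modified_per_partition(entries, snapshots):
    """entries: iterable of (snapshot_id, partition) pairs, as would come
    from the entries metadata table.
    snapshots: {snapshot_id: committed_at_millis}, as would come from the
    snapshots metadata table.
    Returns {partition: latest_commit_timestamp}."""
    latest = defaultdict(int)
    for snapshot_id, partition in entries:
        ts = snapshots[snapshot_id]
        if ts > latest[partition]:
            latest[partition] = ts
    return dict(latest)
```

Note this only works while the snapshots that last touched each partition have not been expired, which is exactly the concern raised earlier in the thread.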
> >>> > >>> > >>> Ryan > >>> > >>> > >>> > >>> On Fri, Apr 28, 2023 at 12:49 PM Pucheng Yang < > >>> py...@pinterest.com.invalid> wrote: > >>> > >>> Hi Ajantha and the community, > >>> > >>> > >>> > >>> I am interested and I am wondering where we can see the latest progress > >>> of this feature? > >>> > >>> > >>> > >>> Regarding the partition stats in Iceberg, I am specifically curious if > >>> we can consider a new field called "last modified time" to be included in > >>> the partition stats (or have a pluggable way to allow users to > >>> configure the partition stats they need). My use case is to find out whether a > >>> partition has changed between two snapshots (old and new) with a > >>> quick and lightweight process. The community previously suggested that I > >>> use the change log (CDC), but I think that is too heavy (I guess, since it > >>> requires running a SparkSQL procedure) and it overdoes the work (I don't > >>> need to know which rows changed, I just need true or false for whether a > >>> partition changed). > >>> > >>> > >>> > >>> Thanks > >>> > >>> > >>> > >>> On Tue, Feb 7, 2023 at 11:36 AM Mayur Srivastava < > >>> mayur.srivast...@twosigma.com> wrote: > >>> > >>> Thanks Ajantha. > >>> > >>> > >>> > >>> > It should be very easy to add a few more fields to it like the latest > >>> sequence number or last modified time per partition. > >>> > >>> > >>> > >>> Among sequence number and modified time, which one do you think is more > >>> likely to be available in Iceberg partition stats? Note that we would like > >>> to avoid compaction changing the sequence number or modified time stats. > >>> > >>> > >>> > >>> Thanks, > >>> > >>> Mayur > >>> > >>> > >>> > >>> *From:* Ajantha Bhat <ajanthab...@gmail.com> > >>> *Sent:* Tuesday, February 7, 2023 10:02 AM > >>> *To:* dev@iceberg.apache.org > >>> *Subject:* Re: [Proposal] Partition stats in Iceberg > >>> > >>> > >>> > >>> Hi Hrishi and Mayur, thanks for the inputs. 
> >>> > >>> To get things moving, I have frozen the scope of the phase 1 implementation. > >>> (Recently added the delete file stats to phase 1 too). You can find the > >>> scope in the "Design for approval" section of the design doc. > >>> > >>> That said, once we have phase 1 implemented, it should be very easy to > >>> add a few more fields to it like the latest sequence number or last > >>> modified time per partition. > >>> I will be opening up the discussion about the phase 2 schema again once > >>> the phase 1 implementation is done. > >>> > >>> Thanks, > >>> Ajantha > >>> > >>> > >>> > >>> On Tue, Feb 7, 2023 at 8:15 PM Mayur Srivastava < > >>> mayur.srivast...@twosigma.com> wrote: > >>> > >>> +1 for the initiative. > >>> > >>> > >>> > >>> We’ve been exploring options for storing last-modified-time per > >>> partition. It’s an important building block for data pipelines – especially > >>> if there is a dependency between jobs with strong consistency > >>> requirements. > >>> > >>> > >>> > >>> Is partition stats a good place for storing last-modified-time per > >>> partition? > >>> > >>> > >>> > >>> Thanks, > >>> > >>> Mayur > >>> > >>> > >>> > >>> *From:* Ajantha Bhat <ajanthab...@gmail.com> > >>> *Sent:* Monday, January 23, 2023 11:56 AM > >>> *To:* dev@iceberg.apache.org > >>> *Subject:* Re: [Proposal] Partition stats in Iceberg > >>> > >>> > >>> > >>> Hi All, > >>> > >>> In the same design document ( > >>> https://docs.google.com/document/d/1vaufuD47kMijz97LxM67X8OX-W2Wq7nmlz3jRo8J5Qk/edit?usp=sharing > >>> ), > >>> I have added a section called > >>> *"Design for approval". *It also contains a potential PR breakdown for > >>> the phase 1 implementation and future development scope. > >>> Please take a look and vote if you think the design is ok. 
> >>> Thanks, > >>> Ajantha > >>> > >>> > >>> > >>> On Mon, Dec 5, 2022 at 8:37 PM Ajantha Bhat <ajanthab...@gmail.com> > >>> wrote: > >>> > >>> A big thanks to everyone who was involved in the review and the > >>> discussions so far. > >>> > >>> Please find the meeting minutes from the last iceberg sync about the > >>> partition stats. > >>> a. Writers should not write the partition stats or any stats as of > >>> now, > >>> because it requires bumping the spec to V3. (We can have it as > >>> part of the v3 spec later on. But not anytime soon). > >>> b. So, there can be an async way of generating the stats like > >>> ANALYZE table or a call procedure, > >>> which will compute the stats up to the current snapshot and store > >>> it as a partition stats file. > >>> c. In phase 1, partition stats will just store the row_count and > >>> file_count per partition value as mentioned in the design document. > >>> Later it can be enhanced to store the puffin file location and other > >>> metrics per partition value. > >>> d. These tuples are stored in a single sorted Avro/parquet file (we > >>> need to finalize this). > >>> e. Each time, "analyze table" will rewrite the whole stats file, as > >>> keeping multiple delta files would just make the read path messy. > >>> Also, even with a million rows, it should only be a few MB in size. > >>> Once the writers start writing the stats (V3 spec), we can > >>> revisit storing them as delta files if there are any performance issues. > >>> > >>> The next immediate plan is to > >>> a. Get these PRs merged (open points in the existing StatisticsFile > >>> interface added during Puffin) > >>> #6267 <https://github.com/apache/iceberg/pull/6267>, #6090 > >>> <https://github.com/apache/iceberg/pull/6090>, #6091 > >>> <https://github.com/apache/iceberg/pull/6091> > >>> b. Figure out how to give accurate stats with row-level deletes and > >>> how to mask dropped partition values from stats. > >>> https://github.com/apache/iceberg/issues/6042 > >>> c. 
Standardize the `StatisticsFile` interface to hold the > >>> parquet/Avro stats file (instead of always assuming it is a Puffin file) > >>> and introduce a `StatisticsType` enum. > >>> d. Conclude the storage format and get approval for the design. > >>> > >>> I will wait another week or two for some more people to take a look at > >>> the document > >>> > >>> before jumping into the implementation. > >>> > >>> Thanks, > >>> Ajantha. > >>> > >>> > >>> > >>> On Sat, Nov 26, 2022 at 8:25 AM Ajantha Bhat <ajanthab...@gmail.com> > >>> wrote: > >>> > >>> Hi Ryan, > >>> > >>> are you saying that you think the partition-level stats should not be > >>> required? I think that would be best. > >>> > >>> I think there is some confusion here. Partition-level stats are > >>> required (hence the proposal). > >>> But does the writer always write it (with the append/delete/replace > >>> operation), > >>> or does the writer skip writing it so that the user generates it using DML like > >>> "Analyze table"? That was the point of discussion. > >>> I think we can have both options, with the writer stats writing > >>> controlled by a table property "write.stats.enabled". > >>> > >>> > >>> > >>> I’m all for improving the interface for retrieving stats. It’s a > >>> separate issue > >>> > >>> Agree. Let us discuss it in a separate thread. > >>> > >>> Thanks, > >>> Ajantha > >>> > >>> > >>> > >>> On Sat, Nov 26, 2022 at 12:12 AM Ryan Blue <b...@tabular.io> wrote: > >>> > >>> Ajantha, are you saying that you think the partition-level stats should > >>> not be required? I think that would be best. > >>> > >>> I’m all for improving the interface for retrieving stats. It’s a > >>> separate issue, but I think that Iceberg should provide both access to the > >>> Puffin files and metadata as well as a higher-level interface for > >>> retrieving information like a column’s NDV. 
Something like this: > >>> > >>> int ndv = > >>> table.findStat(Statistics.NDV).limitSnapshotDistance(3).forColumn("x"); > >>> > >>> > >>> > >>> On Thu, Nov 24, 2022 at 2:31 AM Ajantha Bhat <ajanthab...@gmail.com> > >>> wrote: > >>> > >>> Hi Ryan, > >>> Thanks a lot for the review and suggestions. > >>> > >>> but I think there is also a decision that we need to make before that: > >>> Should Iceberg require writers to maintain the partition stats? > >>> > >>> I think I would prefer to take a lazy approach and not assume that > >>> writers will keep the partition stats up to date, > >>> > >>> in which case we need a way to know which parts of a table are newer > >>> than the most recent stats. > >>> > >>> > >>> > >>> This is a common problem for the existing table-level puffin stats too, not > >>> just for partition stats. > >>> As mentioned in the "integration with the current code" section point > >>> 8), > >>> I was planning to introduce a table property "write.stats.enabled" with > >>> a default value set to false. > >>> > >>> And as per point 7), I was planning to introduce an "ANALYZE table" or > >>> "CALL procedure" SQL (maybe a table-level API too) to asynchronously > >>> compute the stats on demand from the previous checkpoints. > >>> > >>> But currently, `TableMetadata` doesn't have a clean interface to provide > >>> the statistics file for the current snapshot. > >>> If stats are not present, we need another interface to provide the last > >>> successful snapshot id for which stats were computed. > >>> Also, there is some confusion around reusing the statistics file > >>> (because the spec only has a computed snapshot id, not the referenced > >>> snapshot id). > >>> I am planning to open up a PR to handle these interface updates > >>> this week (the same things you suggested in the last Iceberg sync). > >>> This should serve as a good foundation to get insights for lazy & > >>> incremental stats computing. 
> >>> > >>> Thanks, > >>> Ajantha > >>> > >>> > >>> > >>> On Thu, Nov 24, 2022 at 12:50 AM Ryan Blue <b...@tabular.io> wrote: > >>> > >>> Thanks for writing this up, Ajantha! I think that we have all the > >>> upstream pieces in place to work on this so it's great to have a proposal. > >>> > >>> > >>> > >>> The proposal does a good job of summarizing the choices for how to store > >>> the data, but I think there is also a decision that we need to make before > >>> that: Should Iceberg require writers to maintain the partition stats? > >>> > >>> > >>> > >>> If we do want writers to participate, then we may want to make choices > >>> that are easier for writers. But I think that is going to be a challenge. > >>> Adding requirements for writers would mean that we need to bump the spec > >>> version. Otherwise, we aren't guaranteed that writers will update the > >>> files > >>> correctly. I think I would prefer to take a lazy approach and not assume > >>> that writers will keep the partition stats up to date, in which case we > >>> need a way to know which parts of a table are newer than the most recent > >>> stats. > >>> > >>> > >>> > >>> Ryan > >>> > >>> > >>> > >>> On Wed, Nov 23, 2022 at 4:36 AM Ajantha Bhat <ajanthab...@gmail.com> > >>> wrote: > >>> > >>> Thanks Piotr for taking a look at it. > >>> I have replied to all the comments in the document. > >>> I might need your support in standardising the existing `StatisticsFile` > >>> interface to adopt partition stats as mentioned in the design. > >>> > >>> > >>> > >>> *We do need more eyes on the design. Once I get approval for the design, > >>> I can start the implementation. * > >>> Thanks, > >>> Ajantha > >>> > >>> > >>> > >>> On Wed, Nov 23, 2022 at 3:28 PM Piotr Findeisen <pi...@starburstdata.com> > >>> wrote: > >>> > >>> Hi Ajantha, > >>> > >>> > >>> > >>> this is a very interesting document, thank you for your work on this! > >>> > >>> I've added a few comments there. 
> >>> > >>> > >>> I have one high-level design comment, so I thought it would be nicer to > >>> everyone if I re-post it here > >>> > >>> > >>> > >>> is "partition" the right level for keeping the stats? > >>> We do this in Hive, but was it an accidental choice? Or just the only > >>> thing that was possible to implement many years ago? > >>> > >>> > >>> > >>> Iceberg allows a higher number of partitions than Hive because it scales > >>> better. But that means partition-level may or may not be > >>> the right granularity. > >>> > >>> > >>> A self-optimizing system would gather stats on a "per query unit" basis -- > >>> for example, if I partition by [ day x country ] but usually query by day, > >>> the days are the "query unit" and from a stats perspective country can be > >>> ignored. > >>> Having more fine-grained partitions may lead to expensive planning time, > >>> so it's not a theoretical problem. > >>> > >>> > >>> I am not saying we should implement all this logic right now, but I > >>> think we should decouple the partitioning scheme from stats partitions, to > >>> allow the query engine to become smarter. > >>> > >>> > >>> > >>> > >>> > >>> cc @Alexander Jo <alex...@starburstdata.com> > >>> > >>> > >>> > >>> Best > >>> > >>> PF > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> On Mon, Nov 14, 2022 at 12:47 PM Ajantha Bhat <ajanthab...@gmail.com> > >>> wrote: > >>> > >>> Hi Community, > >>> I did a proposal write-up for the partition stats in Iceberg. > >>> Please have a look and let me know what you think. I would like to work > >>> on it. > >>> > >>> > >>> https://docs.google.com/document/d/1vaufuD47kMijz97LxM67X8OX-W2Wq7nmlz3jRo8J5Qk/edit?usp=sharing > >>> > >>> Requirement background snippet from the above document. 
> >>> For some query engines that use a cost-based optimizer instead of, or along > >>> with, a rule-based optimizer (like Dremio, Trino, etc.), at planning time > >>> it is good to know partition-level stats, like total rows per > >>> partition and total files per partition, to make decisions for the CBO ( > >>> like deciding on join reordering and join type, or identifying the > >>> parallelism). > >>> Currently, the only way to do this is to read the partition info from > >>> data_file > >>> in manifest_entry of the manifest file and compute partition-level > >>> statistics (the same thing that the ‘partitions’ metadata table is doing *[see > >>> **Appendix A* > >>> <https://docs.google.com/document/d/1vaufuD47kMijz97LxM67X8OX-W2Wq7nmlz3jRo8J5Qk/edit#heading=h.s8iywtu7x8m6> > >>> *]*). > >>> Doing this on each query is expensive. *Hence, this is a proposal for > >>> computing and storing partition-level stats for Iceberg tables and using > >>> them during queries.* > >>> > >>> > >>> > >>> Thanks, > >>> > >>> Ajantha > >>> > >>> > >>> > >>> > >>> -- > >>> > >>> Ryan Blue > >>> > >>> Tabular > >>> > >>> > >>> > >>> > >>> -- > >>> > >>> Ryan Blue > >>> > >>> Tabular > >>> > >>> > >>> > >>> > >>> -- > >>> > >>> Ryan Blue > >>> > >>> Tabular > >>> > >>> > >>> > >>> > >>> -- > >>> > >>> Ryan Blue > >>> > >>> Tabular > >>> > >>> > > > > -- > > Ryan Blue > > Tabular > > >
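The phase 1 aggregation discussed throughout the thread (row_count and file_count per partition value, rewritten wholesale into a single small sorted file on each "analyze table") can be sketched as follows. The record layout is a simplification for illustration, not the spec's actual schema:

```python
def compute_partition_stats(data_files):
    """data_files: iterable of (partition, record_count) pairs, as would be
    read from data_file entries in the manifests.
    Returns sorted (partition, row_count, file_count) tuples -- the whole
    result is rewritten as one stats file on each 'analyze table' run,
    rather than maintained as delta files."""
    stats = {}
    for partition, record_count in data_files:
        rows, files = stats.get(partition, (0, 0))
        stats[partition] = (rows + record_count, files + 1)
    # Sorted output keeps the single stats file cheap to scan and merge.
    return [(p, r, f) for p, (r, f) in sorted(stats.items())]
```

Even for tables with millions of partitions, the resulting file stays small (a few MB, per the sync notes), which is what makes the full-rewrite approach viable.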