I love this idea. Instead of (or in addition to) inferring the desired sort 
order, I would propose exposing the ability for users to define their own 
sorting/partitioning. That way, users could balance the metadata tree to 
their specific use case.
Rough thinking - 
https://github.com/apache/iceberg/blob/26d62c06bce6f50d46f51860f0014bba2f9538d2/sp[…]g/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
 - so this is the action that basically determines the structure of the 
metadata tree - I wonder if this could be optionally supplied as a generic 
Transform? Then I, as the user, could specify exactly how I want my metadata 
tree to be balanced.
Something along the lines of

```
  // Hypothetical functional interface for a user-supplied manifest transform;
  // Java has no default parameter values, so the current repartition-and-sort
  // behavior would be wired in as the default via an overload or the builder.
  @FunctionalInterface
  interface ManifestTransform {
    Dataset<Row> apply(Dataset<Row> df, Column partitionColumn, int numManifests);
  }

  private List<ManifestFile> writePartitionedManifests(
      ManifestContent content,
      Dataset<Row> manifestEntryDF,
      int numManifests,
      ManifestTransform manifestTransformFunction) {

    return withReusableDS(
        manifestEntryDF,
        df -> {
          WriteManifests<?> writeFunc = newWriteManifestsFunc(content, df.schema());
          Column partitionColumn = df.col("data_file.partition");
          // Let the caller decide how manifest entries are clustered and ordered
          Dataset<Row> transformedDF =
              manifestTransformFunction.apply(df, partitionColumn, numManifests);
          return writeFunc.apply(transformedDF).collectAsList();
        });
  }
```
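
For illustration, here is a minimal sketch of how a caller might pass the 
transform: the first call keeps today's repartition-and-sort behavior, while 
the second sorts by b before a as in Jack's example below. The 
ManifestTransform hook and the nested field paths under data_file.partition 
are assumptions, not existing API:

```
  // Default: keep today's repartition-by-range plus sort-within-partitions.
  List<ManifestFile> defaults =
      writePartitionedManifests(
          content, manifestEntryDF, numManifests,
          (df, partitionColumn, n) ->
              df.repartitionByRange(n, partitionColumn)
                  .sortWithinPartitions(partitionColumn));

  // Custom: cluster and sort by partition column b before a (field names b
  // and a are hypothetical, taken from the example in the quoted mail).
  List<ManifestFile> custom =
      writePartitionedManifests(
          content, manifestEntryDF, numManifests,
          (df, partitionColumn, n) ->
              df.repartitionByRange(n, df.col("data_file.partition.b"))
                  .sortWithinPartitions(
                      df.col("data_file.partition.b"),
                      df.col("data_file.partition.a")));
```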

What do you think?
Zach
On 2024/01/30 21:50:10 Jack Ye wrote:
> Hi everyone,
> 
> Today, the rewrite manifest procedure always orders the data files based on
> their *data_file.partition* value. Specifically, it sorts data files that
> have the same partition value, and then does a repartition by range based
> on the target number of manifest files (ref
> <https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java#L257-L258>
> ).
> 
> I notice that this approach does not always yield the best performance for
> scan planning, because the resulting manifest entry order is essentially
> the default order of the partition columns.
> 
> For example, consider a table partitioned by columns a and b. By default
> the rewrite procedure will organize manifest entries based on column a and
> then b. If most of my queries use b as the predicate, rewriting manifests
> by sorting first on column b and then a will yield much shorter scan
> planning time, because all manifest entries with similar b values are close
> together, and the manifest list alone can already be used to prune many
> files without opening the manifest files.
> 
> This happens a lot in cases where b is an event-time timestamp column: it
> is not the first partition column, but it is actually the column read most
> frequently in queries.
> 
> Translated to code, this means we can benefit from something like:
> 
> SparkActions.rewriteManifests(table)
>   .sort("b", "a")
>   .commit()
> 
> Any thoughts?
> 
> Best,
> Jack Ye
> 
