Yeah I was also thinking about potentially exposing something more flexible.

However, I don't think we can directly expose the logic for users to
manipulate the data frame directly, because we want the RewriteManifests
core API to be not engine-specific. In addition, I think we still need to
expose some kind of API with a simpler input type so that we can map that
to SQL procedure for users that would like a SQL-only experience.

Maybe one way to do it is to expose API methods that:
1. takes a partition data struct and transforms it into a string that can
be more flexibly sorted and range-partitioned
2. more precisely controls the number of manifests to produce. This value
is ultimately used to control the number of partitions in the range
partitioning call of manifest entries.

SparkActions.rewriteManifests(table)
  .sort(partitionData -> string)
  .minManifests(20)
  .maxManifests(40)
  .targetManifestSize(8MB)
  .commit()

And sort("pk1", "pk2", "pk3", ...) can be a convenient method on top of
.sort(partitionData -> string) for rewriting manifests based on some
partition field orders, and that can be mapped to the end Spark procedure.

Thoughts?

-Jack



On Wed, Jan 31, 2024 at 10:21 AM Zach Dischner
<zach.disch...@icloud.com.invalid> wrote:

> I love this idea. Instead of (or in addition to) inferring the desired
> sort order, I would propose that the ability for the user to define their
> own sorting/partitioning be exposed. That way the user could balance the
> metadata tree more specifically to their use case.
> Rough thinking -
> https://github.com/apache/iceberg/blob/26d62c06bce6f50d46f51860f0014bba2f9538d2/sp[…]g/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
> - so this is the action that basically determines the structure of the
> metadata tree - I wonder this could be optionally supplied as a generic
> Transform? Then I as the user could specify exactly how I want my MD tree
> to be balanced.
> Something along the lines of
>
> ```
>   private List<ManifestFile> writePartitionedManifests(
>       ManifestContent content, Dataset<Row> manifestEntryDF, int
> numManifests, manifestTransformFunction = repartitionAndSort) {
>
>     return withReusableDS(
>         manifestEntryDF,
>         df -> {
>           WriteManifests<?> writeFunc = newWriteManifestsFunc(content,
> df.schema());
>           Column partitionColumn = df.col("data_file.partition");
>           Dataset<Row> transformedDF = manifestTransformFunction(df,
> partitionColumn, numManifests);
>           return writeFunc.apply(transformedDF).collectAsList();
>         });
>   }
> ```
>
> What do you think?
> Zach
> On 2024/01/30 21:50:10 Jack Ye wrote:
> > Hi everyone,
> >
> > Today, the rewrite manifest procedure always orders the data files based
> on
> > their *data_file.partition* value. Specifically, it sorts data files that
> > have the same partition value, and then does a repartition by range based
> > on the target number of manifest files (ref
> > <
> https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java#L257-L258
> >
> > ),
> >
> > I notice that this approach does not always yield the best performance
> for
> > scan planning because the resulting manifest entries order is basically
> > based on the default order of the partition columns.
> >
> > For example, consider a table partitioned by columns a and b. By default
> > the rewrite procedure will organize manifest entries based on column a
> and
> > then b. If most of my queries are using b as the predicate, rewriting
> > manifests by sorting first against column b and then a will yield a much
> > shorter scan planning time, because all manifest entries with similar b
> > values are close together, and manifest list can be used to prune many
> > files already without opening the manifest files.
> >
> > This happens a lot for cases like b is an event time timestamp column,
> > which is not the first partition column, but actually the column that is
> > read most frequently in every query.
> >
> > Translated to code, this means we can benefit from something like:
> >
> > SparkActions.rewriteManifests(table)
> >   .sort("b", "a")
> >   .commit()
> >
> > Any thoughts?
> >
> > Best,
> > Jack Ye
> >
>

Reply via email to