Peter, I also thought about that. I didn't go with `StructTransformation.schema()` because I was hoping to stick with the `StructLike` interface, which doesn't expose `schema()`. I am trying to mimic the behavior of `StructProjection`, which doesn't expose `schema()` either. The projected schema can be extracted via `TypeUtil.project(Schema schema, Set<Integer> fieldIds)`, e.g. as in the sketch below.
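For example, something along these lines (an illustrative, untested sketch; the `SortKeyUtil` name is made up here, and `sortFieldIds` is assumed to be extracted from the SortOrder):

import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.StructProjection;

// Illustrative only: the caller derives the projected schema itself,
// so neither StructProjection nor StructTransformation has to expose
// schema().
class SortKeyUtil {
  static Schema sortKeySchema(Schema schema, Set<Integer> sortFieldIds) {
    return TypeUtil.project(schema, sortFieldIds);
  }

  static StructProjection sortKeyProjection(Schema schema, Set<Integer> sortFieldIds) {
    return StructProjection.create(schema, sortFieldIds);
  }
}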
Thanks,
Steven

On Wed, May 31, 2023 at 1:18 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> > 4. To represent the transformed struct, we need a transformed schema. I
> > am thinking about adding a transform method to TypeUtil. It will return a
> > transformed schema with field types updated to the result types of the
> > transforms. This can look a bit weird with field types changed.
> >
> > public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>
> Wouldn't it make sense to get the Schema from the `StructTransformation`
> object instead, like `StructTransformation.schema()`?
>
> On Wed, May 31, 2023 at 7:19 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> We are implementing a range partitioner for Flink sink shuffling [1]. One
>> key piece is a RowDataComparator for Flink RowData. Would love to get some
>> feedback on a few decisions.
>>
>> 1. Comparators for the Flink `RowData` type. Flink already has the
>> `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With
>> `StructLike`, the Iceberg `Comparators` can be used to compare two structs,
>> so we don't need to implement `RowDataComparators` that would look very
>> similar to the struct `Comparators`. This is also related to the
>> transformation decision below: we don't need to re-implement all the
>> transform functions with Flink data types.
>>
>> 2. Use SortOrder or just natural order (with nulls first)? SortOrder
>> supports transform functions (like bucket, hours, truncate). The
>> implementation will be a lot simpler if we only need to implement natural
>> order without the transformations from SortOrder. But I do think the
>> transformations (like days, bucket) in SortOrder are quite useful.
>>
>> In addition to the current transforms, we plan to add a `relative_hour`
>> transform for event-time partitioned tables. Flink range shuffle
>> calculates traffic statistics across keys (like the number of observed
>> rows per event hour). Ideally the traffic distributions should be
>> relatively stable, hence a relative hour (hour 0 meaning the current
>> hour) can produce stable statistics for traffic weight across the
>> relative event hours.
>>
>> 3. I am thinking about adding a `StructTransformation` class in the
>> iceberg-api module. It can be implemented similarly to `StructProjection`,
>> where transform functions are applied lazily during get.
>>
>> public static StructTransformation create(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>
>> 4. To represent the transformed struct, we need a transformed schema. I
>> am thinking about adding a transform method to TypeUtil. It will return a
>> transformed schema with field types updated to the result types of the
>> transforms. This can look a bit weird with field types changed.
>>
>> public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>>
>> =========================
>> This is how everything is put together for RowDataComparator.
>>
>> Schema projected = TypeUtil.select(schema, sortFieldIds); // sortFieldIds set is calculated from SortOrder
>> Map<Integer, Transform<?, ?>> idToTransforms = ...; // calculated from SortOrder
>> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>>
>> // note: in a real implementation the left and right keys need separate
>> // wrapper instances, since wrap() returns the mutated wrapper itself
>> StructLike leftSortKey =
>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)));
>> StructLike rightSortKey =
>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(rightRowData)));
>>
>> Comparators.forType(sortSchema.asStruct()).compare(leftSortKey, rightSortKey);
>>
>> Thanks,
>> Steven
>>
>> [1]
>> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
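For illustration only, here is a minimal sketch of what the proposed `StructTransformation` wrapper could look like. This is hypothetical code, not an existing Iceberg API; it mirrors `StructProjection`'s wrap-and-delegate pattern and applies the transforms lazily in `get`. To keep it simple, the map is keyed by field position rather than field id:

import java.util.Map;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.transforms.Transform;

// Hypothetical sketch of the proposed StructTransformation; not an
// existing Iceberg API. Transforms are applied lazily in get(),
// mirroring StructProjection's wrap-and-delegate pattern.
class StructTransformation implements StructLike {
  // keyed by field position (not field id) to keep the sketch simple
  private final Map<Integer, Transform<Object, Object>> posToTransform;
  private final int size;
  private StructLike struct = null;

  StructTransformation(int size, Map<Integer, Transform<Object, Object>> posToTransform) {
    this.size = size;
    this.posToTransform = posToTransform;
  }

  StructTransformation wrap(StructLike newStruct) {
    this.struct = newStruct;
    return this;
  }

  @Override
  public int size() {
    return size;
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    Object value = struct.get(pos, Object.class);
    Transform<Object, Object> transform = posToTransform.get(pos);
    // fall back to the raw value when no transform is configured
    Object result = (transform == null || value == null) ? value : transform.apply(value);
    return javaClass.cast(result);
  }

  @Override
  public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException("transformed struct is read-only");
  }
}

A real version would resolve field ids to positions against the schema and surface the transforms' result types, as sketched in point 4.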