> 4. To represent the transformed struct, we need a transformed schema. I am
> thinking about adding a transform method to TypeUtil. It will return a
> transformed schema with field types updated to the result types of the
> transforms. This can look a bit weird with field types changed.
>
> public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
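To make the type-rewriting idea in point 4 concrete, here is a minimal self-contained sketch. `Field` and `Transform` below are simplified stand-ins for Iceberg's real `Types.NestedField` and `Transform` classes, so all names are illustrative, not the actual API:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stand-in for an Iceberg schema field (illustrative only).
record Field(int id, String name, String type) {}

// Stand-in for Iceberg's Transform; only the result-type mapping matters here.
interface Transform {
    String resultType(String sourceType);
}

public class SchemaTransformSketch {
    // Mirrors the proposed TypeUtil.transform: a field with a registered
    // transform gets its type replaced by the transform's result type;
    // other fields pass through unchanged.
    static List<Field> transform(List<Field> schema, Map<Integer, Transform> idToTransforms) {
        return schema.stream()
            .map(f -> {
                Transform t = idToTransforms.get(f.id());
                return t == null ? f : new Field(f.id(), f.name(), t.resultType(f.type()));
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Like hours()/bucket(): a timestamp column sorts as an int after transform.
        Transform hours = sourceType -> "int";
        List<Field> schema = List.of(
            new Field(1, "event_ts", "timestamp"),
            new Field(2, "data", "string"));
        // Field 1's type becomes "int"; field 2 keeps its original type.
        System.out.println(transform(schema, Map.of(1, hours)));
    }
}
```

This is the "weird" part the email mentions: the transformed schema intentionally has different field types than the source schema, because comparisons happen on transform results.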
Wouldn't it make sense to get the Schema for the `StructTransformation` object instead, like `StructTransformation.schema()`?

Steven Wu <stevenz...@gmail.com> wrote (on Wed, May 31, 2023, 7:19):

> We are implementing a range partitioner for Flink sink shuffling [1]. One
> key piece is RowDataComparator for Flink RowData. Would love to get some
> feedback on a few decisions.
>
> 1. Comparators for the Flink `RowData` type. Flink already has the
> `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With
> `StructLike`, Iceberg `Comparators` can be used to compare two structs.
> Then we don't need to implement `RowDataComparators` that look very similar
> to the struct `Comparators`. This is also related to the transformation
> decision below: we don't need to re-implement all the transform functions
> with Flink data types.
>
> 2. Use SortOrder or just natural order (with nulls first)? SortOrder
> supports transform functions (like bucket, hours, truncate). The
> implementation will be a lot simpler if we only need to implement natural
> order without transformations from SortOrder. But I do think the
> transformations (like days, bucket) in SortOrder are quite useful.
>
> In addition to the current transforms, we plan to add a `relative_hour`
> transform for event-time partitioned tables. Flink range shuffle calculates
> traffic statistics across keys (like the number of observed rows per event
> hour). Ideally the traffic distribution should be relatively stable. Hence
> relative hour (hour 0 meaning the current hour) can result in stable
> statistics for traffic weight across the relative event hours.
>
> 3. I am thinking about adding a `StructTransformation` class in the
> iceberg-api module. It can be implemented similar to `StructProjection`,
> where transform functions are applied lazily during get.
>
> public static StructTransformation create(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>
> 4.
> To represent the transformed struct, we need a transformed schema. I am
> thinking about adding a transform method to TypeUtil. It will return a
> transformed schema with field types updated to the result types of the
> transforms. This can look a bit weird with field types changed.
>
> public static Schema transform(Schema schema, Map<Integer, Transform<?, ?>> idToTransforms)
>
> =========================
> This is how everything is put together for RowDataComparator.
>
> // sortFieldIds set and idToTransforms map are calculated from SortOrder
> Schema projected = TypeUtil.select(schema, sortFieldIds);
> Map<Integer, Transform<?, ?>> idToTransforms = ...;
> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>
> // note: left and right need separate wrapper instances, since wrap() reuses the wrapper
> StructLike leftSortKey =
>     structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)));
> StructLike rightSortKey =
>     structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(rightRowData)));
>
> Comparators.forType(sortSchema.asStruct()).compare(leftSortKey, rightSortKey);
>
> Thanks,
> Steven
>
> [1]
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
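For what it's worth, the lazy-apply idea in point 3 can be sketched in a few lines. `Struct` and `Transform` below are simplified stand-ins for Iceberg's `StructLike` and `Transform` interfaces, not the actual API; the point is only that the wrapper stores no transformed values and applies the transform on each `get()`, the same pattern `StructProjection` uses for field selection:

```java
import java.util.Map;

// Stand-in for Iceberg's StructLike (illustrative only).
interface Struct {
    Object get(int pos);
}

// Stand-in for a bound Iceberg Transform: value in, transformed value out.
@FunctionalInterface
interface Transform {
    Object apply(Object value);
}

// Sketch of the proposed StructTransformation: wraps a struct and applies
// the per-position transform lazily inside get(), like StructProjection.
public class StructTransformationSketch implements Struct {
    private final Map<Integer, Transform> posToTransforms;
    private Struct wrapped;

    StructTransformationSketch(Map<Integer, Transform> posToTransforms) {
        this.posToTransforms = posToTransforms;
    }

    // Reuses this wrapper instance; nothing is copied or eagerly transformed.
    StructTransformationSketch wrap(Struct struct) {
        this.wrapped = struct;
        return this;
    }

    @Override
    public Object get(int pos) {
        Object value = wrapped.get(pos);
        Transform t = posToTransforms.get(pos);
        return t == null ? value : t.apply(value); // transform applied lazily, per access
    }

    public static void main(String[] args) {
        // Hypothetical hours-style transform on position 0: epoch millis -> hour ordinal.
        Transform hours = v -> ((Long) v) / 3_600_000L;
        Struct row = pos -> pos == 0 ? 7_200_000L : "payload";
        Struct transformed = new StructTransformationSketch(Map.of(0, hours)).wrap(row);
        System.out.println(transformed.get(0)); // 2
        System.out.println(transformed.get(1)); // payload
    }
}
```

A `schema()` accessor on this class, as suggested above, would keep the transformed schema and the transformed struct in one place instead of requiring a separate `TypeUtil.transform` call at every use site.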