> 4. To represent the transformed struct, we need a transformed schema. I
am thinking about adding a transform method to TypeUtil. It will return a
transformed schema with field types updated to the result types of the
transforms. This can look a bit weird with field types changed.
>
> public static Schema transform(Schema schema, Map<Integer, Transform<?,
?>> idToTransforms)

Wouldn't it make sense to get the Schema for the `StructTransformation` object
instead, like `StructTransformation.schema()`?

Steven Wu <stevenz...@gmail.com> ezt írta (időpont: 2023. máj. 31., Sze,
7:19):

> We are implementing a range partitioner for Flink sink shuffling [1]. One
> key piece is RowDataComparator for Flink RowData. Would love to get some
> feedback on a few decisions.
>
> 1. Comparators for Flink `RowData` type. Flink already has the
> `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With
> `StructLike`, Iceberg `Comparators` can be used to compare two structs.
> Then we don't need to implement `RowDataComparators` that look very similar
> to struct `Comparators`. This is also related to the transformation
> decision below. We don't need to re-implement all the transform functions
> with Flink data types.
>
> 2. Use SortOrder or just natural orders (with null first). SortOrder
> supports transform functions (like bucket, hours, truncate). The
> implementation will be a lot simpler if we only need to implement natural
> order without transformations from SortOrder. But I do think the
> transformations (like days, bucket) in SortOrder are quite useful.
>
> In addition to the current transforms, we plan to add a `relative_hour`
> transform for event time partitioned tables. Flink range shuffle calculates
> traffic statistics across keys (like number of observed rows per event
> hour). Ideally the traffic distributions should be relatively stable. Hence
> relative hour (hour 0 meaning current hour) can result in the stable
> statistics for traffic weight across the relative event hours.
>
> 3. I am thinking about adding a `StructTransformation` class in the
> iceberg-api module. It can be implemented similar to `StructProjection`
> where transform functions are applied lazily during get.
>
> public static StructTransformation create(Schema schema, Map<Integer,
> Transform<?, ?>> idToTransforms)
>
> 4. To represent the transformed struct, we need a transformed schema. I am
> thinking about adding a transform method to TypeUtil. It will return a
> transformed schema with field types updated to the result types of the
> transforms. This can look a bit weird with field types changed.
>
> public static Schema transform(Schema schema, Map<Integer, Transform<?,
> ?>> idToTransforms)
>
> =========================
> This is how everything is put together for RowDataComparator.
>
> Schema projected = TypeUtil.select(schema, sortFieldIds); // sortFieldIds
> set is calculated from SortOrder
> Map<Integer, Transform<?, ?>> idToTransforms) idToTransforms = //
> calculated from SortOrder
> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>
> StructLike leftSortKey =
> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)))
> StructLike rightSortKey =
> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)))
>
> Comparators.forType(sortSchema).compare(leftSortKey, rightSortKey)
>
> Thanks,
> Steven
>
> [1]
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
>

Reply via email to