Peter,

I also thought about that. I didn't go with `StructTransformation.schema()`
because I was hoping to stick with the `StructLike` interface, which doesn't
expose `schema()`. This mimics the behavior of `StructProjection`, which
doesn't expose `schema()` either. The projected schema can be extracted
via `TypeUtil.project(Schema schema, Set<Integer> fieldIds)`.
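
To make the idea concrete, here is a minimal sketch of a wrapper that applies transforms lazily on get and stays behind a `StructLike`-style interface without exposing `schema()`. It deliberately avoids Iceberg dependencies so that it runs on its own: `Struct`, `ArrayStruct`, the position-keyed transform map, and the hour function are hypothetical stand-ins for illustration, not Iceberg API.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class StructTransformationSketch {
  static final long MICROS_PER_HOUR = 3_600_000_000L;

  /** Hypothetical stand-in for StructLike: positional access only, no schema(). */
  interface Struct {
    int size();
    Object get(int pos);
  }

  /** Array-backed struct playing the role of the wrapped row. */
  static final class ArrayStruct implements Struct {
    private final Object[] values;
    ArrayStruct(Object... values) { this.values = values; }
    public int size() { return values.length; }
    public Object get(int pos) { return values[pos]; }
  }

  /** Wrapper that applies a per-position function on each get(), not on wrap(). */
  static final class StructTransformation implements Struct {
    private final Map<Integer, Function<Object, Object>> posToTransform;
    private Struct wrapped;

    StructTransformation(Map<Integer, Function<Object, Object>> posToTransform) {
      this.posToTransform = posToTransform;
    }

    /** Returns this wrapper itself, so each side of a comparison needs its own instance. */
    StructTransformation wrap(Struct struct) {
      this.wrapped = struct;
      return this;
    }

    public int size() { return wrapped.size(); }

    public Object get(int pos) {
      Object value = wrapped.get(pos);
      Function<Object, Object> transform = posToTransform.get(pos);
      // Assumption: nulls pass through untransformed.
      return (transform == null || value == null) ? value : transform.apply(value);
    }
  }

  /** Field-by-field natural order with nulls first. */
  @SuppressWarnings("unchecked")
  static Comparator<Struct> comparator(int numFields) {
    Comparator<Object> fieldOrder =
        Comparator.nullsFirst((a, b) -> ((Comparable<Object>) a).compareTo(b));
    return (left, right) -> {
      for (int i = 0; i < numFields; i++) {
        int cmp = fieldOrder.compare(left.get(i), right.get(i));
        if (cmp != 0) {
          return cmp;
        }
      }
      return 0;
    };
  }

  /** Hypothetical transform map: position 0 (epoch micros) becomes an hour ordinal. */
  static Map<Integer, Function<Object, Object>> hourOnFirstField() {
    Map<Integer, Function<Object, Object>> transforms = new HashMap<>();
    transforms.put(0, v -> Math.floorDiv((Long) v, MICROS_PER_HOUR));
    return transforms;
  }

  static int compareByHourThenId(Struct left, Struct right) {
    Map<Integer, Function<Object, Object>> transforms = hourOnFirstField();
    Struct leftKey = new StructTransformation(transforms).wrap(left);
    Struct rightKey = new StructTransformation(transforms).wrap(right);
    return comparator(2).compare(leftKey, rightKey);
  }

  public static void main(String[] args) {
    Struct a = new ArrayStruct(3_600_000_001L, "a"); // hour 1
    Struct b = new ArrayStruct(7_199_999_999L, "b"); // still hour 1
    Struct c = new ArrayStruct(7_200_000_000L, "a"); // hour 2
    System.out.println(Integer.signum(compareByHourThenId(a, b)));
    System.out.println(Integer.signum(compareByHourThenId(b, c)));
  }
}
```

In this sketch the comparator only needs the number of sort fields; with Iceberg, the transformed field types would come from the transformed schema instead, which is why the schema can live outside the wrapper.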

Thanks,
Steven

On Wed, May 31, 2023 at 1:18 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> > 4. To represent the transformed struct, we need a transformed schema. I
> > am thinking about adding a transform method to TypeUtil. It will return a
> > transformed schema with field types updated to the result types of the
> > transforms. This can look a bit weird with field types changed.
> >
> > public static Schema transform(Schema schema, Map<Integer, Transform<?,
> > ?>> idToTransforms)
>
> Wouldn't it make sense to get the Schema for the `StructTransformation` object
> instead, like `StructTransformation.schema()`?
>
> Steven Wu <stevenz...@gmail.com> wrote (on Wed, May 31, 2023,
> 7:19):
>
>> We are implementing a range partitioner for Flink sink shuffling [1]. One
>> key piece is RowDataComparator for Flink RowData. Would love to get some
>> feedback on a few decisions.
>>
>> 1. Comparators for Flink `RowData` type. Flink already has the
>> `RowDataWrapper` class that can wrap a `RowData` as a `StructLike`. With
>> `StructLike`, Iceberg `Comparators` can be used to compare two structs.
>> Then we don't need to implement `RowDataComparators` that look very similar
>> to struct `Comparators`. This is also related to the transformation
>> decision below. We don't need to re-implement all the transform functions
>> with Flink data types.
>>
>> 2. Use SortOrder or just natural orders (with null first). SortOrder
>> supports transform functions (like bucket, hours, truncate). The
>> implementation will be a lot simpler if we only need to implement natural
>> order without transformations from SortOrder. But I do think the
>> transformations (like days, bucket) in SortOrder are quite useful.
>>
>> In addition to the current transforms, we plan to add a `relative_hour`
>> transform for event time partitioned tables. Flink range shuffle calculates
>> traffic statistics across keys (like number of observed rows per event
>> hour). Ideally the traffic distribution should be relatively stable. Keying
>> on the relative hour (hour 0 meaning the current hour) keeps the traffic
>> weight statistics stable across the relative event hours.
>>
>> 3. I am thinking about adding a `StructTransformation` class in the
>> iceberg-api module. It can be implemented similarly to `StructProjection`,
>> where transform functions are applied lazily during get.
>>
>> public static StructTransformation create(Schema schema, Map<Integer,
>> Transform<?, ?>> idToTransforms)
>>
>> 4. To represent the transformed struct, we need a transformed schema. I
>> am thinking about adding a transform method to TypeUtil. It will return a
>> transformed schema with field types updated to the result types of the
>> transforms. This can look a bit weird with field types changed.
>>
>> public static Schema transform(Schema schema, Map<Integer, Transform<?,
>> ?>> idToTransforms)
>>
>> =========================
>> This is how everything is put together for RowDataComparator.
>>
>> // sortFieldIds set is calculated from SortOrder
>> Schema projected = TypeUtil.select(schema, sortFieldIds);
>> // idToTransforms is calculated from SortOrder
>> Map<Integer, Transform<?, ?>> idToTransforms = ...;
>> Schema sortSchema = TypeUtil.transform(projected, idToTransforms);
>>
>> StructLike leftSortKey =
>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(leftRowData)));
>> StructLike rightSortKey =
>> structTransformation.wrap(structProjection.wrap(rowDataWrapper.wrap(rightRowData)));
>>
>> Comparators.forType(sortSchema.asStruct()).compare(leftSortKey, rightSortKey);
>>
>> Thanks,
>> Steven
>>
>> [1]
>> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/
>>
>
