[
https://issues.apache.org/jira/browse/HUDI-9670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shuo Cheng updated HUDI-9670:
-----------------------------
Fix Version/s: 1.1.1
> Fix schema.on.write support for flink reader
> --------------------------------------------
>
> Key: HUDI-9670
> URL: https://issues.apache.org/jira/browse/HUDI-9670
> Project: Apache Hudi
> Issue Type: Bug
> Components: flink
> Reporter: Jonathan Vexler
> Priority: Major
> Fix For: 1.1.1
>
>
> org.apache.hudi.table.TestHoodieFileGroupReaderOnFlink#getSchemaEvolutionConfigs
> should have most/all cases enabled.
>
> org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader#ParquetColumnarRowSplitReader
> reads the Parquet footer. One solution for adding this support would be to
> pull this up a few layers: use the util method pruneDataSchema from
> [https://github.com/apache/hudi/pull/13654] and then cast/project from the
> pruned schema to the requested schema.
>
> The test datagen will need to be updated because there is a Flink bug,
> HUDI-9603.
> Here are some fixes I already did:
> {code:java}
> diff --git
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
>
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
> index a0741ea6705..fb46996317e 100644
> ---
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
> +++
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
> @@ -69,7 +69,7 @@ public class SchemaEvolvingRowDataProjection implements
> RowProjection {
> case ROW:
> return createRowProjection(fromType, toType, renamedColumns,
> fieldNameStack);
> default:
> - if (fromType.equals(toType)) {
> + if (fromType.equals(toType) ||
> fromType.getTypeRoot().equals(toType.getTypeRoot())) {
> return TypeConverters.NOOP_CONVERTER;
> } else {
> // return TypeConverter directly for non-composite type
> diff --git
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
>
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
> index cef2df7c037..83b886307ab 100644
> ---
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
> +++
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
> @@ -53,7 +53,10 @@ import static
> org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
> import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
> import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
> import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
> +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
> import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
> +import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
> +import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
>
> /**
> * Tool class used to perform supported casts from a {@link LogicalType} to
> another
> @@ -81,6 +84,20 @@ public class TypeConverters {
> LogicalTypeRoot to = toType.getTypeRoot();
>
> switch (to) {
> + case VARBINARY: {
> + if (from == VARCHAR) {
> + return new TypeConverter() {
> + private static final long serialVersionUID = 1L;
> +
> + @Override
> + public Object convert(Object val) {
> + return getUTF8Bytes(val.toString());
> + }
> + };
> + }
> + break;
> + }
> +
> case BIGINT: {
> if (from == INTEGER) {
> return new TypeConverter() {
> @@ -202,6 +219,16 @@ public class TypeConverters {
> }
> };
> }
> + if (from == VARBINARY) {
> + return new TypeConverter() {
> + private static final long serialVersionUID = 1L;
> +
> + @Override
> + public Object convert(Object val) {
> + return new BinaryStringData(fromUTF8Bytes((byte[]) val));
> + }
> + };
> + }
> break;
> }
>
> diff --git
> a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
>
> b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
> index bb671a8f5ef..f165161b317 100644
> ---
> a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
> +++
> b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
> @@ -1336,6 +1336,9 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
>
> // Bytes
> public boolean bytesToStringSupport = true;
> +
> + // TODO: [HUDI-9603] Flink VARBINARY in array and map
> + public boolean supportBytesInArrayMap = true;
> }
>
> private enum SchemaEvolutionTypePromotionCase {
> @@ -1430,13 +1433,13 @@ Generate random record using
> TRIP_ENCODED_DECIMAL_SCHEMA
> if (toplevel) {
> if (configs.mapSupport) {
> List<Schema.Field> mapFields = new ArrayList<>(baseFields.size());
> - addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map");
> + addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map",
> !configs.supportBytesInArrayMap);
> finalFields.add(new Schema.Field(fieldPrefix + "Map",
> Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false,
> mapFields)), "", null));
> }
>
> - if (configs.arraySupport) {
> + if (configs.arraySupport && configs.anyArraySupport) {
> List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size());
> - addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array");
> + addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array",
> !configs.supportBytesInArrayMap);
> finalFields.add(new Schema.Field(fieldPrefix + "Array",
> Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace,
> false, arrayFields)), "", null));
> }
> }
> @@ -1444,12 +1447,21 @@ Generate random record using
> TRIP_ENCODED_DECIMAL_SCHEMA
> }
>
> private static void addFieldsHelper(List<Schema.Field> finalFields,
> List<Schema.Type> baseFields, String fieldPrefix) {
> + addFieldsHelper(finalFields, baseFields, fieldPrefix, false);
> + }
> +
> + // TODO: [HUDI-9603] remove replaceBytesWithStrings when the issue is fixed
> + private static void addFieldsHelper(List<Schema.Field> finalFields,
> List<Schema.Type> baseFields, String fieldPrefix, boolean
> replaceBytesWithStrings) {
> for (int i = 0; i < baseFields.size(); i++) {
> if (baseFields.get(i) == Schema.Type.BOOLEAN) {
> // boolean fields are added fields
> finalFields.add(new Schema.Field(fieldPrefix + i,
> AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null));
> } else {
> - finalFields.add(new Schema.Field(fieldPrefix + i,
> Schema.create(baseFields.get(i)), "", null));
> + Schema.Type type = baseFields.get(i);
> + if (replaceBytesWithStrings && type == Schema.Type.BYTES) {
> + type = Schema.Type.STRING;
> + }
> + finalFields.add(new Schema.Field(fieldPrefix + i,
> Schema.create(type), "", null));
> }
> }
> }
> diff --git
> a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
>
> b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
> index dc66a0e6a74..f659e06ad50 100644
> ---
> a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
> +++
> b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
> @@ -177,23 +177,9 @@ public class TestHoodieFileGroupReaderOnFlink extends
> TestHoodieFileGroupReaderB
> @Override
> public HoodieTestDataGenerator.SchemaEvolutionConfigs
> getSchemaEvolutionConfigs() {
> HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new
> HoodieTestDataGenerator.SchemaEvolutionConfigs();
> - configs.nestedSupport = false;
> + configs.supportBytesInArrayMap = false;
> configs.arraySupport = false;
> - configs.mapSupport = false;
> configs.anyArraySupport = false;
> - configs.addNewFieldSupport = false;
> - configs.intToLongSupport = false;
> - configs.intToFloatSupport = false;
> - configs.intToDoubleSupport = false;
> - configs.intToStringSupport = false;
> - configs.longToFloatSupport = false;
> - configs.longToDoubleSupport = false;
> - configs.longToStringSupport = false;
> - configs.floatToDoubleSupport = false;
> - configs.floatToStringSupport = false;
> - configs.doubleToStringSupport = false;
> - configs.stringToBytesSupport = false;
> - configs.bytesToStringSupport = false;
> return configs;
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)