[ 
https://issues.apache.org/jira/browse/HUDI-9670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Cheng updated HUDI-9670:
-----------------------------
    Fix Version/s: 1.1.1

> Fix schema.on.write support for flink reader
> --------------------------------------------
>
>                 Key: HUDI-9670
>                 URL: https://issues.apache.org/jira/browse/HUDI-9670
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>            Reporter: Jonathan Vexler
>            Priority: Major
>             Fix For: 1.1.1
>
>
> org.apache.hudi.table.TestHoodieFileGroupReaderOnFlink#getSchemaEvolutionConfigs
> should have most, if not all, of its cases enabled.
>  
> org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader#ParquetColumnarRowSplitReader
> reads the parquet footer. One solution for adding this support would be to
> pull this logic up a few layers, use the util method pruneDataSchema from
> [https://github.com/apache/hudi/pull/13654], and then cast/project from the
> pruned schema to the requested schema.
>  
> The test datagen will need to be updated because there is a Flink bug,
> HUDI-9603.
> Here are some fixes I already did:
> {code:java}
> diff --git 
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
>  
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
> index a0741ea6705..fb46996317e 100644
> --- 
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
> +++ 
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/SchemaEvolvingRowDataProjection.java
> @@ -69,7 +69,7 @@ public class SchemaEvolvingRowDataProjection implements 
> RowProjection {
>        case ROW:
>          return createRowProjection(fromType, toType, renamedColumns, 
> fieldNameStack);
>        default:
> -        if (fromType.equals(toType)) {
> +        if (fromType.equals(toType) || 
> fromType.getTypeRoot().equals(toType.getTypeRoot())) {
>            return TypeConverters.NOOP_CONVERTER;
>          } else {
>            // return TypeConverter directly for non-composite type
> diff --git 
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
>  
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
> index cef2df7c037..83b886307ab 100644
> --- 
> a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
> +++ 
> b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TypeConverters.java
> @@ -53,7 +53,10 @@ import static 
> org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
>  import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
>  import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
>  import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
> +import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
>  import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
> +import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
> +import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
>  
>  /**
>   * Tool class used to perform supported casts from a {@link LogicalType} to 
> another
> @@ -81,6 +84,20 @@ public class TypeConverters {
>      LogicalTypeRoot to = toType.getTypeRoot();
>  
>      switch (to) {
> +      case VARBINARY: {
> +        if (from == VARCHAR) {
> +          return new TypeConverter() {
> +            private static final long serialVersionUID = 1L;
> +
> +            @Override
> +            public Object convert(Object val) {
> +              return getUTF8Bytes(val.toString());
> +            }
> +          };
> +        }
> +        break;
> +      }
> +
>        case BIGINT: {
>          if (from == INTEGER) {
>            return new TypeConverter() {
> @@ -202,6 +219,16 @@ public class TypeConverters {
>              }
>            };
>          }
> +        if (from == VARBINARY) {
> +          return new TypeConverter() {
> +            private static final long serialVersionUID = 1L;
> +
> +            @Override
> +            public Object convert(Object val) {
> +              return new BinaryStringData(fromUTF8Bytes((byte[]) val));
> +            }
> +          };
> +        }
>          break;
>        }
>  
> diff --git 
> a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
>  
> b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
> index bb671a8f5ef..f165161b317 100644
> --- 
> a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
> +++ 
> b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
> @@ -1336,6 +1336,9 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
>  
>      // Bytes
>      public boolean bytesToStringSupport = true;
> +
> +    // TODO: [HUDI-9603] Flink VARBINARY in array and map
> +    public boolean supportBytesInArrayMap = true;
>    }
>  
>    private enum SchemaEvolutionTypePromotionCase {
> @@ -1430,13 +1433,13 @@ Generate random record using 
> TRIP_ENCODED_DECIMAL_SCHEMA
>      if (toplevel) {
>        if (configs.mapSupport) {
>          List<Schema.Field> mapFields = new ArrayList<>(baseFields.size());
> -        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map");
> +        addFieldsHelper(mapFields, baseFields, fieldPrefix + "Map", 
> !configs.supportBytesInArrayMap);
>          finalFields.add(new Schema.Field(fieldPrefix + "Map", 
> Schema.createMap(Schema.createRecord("customMapRecord", "", namespace, false, 
> mapFields)), "", null));
>        }
>  
> -      if (configs.arraySupport) {
> +      if (configs.arraySupport && configs.anyArraySupport) {
>          List<Schema.Field> arrayFields = new ArrayList<>(baseFields.size());
> -        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array");
> +        addFieldsHelper(arrayFields, baseFields, fieldPrefix + "Array", 
> !configs.supportBytesInArrayMap);
>          finalFields.add(new Schema.Field(fieldPrefix + "Array", 
> Schema.createArray(Schema.createRecord("customArrayRecord", "", namespace, 
> false, arrayFields)), "", null));
>        }
>      }
> @@ -1444,12 +1447,21 @@ Generate random record using 
> TRIP_ENCODED_DECIMAL_SCHEMA
>    }
>  
>    private static void addFieldsHelper(List<Schema.Field> finalFields, 
> List<Schema.Type> baseFields, String fieldPrefix) {
> +    addFieldsHelper(finalFields, baseFields, fieldPrefix, false);
> +  }
> +
> +  // TODO: [HUDI-9603] remove replaceBytesWithStrings when the issue is fixed
> +  private static void addFieldsHelper(List<Schema.Field> finalFields, 
> List<Schema.Type> baseFields, String fieldPrefix, boolean 
> replaceBytesWithStrings) {
>      for (int i = 0; i < baseFields.size(); i++) {
>        if (baseFields.get(i) == Schema.Type.BOOLEAN) {
>          // boolean fields are added fields
>          finalFields.add(new Schema.Field(fieldPrefix + i, 
> AvroSchemaUtils.createNullableSchema(Schema.Type.BOOLEAN), "", null));
>        } else {
> -        finalFields.add(new Schema.Field(fieldPrefix + i, 
> Schema.create(baseFields.get(i)), "", null));
> +        Schema.Type type = baseFields.get(i);
> +        if (replaceBytesWithStrings && type == Schema.Type.BYTES) {
> +          type = Schema.Type.STRING;
> +        }
> +        finalFields.add(new Schema.Field(fieldPrefix + i, 
> Schema.create(type), "", null));
>        }
>      }
>    }
> diff --git 
> a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
>  
> b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
> index dc66a0e6a74..f659e06ad50 100644
> --- 
> a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
> +++ 
> b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
> @@ -177,23 +177,9 @@ public class TestHoodieFileGroupReaderOnFlink extends 
> TestHoodieFileGroupReaderB
>    @Override
>    public HoodieTestDataGenerator.SchemaEvolutionConfigs 
> getSchemaEvolutionConfigs() {
>      HoodieTestDataGenerator.SchemaEvolutionConfigs configs = new 
> HoodieTestDataGenerator.SchemaEvolutionConfigs();
> -    configs.nestedSupport = false;
> +    configs.supportBytesInArrayMap = false;
>      configs.arraySupport = false;
> -    configs.mapSupport = false;
>      configs.anyArraySupport = false;
> -    configs.addNewFieldSupport = false;
> -    configs.intToLongSupport = false;
> -    configs.intToFloatSupport = false;
> -    configs.intToDoubleSupport = false;
> -    configs.intToStringSupport = false;
> -    configs.longToFloatSupport = false;
> -    configs.longToDoubleSupport = false;
> -    configs.longToStringSupport = false;
> -    configs.floatToDoubleSupport = false;
> -    configs.floatToStringSupport = false;
> -    configs.doubleToStringSupport = false;
> -    configs.stringToBytesSupport = false;
> -    configs.bytesToStringSupport = false;
>      return configs;
>    }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to