This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1da0b21edd [HUDI-4119] the first read result is incorrect when Flink 
upsert-Kafka connector is used in Hudi (#5626)
1da0b21edd is described below

commit 1da0b21edd6693e9025e45889cc7b1c37658a4e1
Author: aliceyyan <[email protected]>
AuthorDate: Fri May 20 18:10:24 2022 +0800

    [HUDI-4119] the first read result is incorrect when Flink upsert-Kafka 
connector is used in Hudi (#5626)
    
    * HUDI-4119 the first read result is incorrect when Flink upsert-Kafka 
connector is used in Hudi
    
    Co-authored-by: aliceyyan <[email protected]>
---
 .../java/org/apache/hudi/table/HoodieTableSource.java   | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index bad592aa21..1836857383 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -20,12 +20,16 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -381,8 +385,8 @@ public class HoodieTableSource implements
   }
 
   private InputFormat<RowData, ?> getStreamInputFormat() {
-    // if table does not exist, use schema from the DDL
-    Schema tableAvroSchema = this.metaClient == null ? inferSchemaFromDdl() : 
getTableAvroSchema();
+    // if table does not exist or table data does not exist, use schema from 
the DDL
+    Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ? 
inferSchemaFromDdl() : getTableAvroSchema();
     final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
     final RowType requiredRowType = (RowType) 
getProducedDataType().notNull().getLogicalType();
@@ -399,6 +403,15 @@ public class HoodieTableSource implements
     throw new HoodieException(errMsg);
   }
 
+  /**
 +   * Returns whether the hoodie table data exists.
+   */
 +  private boolean tableDataExists() {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata 
= activeTimeline.getLastCommitMetadataWithValidData();
+    return instantAndCommitMetadata.isPresent();
+  }
+
   private MergeOnReadInputFormat mergeOnReadInputFormat(
       RowType rowType,
       RowType requiredRowType,

Reply via email to