This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d0a4152cf [HUDI-4611] Fix the duplicate creation of config in 
HoodieFlinkStreamer (#6369)
0d0a4152cf is described below

commit 0d0a4152cfd362185066519ae926ac4513c7a152
Author: feiyang_deepnova <[email protected]>
AuthorDate: Fri Aug 12 11:24:56 2022 +0800

    [HUDI-4611] Fix the duplicate creation of config in HoodieFlinkStreamer 
(#6369)
    
    Co-authored-by: linfey <[email protected]>
---
 .../src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java   | 4 ++--
 .../hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java   | 4 ----
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 013753b6d9..29f55f78ac 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -69,12 +69,12 @@ public class HoodieFlinkStreamer {
     TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
     kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));
 
+    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
     // Read from kafka source
     RowType rowType =
-        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
+        (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
 
-    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
     long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
     int parallelism = env.getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e5e30c1c98..4b93faeaf7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -116,10 +116,6 @@ public class StreamerUtil {
         new Path(cfg.propsFilePath), cfg.configs).getProps();
   }
 
-  public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
-    return new 
FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema();
-  }
-
   public static Schema 
getSourceSchema(org.apache.flink.configuration.Configuration conf) {
     if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
       return new FilebasedSchemaProvider(conf).getSourceSchema();

Reply via email to