voonhous commented on code in PR #13480:
URL: https://github.com/apache/hudi/pull/13480#discussion_r2185428515


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java:
##########
@@ -78,14 +81,29 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
       // setup configuration
       long ckpTimeout = dataStream.getExecutionEnvironment()
           .getCheckpointConfig().getCheckpointTimeout();
-      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+      conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
       // set up default parallelism
       OptionsInference.setupSinkTasks(conf, 
dataStream.getExecutionConfig().getParallelism());
       // set up client id
       OptionsInference.setupClientId(conf);
       // set up index related configs
       OptionsInference.setupIndexConfigs(conf);
 
+      // Since Flink 2.0, the adaptive execution for batch job will generate 
job graph incrementally
+      // for multiple stages (FLIP-469). And the write coordinator is 
initialized along with write
+      // operator in the final stage, so hudi table should be initialized if 
necessary during the plan
+      // compilation phase when adaptive execution is enabled.

Review Comment:
   I don't quite understand this — what happens in batch execution when the 
Flink version is not 2.x? Is the table still initialized correctly there?



##########
.github/workflows/bot.yml:
##########
@@ -1035,6 +1021,55 @@ jobs:
           HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q 
-DforceStdout)
           ./packaging/bundle-validation/ci_run.sh hudi_docker_java17 
$HUDI_VERSION openjdk17
 
+  # flink 2.0 only support Java 11 and above version
+  validate-bundles-java11:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
+          - scalaProfile: 'scala-2.12'
+            flinkProfile: 'flink2.0'
+            flinkAvroVersion: '1.11.4'
+            flinkParquetVersion: '1.14.4'
+            sparkProfile: 'spark3.5'
+            sparkRuntime: 'spark3.5.1'
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK 11
+        uses: actions/setup-java@v3
+        with:
+          java-version: '11'
+          distribution: 'temurin'
+          architecture: x64
+      - name: Build Project
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
+          FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
+        run: |
+          mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" 
-DskipTests=true $MVN_ARGS -pl packaging/hudi-flink-bundle -am 
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
+      - name: IT - Bundle Validation - OpenJDK 11
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+        run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q 
-DforceStdout)
+          ./packaging/bundle-validation/ci_run.sh hudi_docker_java11 
$HUDI_VERSION openjdk11
+      - name: IT - Bundle Validation - OpenJDK 17
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+        run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q 
-DforceStdout)

Review Comment:
   Just to check: we're compiling the Hudi dependencies with Java 11, and then 
verifying that the resulting bundle can run in a Java 17 environment here, right?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########
@@ -73,7 +72,28 @@ public static HoodieFlinkWriteClient 
createWriteClient(Configuration conf) throw
     HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
     // build the write client to start the embedded timeline server
     final HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient<>(new 
HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), 
writeConfig);
-    
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+    
writeClient.setOperationType(WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
+    // create the filesystem view storage properties for client
+    initViewStorageProperties(conf, writeConfig);
+    return writeClient;
+  }
+
+  /**
+   * Initialize the 'view_storage_conf' meta file.
+   *
+   * <p>This expects to be used by the driver, the client can then send 
requests for files view.

Review Comment:
   Typo: "files view" should read "filesystem views".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to