cshuo commented on code in PR #13480:
URL: https://github.com/apache/hudi/pull/13480#discussion_r2188926370


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java:
##########
@@ -78,14 +81,29 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       // setup configuration
       long ckpTimeout = dataStream.getExecutionEnvironment()
           .getCheckpointConfig().getCheckpointTimeout();
-      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+      conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
       // set up default parallelism
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
       // set up client id
       OptionsInference.setupClientId(conf);
       // set up index related configs
       OptionsInference.setupIndexConfigs(conf);
 
+      // Since Flink 2.0, adaptive execution for batch jobs generates the job graph incrementally
+      // in multiple stages (FLIP-469). The write coordinator is initialized along with the write
+      // operator in the final stage, so the hudi table should be initialized if necessary during
+      // the plan compilation phase when adaptive execution is enabled.

Review Comment:
   Take the following batch job pipeline as an example:
   
   source -> row_data_to_hoodie_record -> bootstrap -> bucket_assigner -> stream_write
   
   There are three operator chains / stages in the pipeline:
   stage1: source -> row_data_to_hoodie_record -> bootstrap
   stage2: bucket_assigner
   stage3: stream_write
   
   For Flink 1.x, Flink converts the whole stream graph into a JobGraph before job execution, creating a fixed execution plan, which contains 3 job vertices for the above example. The batch scheduler then initializes the coordinator for each operator in a job vertex when the scheduler starts.
   
   For Flink 2, the job graph is generated incrementally and executed adaptively (FLIP-469). In the above case, the job graph initially contains only stage 1; stage 2 is generated after stage 1 finishes, and likewise for stage 3.
   
   So for adaptive execution on Flink 2 we cannot initialize the hudi table in the writer's coordinator, since the upstream operators in the pipeline also operate on the hudi table.
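   To make the staging behavior concrete, here is a toy, Flink-free Java sketch of the difference (class and method names are hypothetical, not Flink or Hudi APIs): in the 1.x model every job vertex and its coordinator exists before execution starts, while under FLIP-469 each stage's graph, and thus its coordinator, is created only after the previous stage finishes.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Toy model of FLIP-469 stage-wise scheduling; not a Flink or Hudi API. */
   public class AdaptiveExecutionSketch {
     static final List<String> STAGES = List.of(
         "source -> row_data_to_hoodie_record -> bootstrap",
         "bucket_assigner",
         "stream_write");
   
     /** Flink 1.x style: all job vertices (and coordinators) are known at scheduler start. */
     static List<String> upFrontCoordinators() {
       return new ArrayList<>(STAGES); // all 3 vertices exist before execution begins
     }
   
     /** Flink 2 adaptive style: each stage's graph and coordinator are created
      *  only after the previous stage finishes. */
     static List<String> eventLog() {
       List<String> log = new ArrayList<>();
       for (String stage : STAGES) {
         // in the real scheduler, this stage's job graph is generated here and
         // its operator coordinators are initialized only at this point
         log.add("generate graph + init coordinator: " + stage);
         log.add("run: " + stage);
       }
       return log;
     }
   
     public static void main(String[] args) {
       // The stream_write coordinator only comes into existence at the last stage,
       // after bootstrap and bucket_assigner have already touched the table.
       eventLog().forEach(System.out::println);
     }
   }
   ```
   
   The sketch shows why table initialization cannot live in the `stream_write` coordinator: by the time that coordinator exists, earlier stages have already run against the table.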
   



##########
.github/workflows/bot.yml:
##########
@@ -1035,6 +1021,55 @@ jobs:
          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
          ./packaging/bundle-validation/ci_run.sh hudi_docker_java17 $HUDI_VERSION openjdk17
 
+  # flink 2.0 only supports Java 11 and above
+  validate-bundles-java11:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
+          - scalaProfile: 'scala-2.12'
+            flinkProfile: 'flink2.0'
+            flinkAvroVersion: '1.11.4'
+            flinkParquetVersion: '1.14.4'
+            sparkProfile: 'spark3.5'
+            sparkRuntime: 'spark3.5.1'
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK 11
+        uses: actions/setup-java@v3
+        with:
+          java-version: '11'
+          distribution: 'temurin'
+          architecture: x64
+      - name: Build Project
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+          FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
+          FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
+        run: |
+          mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -DskipTests=true $MVN_ARGS -pl packaging/hudi-flink-bundle -am -Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
+      - name: IT - Bundle Validation - OpenJDK 11
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+        run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
+          ./packaging/bundle-validation/ci_run.sh hudi_docker_java11 $HUDI_VERSION openjdk11
+      - name: IT - Bundle Validation - OpenJDK 17
+        env:
+          FLINK_PROFILE: ${{ matrix.flinkProfile }}
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+          SCALA_PROFILE: ${{ matrix.scalaProfile }}
+        run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)

Review Comment:
   Yes, it validates with a JDK 17 Java runtime; see `change_java_runtime_version` in `validate.sh`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to