cshuo commented on code in PR #13480:
URL: https://github.com/apache/hudi/pull/13480#discussion_r2188926370
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java:
##########
@@ -78,14 +81,29 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// setup configuration
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
- conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
// set up default parallelism
OptionsInference.setupSinkTasks(conf,
dataStream.getExecutionConfig().getParallelism());
// set up client id
OptionsInference.setupClientId(conf);
// set up index related configs
OptionsInference.setupIndexConfigs(conf);
+    // Since Flink 2.0, adaptive execution for batch jobs generates the job graph incrementally
+    // across multiple stages (FLIP-469). The write coordinator is initialized along with the write
+    // operator in the final stage, so the Hudi table should be initialized, if necessary, during
+    // the plan compilation phase when adaptive execution is enabled.
Review Comment:
   Taking the following batch job pipeline as an example:
   source -> row_data_to_hoodie_record -> bootstrap -> bucket_assigner -> stream_write
   There are three operator chains / stages in the pipeline:
   stage1: source -> row_data_to_hoodie_record -> bootstrap
   stage2: bucket_assigner
   stage3: stream_write
   In Flink 1.x, Flink converts the whole stream graph into a JobGraph before job execution, creating a fixed execution plan, which contains 3 job vertices for the above example. The batch scheduler then initializes the coordinator for each operator in the job vertices during scheduler startup.
   In Flink 2.x, the job graph is generated incrementally and executed adaptively (FLIP-469). Taking the above case as an example, the job graph initially contains only stage 1; stage 2 is generated only after stage 1 finishes, and the same holds for stage 3.
   So, under adaptive execution in Flink 2, we cannot initialize the Hudi table in the writer's coordinator, because the upstream operators in the pipeline also operate on the Hudi table, and they run before the writer stage is even generated.
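   The decision described above can be sketched as follows. This is an illustrative sketch only: the `Conf` stand-in and the helper names `isAdaptiveBatchExecution` / `shouldInitTableDuringPlanning` are assumptions for the example, not the actual Hudi or Flink APIs.

   ```java
   // Sketch: during plan compilation, decide whether the Hudi table must be
   // initialized eagerly because the write coordinator (final stage) will not
   // exist yet when upstream stages run under adaptive batch execution.
   import java.util.HashMap;
   import java.util.Map;

   public class AdaptiveExecutionInit {

     /** Simplified stand-in for the Flink/Hudi configuration object. */
     static class Conf {
       private final Map<String, String> values = new HashMap<>();
       String get(String key, String def) { return values.getOrDefault(key, def); }
       void set(String key, String value) { values.put(key, value); }
     }

     /**
      * In Flink 2.x batch mode, adaptive execution (FLIP-469) builds the job
      * graph stage by stage, so the final-stage write coordinator is created
      * only after upstream stages finish. Config keys here are illustrative.
      */
     static boolean isAdaptiveBatchExecution(Conf conf) {
       return "BATCH".equals(conf.get("execution.runtime-mode", "STREAMING"))
           && Boolean.parseBoolean(conf.get("execution.batch.adaptive", "false"));
     }

     /** Plan-compilation hook: initialize the table eagerly only when the
      *  coordinator cannot be relied on to do it in time. */
     static boolean shouldInitTableDuringPlanning(Conf conf) {
       return isAdaptiveBatchExecution(conf);
     }

     public static void main(String[] args) {
       Conf streaming = new Conf();
       System.out.println(shouldInitTableDuringPlanning(streaming));

       Conf adaptiveBatch = new Conf();
       adaptiveBatch.set("execution.runtime-mode", "BATCH");
       adaptiveBatch.set("execution.batch.adaptive", "true");
       System.out.println(shouldInitTableDuringPlanning(adaptiveBatch));
     }
   }
   ```

   In the streaming (or non-adaptive batch) case the coordinator still performs the initialization as before; only the adaptive batch path moves it to plan compilation.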
##########
.github/workflows/bot.yml:
##########
@@ -1035,6 +1021,55 @@ jobs:
           HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
           ./packaging/bundle-validation/ci_run.sh hudi_docker_java17 $HUDI_VERSION openjdk17
+  # Flink 2.0 only supports Java 11 and above
+ validate-bundles-java11:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ include:
+ - scalaProfile: 'scala-2.12'
+ flinkProfile: 'flink2.0'
+ flinkAvroVersion: '1.11.4'
+ flinkParquetVersion: '1.14.4'
+ sparkProfile: 'spark3.5'
+ sparkRuntime: 'spark3.5.1'
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up JDK 11
+ uses: actions/setup-java@v3
+ with:
+ java-version: '11'
+ distribution: 'temurin'
+ architecture: x64
+ - name: Build Project
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
+ FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
+ run: |
+          mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -DskipTests=true $MVN_ARGS -pl packaging/hudi-flink-bundle -am -Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
+ - name: IT - Bundle Validation - OpenJDK 11
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
+          ./packaging/bundle-validation/ci_run.sh hudi_docker_java11 $HUDI_VERSION openjdk11
+ - name: IT - Bundle Validation - OpenJDK 17
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ run: |
+          HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
Review Comment:
   Yes, it validates with the JDK 17 Java runtime; see `change_java_runtime_version` in `validate.sh`.
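   For readers unfamiliar with that pattern, a helper like `change_java_runtime_version` typically just re-points the active JDK inside the container. The sketch below is a hypothetical illustration under an assumed `/opt/java/openjdk<N>` install layout; the real logic lives in `packaging/bundle-validation/validate.sh` and may differ.

   ```shell
   #!/usr/bin/env bash
   # Hypothetical sketch of switching the active JDK inside a validation
   # container; not the actual validate.sh implementation.
   set -euo pipefail

   change_java_runtime_version() {
     local version="$1"
     # Assumed container layout: one JDK install directory per major version.
     local java_home="/opt/java/openjdk${version}"
     export JAVA_HOME="$java_home"
     export PATH="$JAVA_HOME/bin:$PATH"
     echo "JAVA_HOME=$JAVA_HOME"
   }

   change_java_runtime_version 17
   ```

   The bundle-validation job above can then exercise the same bundle under both Java 11 and Java 17 by calling such a helper between runs.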
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]