cshuo commented on code in PR #18812:
URL: https://github.com/apache/hudi/pull/18812#discussion_r3385003403
##########
pom.xml:
##########
@@ -2924,7 +2926,11 @@
<flink.connector.kafka.version>4.0.1-2.0</flink.connector.kafka.version>
<orc.flink.version>1.5.6</orc.flink.version>
<flink.avro.version>1.11.4</flink.avro.version>
- <flink.format.parquet.version>1.15.2</flink.format.parquet.version>
+ <!-- VARIANT support needs parquet 1.16.0, which requires Hadoop 3.3.0+
+ (parquet-java dropped Hadoop-2 and uses FileSystem.openFile()).
+ Scoped to Flink modules so non-Flink modules keep the global versions. -->
+ <flink.format.parquet.version>1.16.0</flink.format.parquet.version>
+ <flink.hadoop.version>3.3.0</flink.hadoop.version>
Review Comment:
**Scope/altitude: this is not a gated, additive change as described.** The
PR body states the parquet/hadoop bump is confined to a `flink-variant-parquet`
Maven profile "activated only when `-Dflink.format.parquet.version=1.16.0` is
passed" and that "Default builds are unaffected." But this hunk edits the
**base `flink2.1` profile properties unconditionally**:
`flink.format.parquet.version` 1.15.2→1.16.0 and a new
`flink.hadoop.version=3.3.0`. Combined with `hudi-flink2.1.x/pom.xml` setting
`<parquet.version>${flink.format.parquet.version}</parquet.version>` and
`hadoop-common` switching to `${flink.hadoop.version}`, **every default
`flink2.1` build now ships parquet-java 1.16.0 + Hadoop 3.3.0** — not just the
variant CI path. No `flink-variant-parquet` profile exists in the diff.
If bumping the entire Flink 2.1 stack is intended (the `.asf.yaml`/`bot.yml`
1.15.2→1.16.0 changes suggest it is), please update the PR description, since
the "purely additive / zero deletions" claim is inaccurate and reviewers will
assess risk on the wrong basis.
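For comparison, a gated variant of this change might look roughly like the sketch below. It is an assumption about what the PR description had in mind, not something present in the diff: the profile id is taken from the description, and the layout is illustrative only.

```xml
<!-- Sketch only: a property-activated profile, assuming the PR description's
     "flink-variant-parquet" id. Nothing like this exists in the diff. -->
<profile>
  <id>flink-variant-parquet</id>
  <activation>
    <!-- Active only when -Dflink.format.parquet.version=1.16.0 is passed. -->
    <property>
      <name>flink.format.parquet.version</name>
      <value>1.16.0</value>
    </property>
  </activation>
  <properties>
    <!-- The -D flag already overrides flink.format.parquet.version itself,
         so the profile only needs to add the matching Hadoop version. -->
    <flink.hadoop.version>3.3.0</flink.hadoop.version>
  </properties>
</profile>
```

With a layout like this, the base `flink2.1` profile would keep `flink.format.parquet.version` at 1.15.2, and the Flink module poms would need some default for `flink.hadoop.version` when the profile is inactive (for example falling back to the global Hadoop version, which is also an assumption here).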
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantFlinkTableServices.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.adapter.DataTypeAdapter;
+import org.apache.hudi.adapter.DataTypeAdapterTestUtils;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Flink-only integration tests for VARIANT columns through MOR compaction and COW clustering.
+ * Compaction is commit-triggered via {@code compaction.delta_commits}; clustering is executed by
+ * {@link HoodieFlinkClusteringJob.AsyncClusteringService} after batch SQL inserts (batch jobs do not
+ * checkpoint through the embedded clustering operator).
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVariantFlinkTableServices {
+
+ private static final int ASYNC_SERVICE_TIMEOUT_SECONDS = 60;
+
+ private static boolean nativeVariantAvailable;
+
+ @TempDir
+ Path tempDir;
+
+ @BeforeAll
+ static void checkVariantSupport() {
+ try {
+ DataTypeAdapter.createVariantType();
+ nativeVariantAvailable = true;
+ } catch (UnsupportedOperationException e) {
+ nativeVariantAvailable = false;
+ }
+ }
+
+ private static void assumeVariantWriteSupport() {
+ Assumptions.assumeTrue(nativeVariantAvailable, "VARIANT requires Flink 2.1+");
+ Assumptions.assumeTrue(
+ variantParquetAnnotationAvailable(),
+ "VARIANT Parquet requires parquet-java 1.16.0+ and Hadoop 3.3+ "
+ + "(CI: test-flink-variant job; local: -Dparquet.version=1.16.0 "
Review Comment:
**Skip message points to a CI job that does not exist in this PR and to flags that have no effect.**
This message (and the identical one in `ITTestVariantCrossEngineCompatibility`,
plus the PR description) tells developers a `test-flink-variant` CI job runs
these ITs and that they should pass `-Dparquet.version=1.16.0
-Dflink.format.parquet.version=1.16.0 -Dhadoop.version=3.3.0` locally. But:
1. No `test-flink-variant` job is added to `bot.yml` in this diff — these
ITs actually run inside the standard `test-flink-1`/`test-flink-2` matrix (now
bumped to 1.16.0).
2. Since the `flink2.1` profile now defaults
`flink.format.parquet.version=1.16.0` and `flink.hadoop.version=3.3.0`, the
`-Dflink.format.parquet.version` / `-Dhadoop.version` flags are no-ops for
Flink modules (Flink modules read `flink.hadoop.version`, not `hadoop.version`).
Please correct the guidance to reference the real job/flags, otherwise
anyone debugging a skipped VARIANT IT will chase a nonexistent job.
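One way to keep the skip message from drifting is to derive it from what is actually on the classpath instead of hard-coding a job name and flags. The sketch below is hypothetical and not part of Hudi or this PR: `VariantSkipReason`, `isOnClasspath`, and `skipReason` are invented names, and the caller is assumed to supply the probe class and the Maven property that controls it.

```java
import java.util.Optional;

/** Hypothetical helper (not in Hudi): builds a skip reason from a classpath probe. */
final class VariantSkipReason {

  private VariantSkipReason() {
  }

  /** Returns true if the named class can be loaded (without initializing it). */
  public static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, VariantSkipReason.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  /**
   * Returns empty when the probe class is present; otherwise a message that
   * names the missing class and the Maven property the caller says controls it.
   */
  public static Optional<String> skipReason(String probeClass, String controllingProperty) {
    if (isOnClasspath(probeClass)) {
      return Optional.empty();
    }
    return Optional.of("VARIANT Parquet support not found (missing class " + probeClass
        + "); check the Maven property '" + controllingProperty
        + "' in the active Flink profile");
  }
}
```

A test could then call `Assumptions.assumeTrue(!reason.isPresent(), reason.orElse(""))`, passing `flink.format.parquet.version` as the controlling property, so the message always names a property the Flink modules really read.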