hudi-agent commented on code in PR #18812:
URL: https://github.com/apache/hudi/pull/18812#discussion_r3382393196


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantFlinkTableServices.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.adapter.DataTypeAdapter;
+import org.apache.hudi.adapter.DataTypeAdapterTestUtils;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Flink-only integration tests for VARIANT columns through MOR compaction and 
COW clustering.
+ * Compaction is commit-triggered via {@code compaction.delta_commits}; 
clustering is executed by
+ * {@link HoodieFlinkClusteringJob.AsyncClusteringService} after batch SQL 
inserts (batch jobs do not
+ * checkpoint through the embedded clustering operator).
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVariantFlinkTableServices {
+
+  private static final int ASYNC_SERVICE_TIMEOUT_SECONDS = 60;
+
+  private static boolean nativeVariantAvailable;
+
+  @TempDir
+  Path tempDir;
+
+  @BeforeAll
+  static void checkVariantSupport() {
+    try {
+      DataTypeAdapter.createVariantType();
+      nativeVariantAvailable = true;
+    } catch (UnsupportedOperationException e) {
+      nativeVariantAvailable = false;
+    }
+  }
+
+  private static void assumeVariantWriteSupport() {
+    Assumptions.assumeTrue(nativeVariantAvailable, "VARIANT requires Flink 
2.1+");
+    Assumptions.assumeTrue(
+        variantParquetAnnotationAvailable(),
+        "VARIANT Parquet requires parquet-java 1.16.0+ and Hadoop 3.3+ "
+            + "(CI: test-flink-variant job; local: -Dparquet.version=1.16.0 "
+            + "-Dflink.format.parquet.version=1.16.0 -Dhadoop.version=3.3.0)");
+  }
+
+  private static boolean variantParquetAnnotationAvailable() {
+    try {
+      return DataTypeAdapter.variantParquetAnnotation().isPresent();
+    } catch (UnsupportedOperationException e) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testFlinkMorCompactionPreservesVariant() throws Exception {
+    assumeVariantWriteSupport();
+
+    String tablePath = tempDir.resolve("variant_mor_compact").toString();
+    TableEnvironment tEnv = TestTableEnvs.getBatchTableEnv();
+    tEnv.getConfig()
+        .getConfiguration()
+        .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
4);
+
+    // 4 delta commits (3 inserts + 1 upsert) then async compaction on the 4th 
commit.
+    tEnv.executeSql(createVariantMorTableDdl(tablePath, 4));
+
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(1, CAST('r1' AS STRING), PARSE_JSON('{\"k\":1}'), 
CAST(1000 AS BIGINT))")
+        .await();
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(2, CAST('r2' AS STRING), PARSE_JSON('{\"k\":2}'), 
CAST(1000 AS BIGINT))")
+        .await();
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(3, CAST('r3' AS STRING), PARSE_JSON('{\"k\":3}'), 
CAST(1000 AS BIGINT))")
+        .await();
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(1, CAST('r1' AS STRING), 
PARSE_JSON('{\"k\":1,\"merged\":true}'), CAST(2000 AS BIGINT))")
+        .await();
+
+    assertTrue(
+        TestUtils.waitUntil(
+            () -> hasCompletedCompaction(tablePath) && 
!hasPendingCompaction(tablePath),
+            ASYNC_SERVICE_TIMEOUT_SECONDS),
+        "Commit-triggered compaction should complete after 4 delta commits");
+
+    List<Row> rows =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, name, v, ts FROM variant_table ORDER 
BY id").collect());
+    assertEquals(3, rows.size(), "Compaction should preserve row count");
+
+    assertRowVariantJson(tEnv, rows.get(0), 1, "r1", 2000L, 
"{\"k\":1,\"merged\":true}");
+    assertRowVariantJson(tEnv, rows.get(1), 2, "r2", 1000L, "{\"k\":2}");
+    assertRowVariantJson(tEnv, rows.get(2), 3, "r3", 1000L, "{\"k\":3}");
+
+    tEnv.executeSql("DROP TABLE variant_table");
+  }
+
+  @Test
+  public void testFlinkCowClusteringPreservesVariant() throws Exception {
+    assumeVariantWriteSupport();
+
+    String tablePath = tempDir.resolve("variant_cow_cluster").toString();
+    TableEnvironment tEnv = TestTableEnvs.getBatchTableEnv();
+    tEnv.getConfig()
+        .getConfiguration()
+        .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
4);
+
+    int clusteringDeltaCommits = 2;
+    tEnv.executeSql(createVariantCowPartitionedTableDdl(tablePath, 
clusteringDeltaCommits));
+
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(1, CAST('n1' AS STRING), 
PARSE_JSON('{\"part\":\"par1\"}'), CAST(1000 AS BIGINT), "
+                + "CAST('par1' AS STRING)), "
+                + "(2, CAST('n2' AS STRING), 
PARSE_JSON('{\"part\":\"par2\"}'), CAST(2000 AS BIGINT), "
+                + "CAST('par2' AS STRING))")
+        .await();
+
+    List<Row> afterFirstCommit =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, v FROM variant_table ORDER BY 
id").collect());
+    byte[][] metadataBefore = new byte[4][];
+    byte[][] valueBefore = new byte[4][];
+    for (int i = 0; i < 2; i++) {
+      
DataTypeAdapterTestUtils.assertAsBinaryVariant(afterFirstCommit.get(i).getField(1));
+      metadataBefore[i] = 
DataTypeAdapter.getVariantMetadata(afterFirstCommit.get(i).getField(1));
+      valueBefore[i] = 
DataTypeAdapter.getVariantValue(afterFirstCommit.get(i).getField(1));
+    }
+
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(3, CAST('n3' AS STRING), 
PARSE_JSON('{\"part\":\"par3\"}'), CAST(3000 AS BIGINT), "
+                + "CAST('par3' AS STRING)), "
+                + "(4, CAST('n4' AS STRING), 
PARSE_JSON('{\"part\":\"par4\"}'), CAST(4000 AS BIGINT), "
+                + "CAST('par4' AS STRING))")
+        .await();
+
+    List<Row> afterSecondCommit =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, v FROM variant_table WHERE id IN (3, 
4) ORDER BY id").collect());
+    for (int i = 0; i < 2; i++) {
+      
DataTypeAdapterTestUtils.assertAsBinaryVariant(afterSecondCommit.get(i).getField(1));
+      metadataBefore[i + 2] = 
DataTypeAdapter.getVariantMetadata(afterSecondCommit.get(i).getField(1));
+      valueBefore[i + 2] = 
DataTypeAdapter.getVariantValue(afterSecondCommit.get(i).getField(1));
+    }
+
+    runClusteringService(tablePath, clusteringDeltaCommits);
+
+    List<Row> afterCluster =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, name, v, ts, `partition` FROM 
variant_table ORDER BY id")
+                .collect());
+    assertEquals(4, afterCluster.size(), "Clustering should preserve row 
count");
+
+    for (int i = 0; i < 4; i++) {
+      Row row = afterCluster.get(i);
+      assertEquals(i + 1, row.getField(0));
+      assertEquals("n" + (i + 1), row.getField(1));
+      assertEquals(1000L * (i + 1), row.getField(3));
+      assertEquals("par" + (i + 1), row.getField(4));
+      DataTypeAdapterTestUtils.assertAsBinaryVariant(row.getField(2));
+      assertArrayEquals(metadataBefore[i], 
DataTypeAdapter.getVariantMetadata(row.getField(2)));
+      assertArrayEquals(valueBefore[i], 
DataTypeAdapter.getVariantValue(row.getField(2)));
+    }
+
+    tEnv.executeSql("DROP TABLE variant_table");
+  }
+
+  private static String createVariantMorTableDdl(String tablePath, int 
compactionDeltaCommits) {
+    return String.format(
+        "CREATE TABLE variant_table ("
+            + "  id INT,"
+            + "  name STRING,"
+            + "  v VARIANT,"
+            + "  ts BIGINT,"
+            + "  PRIMARY KEY (id) NOT ENFORCED"
+            + ") WITH ("
+            + "  'connector' = 'hudi',"
+            + "  'path' = '%s',"
+            + "  'table.type' = 'MERGE_ON_READ',"
+            + "  '%s' = 'true',"
+            + "  '%s' = 'true',"
+            + "  '%s' = '%d',"
+            + "  '%s' = '1',"
+            + "  '%s' = 'false'"
+            + ")",
+        tablePath.replace("'", "''"),
+        FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(),
+        FlinkOptions.COMPACTION_ASYNC_ENABLED.key(),
+        FlinkOptions.COMPACTION_DELTA_COMMITS.key(),
+        compactionDeltaCommits,
+        FlinkOptions.COMPACTION_TASKS.key(),
+        FlinkOptions.METADATA_ENABLED.key());
+  }
+
+  private static String createVariantCowPartitionedTableDdl(String tablePath, 
int clusteringDeltaCommits) {
+    return String.format(
+        "CREATE TABLE variant_table ("
+            + "  id INT,"
+            + "  name STRING,"
+            + "  v VARIANT,"
+            + "  ts BIGINT,"
+            + "  `partition` STRING,"
+            + "  PRIMARY KEY (id) NOT ENFORCED"
+            + ") WITH ("
+            + "  'connector' = 'hudi',"
+            + "  'path' = '%s',"
+            + "  'table.type' = 'COPY_ON_WRITE',"
+            + "  '%s' = '%s',"
+            + "  '%s' = 'partition',"
+            + "  '%s' = 'false',"
+            + "  '%s' = 'false',"
+            + "  '%s' = '%d',"
+            + "  '%s' = '1',"
+            + "  '%s' = 'false'"
+            + ")",
+        tablePath.replace("'", "''"),
+        FlinkOptions.OPERATION.key(),
+        WriteOperationType.INSERT.value(),
+        FlinkOptions.PARTITION_PATH_FIELD.key(),
+        FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(),
+        FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(),
+        FlinkOptions.CLUSTERING_DELTA_COMMITS.key(),
+        clusteringDeltaCommits,
+        FlinkOptions.CLUSTERING_TASKS.key(),
+        FlinkOptions.METADATA_ENABLED.key());
+  }
+
+  private static void assertRowVariantJson(
+      TableEnvironment tEnv,
+      Row row,
+      int expectedId,
+      String expectedName,
+      long expectedTs,
+      String jsonLiteral)
+      throws Exception {
+    assertEquals(expectedId, row.getField(0));
+    assertEquals(expectedName, row.getField(1));
+    assertEquals(expectedTs, row.getField(3));
+    assertVariantMatchesParseJson(tEnv, row.getField(2), jsonLiteral);
+  }
+
+  private static void assertVariantMatchesParseJson(
+      TableEnvironment tEnv, Object actual, String jsonLiteral) throws 
Exception {
+    DataTypeAdapterTestUtils.assertAsBinaryVariant(actual);
+    String escaped = jsonLiteral.replace("'", "''");
+    Object expected =
+        CollectionUtil.iteratorToList(
+                tEnv.executeSql("SELECT PARSE_JSON('" + escaped + 
"')").collect())
+            .get(0)
+            .getField(0);
+    DataTypeAdapterTestUtils.assertAsBinaryVariant(expected);
+    assertArrayEquals(
+        DataTypeAdapter.getVariantMetadata(expected), 
DataTypeAdapter.getVariantMetadata(actual));
+    assertArrayEquals(
+        DataTypeAdapter.getVariantValue(expected), 
DataTypeAdapter.getVariantValue(actual));
+  }
+
+  /**
+   * Batch SQL INSERT jobs do not drive the append-mode clustering operator 
through checkpoints;
+   * schedule and execute clustering via {@link HoodieFlinkClusteringJob} 
(same pattern as
+   * {@code ITTestHoodieFlinkClustering}). Schedules once with a write client, 
then runs the job with
+   * {@code cfg.schedule = false} so {@link 
HoodieFlinkClusteringJob.AsyncClusteringService#cluster} executes the
+   * pending plan instead of calling {@code scheduleClustering} again (which 
no-ops when a plan already exists).
+   */
+  private static void runClusteringService(String tablePath, int 
clusteringDeltaCommits) throws Exception {
+    FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+    cfg.path = tablePath;
+    cfg.schedule = true;
+    cfg.clusteringDeltaCommits = clusteringDeltaCommits;
+    Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    conf.set(FlinkOptions.TABLE_NAME, 
metaClient.getTableConfig().getTableName());
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, 
metaClient.getTableConfig().getRecordKeyFieldProp());
+    conf.set(
+        FlinkOptions.PARTITION_PATH_FIELD,
+        
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig()).orElse(""));
+    // INSERT pipeline must not schedule/execute clustering 
(OptionsResolver.needsScheduleClustering /
+    // needsAsyncClustering). Otherwise the sink consumes the delta window or 
leaves clustering on the
+    // timeline and standalone scheduleClustering returns empty.
+    conf.set(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+    conf.set(FlinkOptions.CLUSTERING_DELTA_COMMITS, clusteringDeltaCommits);
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    try (HoodieFlinkWriteClient<?> writeClient = 
FlinkWriteClients.createWriteClient(conf)) {
+      assertTrue(
+          writeClient.scheduleClustering(Option.empty()).isPresent(),
+          "Clustering plan should be schedulable after inserts");
+    }
+
+    // HoodieFlinkClusteringJob#cluster with cfg.schedule true calls 
scheduleClustering again; that returns
+    // empty when a REQUESTED plan already exists, so the job returns without 
env.execute(). Run execute
+    // against the plan we just scheduled.
+    cfg.schedule = false;
+
+    HoodieFlinkClusteringJob.AsyncClusteringService service =
+        new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf);
+    try {
+      new HoodieFlinkClusteringJob(service).start(false);
+    } finally {
+      service.shutDown();
+    }
+    assertTrue(
+        hasCompletedClustering(tablePath) && !hasPendingClustering(tablePath),
+        "Clustering should complete after inserts");
+  }
+
+  private static HoodieTableMetaClient metaClient(String tablePath) {

Review Comment:
   🤖 nit: `metaClient(String tablePath)` reads as a noun; Java methods should 
be verbs. Something like `createMetaClient` or `buildMetaClient` would make the 
call sites (`metaClient(tablePath)` → `createMetaClient(tablePath)`) a bit 
clearer to skim.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantFlinkTableServices.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.adapter.DataTypeAdapter;
+import org.apache.hudi.adapter.DataTypeAdapterTestUtils;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Flink-only integration tests for VARIANT columns through MOR compaction and 
COW clustering.
+ * Compaction is commit-triggered via {@code compaction.delta_commits}; 
clustering is executed by
+ * {@link HoodieFlinkClusteringJob.AsyncClusteringService} after batch SQL 
inserts (batch jobs do not
+ * checkpoint through the embedded clustering operator).
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVariantFlinkTableServices {
+
+  private static final int ASYNC_SERVICE_TIMEOUT_SECONDS = 60;
+
+  private static boolean nativeVariantAvailable;
+
+  @TempDir
+  Path tempDir;
+
+  @BeforeAll
+  static void checkVariantSupport() {
+    try {
+      DataTypeAdapter.createVariantType();
+      nativeVariantAvailable = true;
+    } catch (UnsupportedOperationException e) {
+      nativeVariantAvailable = false;
+    }
+  }
+
+  private static void assumeVariantWriteSupport() {
+    Assumptions.assumeTrue(nativeVariantAvailable, "VARIANT requires Flink 
2.1+");
+    Assumptions.assumeTrue(
+        variantParquetAnnotationAvailable(),
+        "VARIANT Parquet requires parquet-java 1.16.0+ and Hadoop 3.3+ "
+            + "(CI: test-flink-variant job; local: -Dparquet.version=1.16.0 "
+            + "-Dflink.format.parquet.version=1.16.0 -Dhadoop.version=3.3.0)");
+  }
+
+  private static boolean variantParquetAnnotationAvailable() {
+    try {
+      return DataTypeAdapter.variantParquetAnnotation().isPresent();
+    } catch (UnsupportedOperationException e) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testFlinkMorCompactionPreservesVariant() throws Exception {
+    assumeVariantWriteSupport();
+
+    String tablePath = tempDir.resolve("variant_mor_compact").toString();
+    TableEnvironment tEnv = TestTableEnvs.getBatchTableEnv();
+    tEnv.getConfig()
+        .getConfiguration()
+        .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
4);
+
+    // 4 delta commits (3 inserts + 1 upsert) then async compaction on the 4th 
commit.
+    tEnv.executeSql(createVariantMorTableDdl(tablePath, 4));
+
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(1, CAST('r1' AS STRING), PARSE_JSON('{\"k\":1}'), 
CAST(1000 AS BIGINT))")
+        .await();
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(2, CAST('r2' AS STRING), PARSE_JSON('{\"k\":2}'), 
CAST(1000 AS BIGINT))")
+        .await();
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(3, CAST('r3' AS STRING), PARSE_JSON('{\"k\":3}'), 
CAST(1000 AS BIGINT))")
+        .await();
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(1, CAST('r1' AS STRING), 
PARSE_JSON('{\"k\":1,\"merged\":true}'), CAST(2000 AS BIGINT))")
+        .await();
+
+    assertTrue(
+        TestUtils.waitUntil(
+            () -> hasCompletedCompaction(tablePath) && 
!hasPendingCompaction(tablePath),
+            ASYNC_SERVICE_TIMEOUT_SECONDS),
+        "Commit-triggered compaction should complete after 4 delta commits");
+
+    List<Row> rows =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, name, v, ts FROM variant_table ORDER 
BY id").collect());
+    assertEquals(3, rows.size(), "Compaction should preserve row count");
+
+    assertRowVariantJson(tEnv, rows.get(0), 1, "r1", 2000L, 
"{\"k\":1,\"merged\":true}");
+    assertRowVariantJson(tEnv, rows.get(1), 2, "r2", 1000L, "{\"k\":2}");
+    assertRowVariantJson(tEnv, rows.get(2), 3, "r3", 1000L, "{\"k\":3}");
+
+    tEnv.executeSql("DROP TABLE variant_table");
+  }
+
+  @Test
+  public void testFlinkCowClusteringPreservesVariant() throws Exception {
+    assumeVariantWriteSupport();
+
+    String tablePath = tempDir.resolve("variant_cow_cluster").toString();
+    TableEnvironment tEnv = TestTableEnvs.getBatchTableEnv();
+    tEnv.getConfig()
+        .getConfiguration()
+        .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
4);
+
+    int clusteringDeltaCommits = 2;
+    tEnv.executeSql(createVariantCowPartitionedTableDdl(tablePath, 
clusteringDeltaCommits));
+
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(1, CAST('n1' AS STRING), 
PARSE_JSON('{\"part\":\"par1\"}'), CAST(1000 AS BIGINT), "
+                + "CAST('par1' AS STRING)), "
+                + "(2, CAST('n2' AS STRING), 
PARSE_JSON('{\"part\":\"par2\"}'), CAST(2000 AS BIGINT), "
+                + "CAST('par2' AS STRING))")
+        .await();
+
+    List<Row> afterFirstCommit =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, v FROM variant_table ORDER BY 
id").collect());
+    byte[][] metadataBefore = new byte[4][];
+    byte[][] valueBefore = new byte[4][];
+    for (int i = 0; i < 2; i++) {
+      
DataTypeAdapterTestUtils.assertAsBinaryVariant(afterFirstCommit.get(i).getField(1));
+      metadataBefore[i] = 
DataTypeAdapter.getVariantMetadata(afterFirstCommit.get(i).getField(1));
+      valueBefore[i] = 
DataTypeAdapter.getVariantValue(afterFirstCommit.get(i).getField(1));
+    }
+
+    tEnv.executeSql(
+            "INSERT INTO variant_table VALUES "
+                + "(3, CAST('n3' AS STRING), 
PARSE_JSON('{\"part\":\"par3\"}'), CAST(3000 AS BIGINT), "
+                + "CAST('par3' AS STRING)), "
+                + "(4, CAST('n4' AS STRING), 
PARSE_JSON('{\"part\":\"par4\"}'), CAST(4000 AS BIGINT), "
+                + "CAST('par4' AS STRING))")
+        .await();
+
+    List<Row> afterSecondCommit =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, v FROM variant_table WHERE id IN (3, 
4) ORDER BY id").collect());
+    for (int i = 0; i < 2; i++) {
+      
DataTypeAdapterTestUtils.assertAsBinaryVariant(afterSecondCommit.get(i).getField(1));
+      metadataBefore[i + 2] = 
DataTypeAdapter.getVariantMetadata(afterSecondCommit.get(i).getField(1));
+      valueBefore[i + 2] = 
DataTypeAdapter.getVariantValue(afterSecondCommit.get(i).getField(1));
+    }
+
+    runClusteringService(tablePath, clusteringDeltaCommits);
+
+    List<Row> afterCluster =
+        CollectionUtil.iteratorToList(
+            tEnv.executeSql("SELECT id, name, v, ts, `partition` FROM 
variant_table ORDER BY id")
+                .collect());
+    assertEquals(4, afterCluster.size(), "Clustering should preserve row 
count");
+
+    for (int i = 0; i < 4; i++) {
+      Row row = afterCluster.get(i);
+      assertEquals(i + 1, row.getField(0));
+      assertEquals("n" + (i + 1), row.getField(1));
+      assertEquals(1000L * (i + 1), row.getField(3));
+      assertEquals("par" + (i + 1), row.getField(4));
+      DataTypeAdapterTestUtils.assertAsBinaryVariant(row.getField(2));
+      assertArrayEquals(metadataBefore[i], 
DataTypeAdapter.getVariantMetadata(row.getField(2)));
+      assertArrayEquals(valueBefore[i], 
DataTypeAdapter.getVariantValue(row.getField(2)));
+    }
+
+    tEnv.executeSql("DROP TABLE variant_table");
+  }
+
+  private static String createVariantMorTableDdl(String tablePath, int 
compactionDeltaCommits) {
+    return String.format(
+        "CREATE TABLE variant_table ("
+            + "  id INT,"
+            + "  name STRING,"
+            + "  v VARIANT,"
+            + "  ts BIGINT,"
+            + "  PRIMARY KEY (id) NOT ENFORCED"
+            + ") WITH ("
+            + "  'connector' = 'hudi',"
+            + "  'path' = '%s',"
+            + "  'table.type' = 'MERGE_ON_READ',"
+            + "  '%s' = 'true',"
+            + "  '%s' = 'true',"
+            + "  '%s' = '%d',"
+            + "  '%s' = '1',"
+            + "  '%s' = 'false'"
+            + ")",
+        tablePath.replace("'", "''"),
+        FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(),
+        FlinkOptions.COMPACTION_ASYNC_ENABLED.key(),
+        FlinkOptions.COMPACTION_DELTA_COMMITS.key(),
+        compactionDeltaCommits,
+        FlinkOptions.COMPACTION_TASKS.key(),
+        FlinkOptions.METADATA_ENABLED.key());
+  }
+
+  private static String createVariantCowPartitionedTableDdl(String tablePath, 
int clusteringDeltaCommits) {
+    return String.format(
+        "CREATE TABLE variant_table ("
+            + "  id INT,"
+            + "  name STRING,"
+            + "  v VARIANT,"
+            + "  ts BIGINT,"
+            + "  `partition` STRING,"
+            + "  PRIMARY KEY (id) NOT ENFORCED"
+            + ") WITH ("
+            + "  'connector' = 'hudi',"
+            + "  'path' = '%s',"
+            + "  'table.type' = 'COPY_ON_WRITE',"
+            + "  '%s' = '%s',"
+            + "  '%s' = 'partition',"
+            + "  '%s' = 'false',"
+            + "  '%s' = 'false',"
+            + "  '%s' = '%d',"
+            + "  '%s' = '1',"
+            + "  '%s' = 'false'"
+            + ")",
+        tablePath.replace("'", "''"),
+        FlinkOptions.OPERATION.key(),
+        WriteOperationType.INSERT.value(),
+        FlinkOptions.PARTITION_PATH_FIELD.key(),
+        FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(),
+        FlinkOptions.CLUSTERING_ASYNC_ENABLED.key(),
+        FlinkOptions.CLUSTERING_DELTA_COMMITS.key(),
+        clusteringDeltaCommits,
+        FlinkOptions.CLUSTERING_TASKS.key(),
+        FlinkOptions.METADATA_ENABLED.key());
+  }
+
+  private static void assertRowVariantJson(
+      TableEnvironment tEnv,
+      Row row,
+      int expectedId,
+      String expectedName,
+      long expectedTs,
+      String jsonLiteral)
+      throws Exception {
+    assertEquals(expectedId, row.getField(0));
+    assertEquals(expectedName, row.getField(1));
+    assertEquals(expectedTs, row.getField(3));
+    assertVariantMatchesParseJson(tEnv, row.getField(2), jsonLiteral);
+  }
+
+  private static void assertVariantMatchesParseJson(

Review Comment:
   🤖 nit: `assertVariantMatchesParseJson` is duplicated byte-for-byte in 
`ITTestVariantCrossEngineCompatibility`. Could it live in 
`DataTypeAdapterTestUtils` so there's one place to update if the comparison 
logic ever changes?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to