This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b9631eeb52 [flink] Support refreshing partition asynchronously for 
lookup join (#7402)
b9631eeb52 is described below

commit b9631eeb526e45a99d203ea20307c053d8cb5603
Author: umi <[email protected]>
AuthorDate: Fri Mar 27 09:27:30 2026 +0800

    [flink] Support refreshing partition asynchronously for lookup join (#7402)
---
 .github/workflows/utitcase-flink-1.x-common.yml    |   2 +-
 .../generated/flink_connector_configuration.html   |   8 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |  12 +
 .../flink/lookup/FileStoreLookupFunction.java      |  56 +++-
 .../paimon/flink/lookup/FullCacheLookupTable.java  |  11 +
 .../paimon/flink/lookup/PartitionRefresher.java    | 223 +++++++++++++++
 .../org/apache/paimon/flink/LookupJoinITCase.java  | 313 +++++++++++++++++++++
 7 files changed, 611 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/utitcase-flink-1.x-common.yml 
b/.github/workflows/utitcase-flink-1.x-common.yml
index 6825d8e7a3..f22b56014f 100644
--- a/.github/workflows/utitcase-flink-1.x-common.yml
+++ b/.github/workflows/utitcase-flink-1.x-common.yml
@@ -38,7 +38,7 @@ concurrency:
 jobs:
   build_test:
     runs-on: ubuntu-latest
-    timeout-minutes: 60
+    timeout-minutes: 100
 
     steps:
       - name: Checkout code
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 1003aff77c..d4c8b5a167 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,6 +80,12 @@ under the License.
             <td>Duration</td>
             <td>Specific dynamic partition refresh interval for lookup, scan 
all partitions and obtain corresponding partition.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.dynamic-partition.refresh.async</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to refresh dynamic partition lookup table 
asynchronously. This option only works for full cache dimension table. When 
enabled, partition changes will be loaded in a background thread while the old 
partition data continues serving queries. When disabled (default), partition 
refresh is synchronous and blocks queries until the new partition data is fully 
loaded.</td>
+        </tr>
         <tr>
             <td><h5>lookup.refresh.async</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -357,4 +363,4 @@ under the License.
             <td>Defines a custom parallelism for the unaware-bucket table 
compaction job. By default, if this option is not defined, the planner will 
derive the parallelism for each statement individually by also considering the 
global configuration.</td>
         </tr>
     </tbody>
-</table>
+</table>
\ No newline at end of file
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 8febafc5e3..89d3e2174a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -276,6 +276,18 @@ public class FlinkConnectorOptions {
                     .defaultValue(false)
                     .withDescription("Whether to refresh lookup table in an 
async thread.");
 
+    public static final ConfigOption<Boolean> 
LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC =
+            ConfigOptions.key("lookup.dynamic-partition.refresh.async")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to refresh dynamic partition lookup table 
asynchronously. "
+                                    + "This option only works for full cache 
dimension table. "
+                                    + "When enabled, partition changes will be 
loaded in a background thread "
+                                    + "while the old partition data continues 
serving queries. "
+                                    + "When disabled (default), partition 
refresh is synchronous and blocks queries "
+                                    + "until the new partition data is fully 
loaded.");
+
     public static final ConfigOption<Integer> 
LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT =
             ConfigOptions.key("lookup.refresh.async.pending-snapshot-count")
                     .intType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 5b71664ca6..69d834615f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -68,6 +68,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
 import static 
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
@@ -93,8 +94,12 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
     private final List<InternalRow.FieldGetter> projectFieldsGetters;
 
     private transient File path;
+    private transient String tmpDirectory;
     private transient LookupTable lookupTable;
 
+    // partition refresh
+    @Nullable private transient PartitionRefresher partitionRefresher;
+
     // interval of refreshing lookup table
     private transient Duration refreshInterval;
     // timestamp when refreshing lookup table
@@ -161,7 +166,7 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
 
     public void open(FunctionContext context) throws Exception {
         this.functionContext = context;
-        String tmpDirectory = getTmpDirectory(context);
+        this.tmpDirectory = getTmpDirectory(context);
         open(tmpDirectory);
     }
 
@@ -236,6 +241,16 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
                 lookupTable.specifyPartitions(
                         partitions, 
partitionLoader.createSpecificPartFilter());
             }
+            if (partitionLoader instanceof DynamicPartitionLoader) {
+                // Initialize partition refresher
+                this.partitionRefresher =
+                        new PartitionRefresher(
+                                
options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC)
+                                        && lookupTable instanceof 
FullCacheLookupTable,
+                                table.name(),
+                                this.tmpDirectory,
+                                partitionLoader.partitions());
+            }
         }
 
         if (cacheRowFilter != null) {
@@ -271,13 +286,16 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
             if (partitionLoader == null) {
                 return lookupInternal(key);
             }
-
-            if (partitionLoader.partitions().isEmpty()) {
+            List<BinaryRow> partitions =
+                    partitionRefresher != null
+                            ? partitionRefresher.currentPartitions()
+                            : partitionLoader.partitions();
+            if (partitions.isEmpty()) {
                 return Collections.emptyList();
             }
 
             List<RowData> rows = new ArrayList<>();
-            for (BinaryRow partition : partitionLoader.partitions()) {
+            for (BinaryRow partition : partitions) {
                 rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
             }
             return rows;
@@ -324,7 +342,18 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
             return;
         }
 
-        // 2. refresh dynamic partition
+        // 2. check if async partition refresh has completed, and switch if so
+        if (partitionRefresher != null && 
partitionRefresher.isPartitionRefreshAsync()) {
+            LookupTable newLookupTable =
+                    
partitionRefresher.getNewLookupTable(partitionLoader.partitions());
+            if (newLookupTable != null) {
+                lookupTable.close();
+                lookupTable = newLookupTable;
+                path = partitionRefresher.path();
+            }
+        }
+
+        // 3. refresh dynamic partition
         if (partitionLoader != null) {
             boolean partitionChanged = partitionLoader.checkRefresh();
             List<BinaryRow> partitions = partitionLoader.partitions();
@@ -334,18 +363,17 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
             }
 
             if (partitionChanged) {
-                // reopen with latest partition
-                lookupTable.specifyPartitions(
-                        partitionLoader.partitions(), 
partitionLoader.createSpecificPartFilter());
-                lookupTable.close();
-                lookupTable.open();
+                partitionRefresher.startRefresh(
+                        partitions,
+                        partitionLoader.createSpecificPartFilter(),
+                        lookupTable,
+                        cacheRowFilter);
                 nextRefreshTime = System.currentTimeMillis() + 
refreshInterval.toMillis();
-                // no need to refresh the lookup table because it is reopened
                 return;
             }
         }
 
-        // 3. refresh lookup table
+        // 4. refresh lookup table
         if (shouldRefreshLookupTable()) {
             // Check if we should do full load (close and reopen table) 
instead of incremental
             // refresh
@@ -415,6 +443,10 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
 
     @Override
     public void close() throws IOException {
+        if (partitionRefresher != null) {
+            partitionRefresher.close();
+        }
+
         if (lookupTable != null) {
             lookupTable.close();
             lookupTable = null;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 81af38ea8f..6b02084d8a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -414,5 +414,16 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
                     joinKey,
                     requiredCachedBucketIds);
         }
+
+        public Context copy(File newTempPath) {
+            return new Context(
+                    table.wrapped(),
+                    projection,
+                    tablePredicate,
+                    projectedPredicate,
+                    newTempPath,
+                    joinKey,
+                    requiredCachedBucketIds);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
new file mode 100644
index 0000000000..cbf6b025bb
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.ExecutorUtils;
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.Filter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
+
+/** Manages partition refresh logic for {@link FullCacheLookupTable}. */
+public class PartitionRefresher implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionRefresher.class);
+
+    private final boolean partitionRefreshAsync;
+    private final String tableName;
+    private final String tmpDirectory;
+    private volatile File path;
+    private ExecutorService partitionRefreshExecutor;
+    private AtomicReference<LookupTable> pendingLookupTable;
+    private AtomicReference<Exception> partitionRefreshException;
+
+    /** Current partitions being used for lookup. Updated when partition 
refresh completes. */
+    private List<BinaryRow> currentPartitions;
+
+    public PartitionRefresher(
+            boolean partitionRefreshAsync,
+            String tableName,
+            String tmpDirectory,
+            List<BinaryRow> initialPartitions) {
+        this.partitionRefreshAsync = partitionRefreshAsync;
+        this.tableName = tableName;
+        this.tmpDirectory = tmpDirectory;
+        this.currentPartitions = initialPartitions;
+        if (!partitionRefreshAsync) {
+            return;
+        }
+        this.pendingLookupTable = new AtomicReference<>(null);
+        this.partitionRefreshException = new AtomicReference<>(null);
+        this.partitionRefreshExecutor =
+                Executors.newSingleThreadExecutor(
+                        new ExecutorThreadFactory(
+                                String.format(
+                                        "%s-lookup-refresh-partition",
+                                        Thread.currentThread().getName())));
+    }
+
+    /** Get the current partitions being used for lookup. */
+    public List<BinaryRow> currentPartitions() {
+        return currentPartitions;
+    }
+
+    /**
+     * Start partition refresh. Chooses sync or async mode based on 
configuration.
+     *
+     * @param newPartitions the new partitions to refresh to
+     * @param partitionFilter the partition filter for the new partitions
+     * @param lookupTable the current lookup table to refresh
+     * @param cacheRowFilter the cache row filter, may be null
+     */
+    public void startRefresh(
+            List<BinaryRow> newPartitions,
+            @Nullable Predicate partitionFilter,
+            LookupTable lookupTable,
+            @Nullable Filter<InternalRow> cacheRowFilter)
+            throws Exception {
+        if (partitionRefreshAsync) {
+            asyncPartitionRefresh(
+                    newPartitions,
+                    partitionFilter,
+                    ((FullCacheLookupTable) lookupTable).context,
+                    cacheRowFilter);
+        } else {
+            syncPartitionRefresh(newPartitions, partitionFilter, lookupTable);
+        }
+    }
+
+    private void syncPartitionRefresh(
+            List<BinaryRow> newPartitions,
+            @Nullable Predicate partitionFilter,
+            LookupTable lookupTable)
+            throws Exception {
+        LOG.info(
+                "Synchronously refreshing partition for table {}, new 
partitions detected.",
+                tableName);
+        lookupTable.close();
+        lookupTable.specifyPartitions(newPartitions, partitionFilter);
+        lookupTable.open();
+        this.currentPartitions = newPartitions;
+        LOG.info("Synchronous partition refresh completed for table {}.", 
tableName);
+    }
+
+    private void asyncPartitionRefresh(
+            List<BinaryRow> newPartitions,
+            @Nullable Predicate partitionFilter,
+            FullCacheLookupTable.Context context,
+            @Nullable Filter<InternalRow> cacheRowFilter) {
+
+        LOG.info(
+                "Starting async partition refresh for table {}, new partitions 
detected.",
+                tableName);
+
+        partitionRefreshExecutor.submit(
+                () -> {
+                    try {
+                        this.path = new File(tmpDirectory, "lookup-" + 
UUID.randomUUID());
+                        if (!path.mkdirs()) {
+                            throw new RuntimeException("Failed to create dir: 
" + path);
+                        }
+                        FullCacheLookupTable.Context newContext = 
context.copy(path);
+                        Options options = 
Options.fromMap(context.table.options());
+                        FullCacheLookupTable newTable =
+                                FullCacheLookupTable.create(
+                                        newContext, 
options.get(LOOKUP_CACHE_ROWS));
+                        if (cacheRowFilter != null) {
+                            newTable.specifyCacheRowFilter(cacheRowFilter);
+                        }
+                        newTable.specifyPartitions(newPartitions, 
partitionFilter);
+                        newTable.open();
+
+                        pendingLookupTable.set(newTable);
+                        LOG.info("Async partition refresh completed for table 
{}.", tableName);
+                    } catch (Exception e) {
+                        LOG.error("Async partition refresh failed for table 
{}.", tableName, e);
+                        partitionRefreshException.set(e);
+                        if (path != null) {
+                            FileIOUtils.deleteDirectoryQuietly(path);
+                        }
+                    }
+                });
+    }
+
+    /**
+     * Check if an async partition refresh has completed.
+     *
+     * @param newPartitions the new partitions to update after refresh 
completes
+     * @return the new lookup table if the async refresh has completed, or 
null if no
+     *     switch is needed (its temp path is available via {@code path()})
+     */
+    @Nullable
+    public LookupTable getNewLookupTable(List<BinaryRow> newPartitions) throws 
Exception {
+        if (!partitionRefreshAsync) {
+            return null;
+        }
+
+        Exception asyncException = partitionRefreshException.getAndSet(null);
+        if (asyncException != null) {
+            LOG.error(
+                    "Async partition refresh failed for table {}, will stop 
running.",
+                    tableName,
+                    asyncException);
+            throw asyncException;
+        }
+
+        LookupTable newTable = pendingLookupTable.getAndSet(null);
+        if (newTable == null) {
+            return null;
+        }
+
+        this.currentPartitions = newPartitions;
+        LOG.info("Switched to new lookup table for table {} with new 
partitions.", tableName);
+        return newTable;
+    }
+
+    /** Close partition refresh resources. */
+    @Override
+    public void close() throws IOException {
+        if (partitionRefreshExecutor != null) {
+            ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, 
partitionRefreshExecutor);
+        }
+        if (pendingLookupTable != null) {
+            LookupTable pending = pendingLookupTable.getAndSet(null);
+            if (pending != null) {
+                pending.close();
+            }
+        }
+    }
+
+    public boolean isPartitionRefreshAsync() {
+        return partitionRefreshAsync;
+    }
+
+    public File path() {
+        return path;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 408d74ea0c..32a4948501 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1250,4 +1250,317 @@ public class LookupJoinITCase extends CatalogITCaseBase 
{
 
         iterator.close();
     }
+
+    @ParameterizedTest
+    @EnumSource(LookupCacheMode.class)
+    public void testAsyncPartitionRefresh(LookupCacheMode mode) throws 
Exception {
+        // This test verifies asynchronous partition refresh:
+        // when max_pt() changes, the lookup table is refreshed in a 
background thread,
+        // old partition data continues serving queries until the new 
partition is fully loaded.
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, 
PRIMARY KEY (pt, k) NOT ENFORCED)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'bucket' = '1', "
+                        + "'lookup.dynamic-partition' = 'max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.dynamic-partition.refresh.async' = 'true', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        // insert data into partition '1'
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+        String query =
+                "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+                        + "for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        // verify initial lookup returns partition '1' data
+        sql("INSERT INTO T VALUES (1), (2)");
+        List<Row> result = iterator.collect(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 
200));
+
+        // insert data into a new partition '2', which will trigger async 
partition refresh
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 
2000)");
+        Thread.sleep(500); // wait for async refresh to complete
+        // trigger a lookup to check async completion and switch to new 
partition
+        sql("INSERT INTO T VALUES (1), (2)");
+        iterator.collect(2);
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), 
Row.of(2, 2000));
+
+        // insert another new partition '3' and verify switch again
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2, 
20000)");
+        Thread.sleep(500); // wait for async refresh to complete
+        sql("INSERT INTO T VALUES (1), (2)");
+        iterator.collect(2);
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000), 
Row.of(2, 20000));
+
+        iterator.close();
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = LookupCacheMode.class,
+            names = {"FULL", "MEMORY"})
+    public void 
testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode mode)
+            throws Exception {
+        // Verify that during async refresh, queries still return old 
partition data
+        // until the new partition is fully loaded and switched.
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, 
PRIMARY KEY (pt, k) NOT ENFORCED)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'bucket' = '1', "
+                        + "'lookup.dynamic-partition' = 'max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.dynamic-partition.refresh.async' = 'true', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+        String query =
+                "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+                        + "for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2)");
+        List<Row> result = iterator.collect(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 
200));
+
+        // insert new partition '2' to trigger async refresh
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2, 
2000)");
+
+        // immediately query before async refresh completes — should still 
return old partition data
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        // old partition data (100, 200) should still be served
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 
200));
+
+        // now wait for async refresh to complete and trigger switch
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        // after switch, new partition data should be returned
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000), 
Row.of(2, 2000));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws 
Exception {
+        // Verify async partition refresh works correctly with multi-level 
partition keys.
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v 
INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)"
+                        + "PARTITIONED BY (`pt1`, `pt2`) WITH ("
+                        + "'bucket' = '1', "
+                        + "'scan.partitions' = 'pt1=max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.dynamic-partition.refresh.async' = 'true', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                LookupCacheMode.FULL);
+
+        sql(
+                "INSERT INTO PARTITIONED_DIM VALUES "
+                        + "('2024', 1, 1, 100), ('2024', 1, 2, 200), "
+                        + "('2024', 2, 1, 300), ('2024', 2, 2, 400)");
+
+        String query =
+                "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN 
PARTITIONED_DIM "
+                        + "for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2)");
+        List<Row> result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("2024", 1, 1, 100),
+                        Row.of("2024", 1, 2, 200),
+                        Row.of("2024", 2, 1, 300),
+                        Row.of("2024", 2, 2, 400));
+
+        // insert new max partition '2025' with sub-partitions
+        sql(
+                "INSERT INTO PARTITIONED_DIM VALUES "
+                        + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), "
+                        + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)");
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        iterator.collect(4);
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("2025", 1, 1, 1000),
+                        Row.of("2025", 1, 2, 2000),
+                        Row.of("2025", 2, 1, 3000),
+                        Row.of("2025", 2, 2, 4000));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testAsyncPartitionRefreshWithOverwrite() throws Exception {
+        // Verify async partition refresh works correctly when a new max 
partition
+        // is created via INSERT OVERWRITE.
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT, PRIMARY 
KEY (pt, k) NOT ENFORCED)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'bucket' = '1', "
+                        + "'lookup.dynamic-partition' = 'max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.dynamic-partition.refresh.async' = 'true', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                LookupCacheMode.FULL);
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)");
+
+        String query =
+                "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+                        + "for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2)");
+        List<Row> result = iterator.collect(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 
200));
+
+        // overwrite current max partition with new data
+        sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 
150), (2, 250)");
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2, 
250));
+
+        // overwrite to create a new max partition
+        sql(
+                "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES 
(1, 1000), (2, 2000), (3, 3000)");
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        iterator.collect(3);
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000), 
Row.of(3, 3000));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testAsyncPartitionRefreshWithMaxTwoPt() throws Exception {
+        // Verify async partition refresh works correctly with max_two_pt() 
strategy.
+        sql(
+                "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY 
(pt, k) NOT ENFORCED)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'bucket' = '1', "
+                        + "'lookup.dynamic-partition' = 'max_two_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.dynamic-partition.refresh.async' = 'true', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                LookupCacheMode.FULL);
+
+        // insert data into partitions '1' and '2'
+        sql(
+                "INSERT INTO TWO_PT_DIM VALUES "
+                        + "('1', 1, 100), ('1', 2, 200), "
+                        + "('2', 1, 300), ('2', 2, 400)");
+
+        String query =
+                "SELECT D.pt, T.i, D.v FROM T LEFT JOIN TWO_PT_DIM "
+                        + "for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2)");
+        List<Row> result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("1", 1, 100),
+                        Row.of("1", 2, 200),
+                        Row.of("2", 1, 300),
+                        Row.of("2", 2, 400));
+
+        // insert new partition '3', now max_two_pt should be '2' and '3'
+        sql("INSERT INTO TWO_PT_DIM VALUES " + "('3', 1, 1000), ('3', 2, 
2000)");
+        sql("INSERT INTO T VALUES (1), (2)");
+        iterator.collect(4);
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(4);
+        // should now see data from partitions '2' and '3'
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("2", 1, 300),
+                        Row.of("2", 2, 400),
+                        Row.of("3", 1, 1000),
+                        Row.of("3", 2, 2000));
+
+        // insert another partition '4', max_two_pt should be '3' and '4'
+        sql("INSERT INTO TWO_PT_DIM VALUES " + "('4', 1, 10000), ('4', 2, 
20000)");
+        sql("INSERT INTO T VALUES (1), (2)");
+        iterator.collect(4);
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("3", 1, 1000),
+                        Row.of("3", 2, 2000),
+                        Row.of("4", 1, 10000),
+                        Row.of("4", 2, 20000));
+
+        iterator.close();
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = LookupCacheMode.class,
+            names = {"FULL", "MEMORY"})
+    public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode) 
throws Exception {
+        // Verify async partition refresh works correctly with non-primary-key 
append tables.
+        sql(
+                "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'lookup.dynamic-partition' = 'max_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 
ms', "
+                        + "'lookup.dynamic-partition.refresh.async' = 'true', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        sql("INSERT INTO NON_PK_DIM VALUES ('1', 1, 100), ('1', 1, 101), ('1', 
2, 200)");
+
+        String query =
+                "SELECT T.i, D.v FROM T LEFT JOIN NON_PK_DIM "
+                        + "for system_time as of T.proctime AS D ON T.i = D.k";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2)");
+        List<Row> result = iterator.collect(3);
+        // non-pk table may return multiple matches
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(1, 101), 
Row.of(2, 200));
+
+        // insert new partition '2' to trigger async refresh
+        sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001), 
('2', 2, 2000)");
+        sql("INSERT INTO T VALUES (1), (2)");
+        iterator.collect(3);
+        Thread.sleep(500);
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(1, 1001), 
Row.of(2, 2000));
+
+        iterator.close();
+    }
 }


Reply via email to