This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b9631eeb52 [flink] Support refreshing partition asynchronously for
lookup join (#7402)
b9631eeb52 is described below
commit b9631eeb526e45a99d203ea20307c053d8cb5603
Author: umi <[email protected]>
AuthorDate: Fri Mar 27 09:27:30 2026 +0800
[flink] Support refreshing partition asynchronously for lookup join (#7402)
---
.github/workflows/utitcase-flink-1.x-common.yml | 2 +-
.../generated/flink_connector_configuration.html | 8 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 12 +
.../flink/lookup/FileStoreLookupFunction.java | 56 +++-
.../paimon/flink/lookup/FullCacheLookupTable.java | 11 +
.../paimon/flink/lookup/PartitionRefresher.java | 223 +++++++++++++++
.../org/apache/paimon/flink/LookupJoinITCase.java | 313 +++++++++++++++++++++
7 files changed, 611 insertions(+), 14 deletions(-)
diff --git a/.github/workflows/utitcase-flink-1.x-common.yml
b/.github/workflows/utitcase-flink-1.x-common.yml
index 6825d8e7a3..f22b56014f 100644
--- a/.github/workflows/utitcase-flink-1.x-common.yml
+++ b/.github/workflows/utitcase-flink-1.x-common.yml
@@ -38,7 +38,7 @@ concurrency:
jobs:
build_test:
runs-on: ubuntu-latest
- timeout-minutes: 60
+ timeout-minutes: 100
steps:
- name: Checkout code
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 1003aff77c..d4c8b5a167 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,6 +80,12 @@ under the License.
<td>Duration</td>
<td>Specific dynamic partition refresh interval for lookup, scan
all partitions and obtain corresponding partition.</td>
</tr>
+ <tr>
+ <td><h5>lookup.dynamic-partition.refresh.async</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to refresh dynamic partition lookup table
asynchronously. This option only works for full cache dimension table. When
enabled, partition changes will be loaded in a background thread while the old
partition data continues serving queries. When disabled (default), partition
refresh is synchronous and blocks queries until the new partition data is fully
loaded.</td>
+ </tr>
<tr>
<td><h5>lookup.refresh.async</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -357,4 +363,4 @@ under the License.
<td>Defines a custom parallelism for the unaware-bucket table
compaction job. By default, if this option is not defined, the planner will
derive the parallelism for each statement individually by also considering the
global configuration.</td>
</tr>
</tbody>
-</table>
+</table>
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 8febafc5e3..89d3e2174a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -276,6 +276,18 @@ public class FlinkConnectorOptions {
.defaultValue(false)
.withDescription("Whether to refresh lookup table in an
async thread.");
+ public static final ConfigOption<Boolean>
LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC =
+ ConfigOptions.key("lookup.dynamic-partition.refresh.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to refresh dynamic partition lookup table
asynchronously. "
+ + "This option only works for full cache
dimension table. "
+ + "When enabled, partition changes will be
loaded in a background thread "
+ + "while the old partition data continues
serving queries. "
+ + "When disabled (default), partition
refresh is synchronous and blocks queries "
+ + "until the new partition data is fully
loaded.");
+
public static final ConfigOption<Integer>
LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT =
ConfigOptions.key("lookup.refresh.async.pending-snapshot-count")
.intType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 5b71664ca6..69d834615f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -68,6 +68,7 @@ import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
import static
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
@@ -93,8 +94,12 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
private final List<InternalRow.FieldGetter> projectFieldsGetters;
private transient File path;
+ private transient String tmpDirectory;
private transient LookupTable lookupTable;
+ // partition refresh
+ @Nullable private transient PartitionRefresher partitionRefresher;
+
// interval of refreshing lookup table
private transient Duration refreshInterval;
// timestamp when refreshing lookup table
@@ -161,7 +166,7 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
public void open(FunctionContext context) throws Exception {
this.functionContext = context;
- String tmpDirectory = getTmpDirectory(context);
+ this.tmpDirectory = getTmpDirectory(context);
open(tmpDirectory);
}
@@ -236,6 +241,16 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
lookupTable.specifyPartitions(
partitions,
partitionLoader.createSpecificPartFilter());
}
+ if (partitionLoader instanceof DynamicPartitionLoader) {
+ // Initialize partition refresher
+ this.partitionRefresher =
+ new PartitionRefresher(
+
options.get(LOOKUP_DYNAMIC_PARTITION_REFRESH_ASYNC)
+ && lookupTable instanceof
FullCacheLookupTable,
+ table.name(),
+ this.tmpDirectory,
+ partitionLoader.partitions());
+ }
}
if (cacheRowFilter != null) {
@@ -271,13 +286,16 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
if (partitionLoader == null) {
return lookupInternal(key);
}
-
- if (partitionLoader.partitions().isEmpty()) {
+ List<BinaryRow> partitions =
+ partitionRefresher != null
+ ? partitionRefresher.currentPartitions()
+ : partitionLoader.partitions();
+ if (partitions.isEmpty()) {
return Collections.emptyList();
}
List<RowData> rows = new ArrayList<>();
- for (BinaryRow partition : partitionLoader.partitions()) {
+ for (BinaryRow partition : partitions) {
rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
}
return rows;
@@ -324,7 +342,18 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
return;
}
- // 2. refresh dynamic partition
+ // 2. check if async partition refresh has completed, and switch if so
+ if (partitionRefresher != null &&
partitionRefresher.isPartitionRefreshAsync()) {
+ LookupTable newLookupTable =
+
partitionRefresher.getNewLookupTable(partitionLoader.partitions());
+ if (newLookupTable != null) {
+ lookupTable.close();
+ lookupTable = newLookupTable;
+ path = partitionRefresher.path();
+ }
+ }
+
+ // 3. refresh dynamic partition
if (partitionLoader != null) {
boolean partitionChanged = partitionLoader.checkRefresh();
List<BinaryRow> partitions = partitionLoader.partitions();
@@ -334,18 +363,17 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
if (partitionChanged) {
- // reopen with latest partition
- lookupTable.specifyPartitions(
- partitionLoader.partitions(),
partitionLoader.createSpecificPartFilter());
- lookupTable.close();
- lookupTable.open();
+ partitionRefresher.startRefresh(
+ partitions,
+ partitionLoader.createSpecificPartFilter(),
+ lookupTable,
+ cacheRowFilter);
nextRefreshTime = System.currentTimeMillis() +
refreshInterval.toMillis();
- // no need to refresh the lookup table because it is reopened
return;
}
}
- // 3. refresh lookup table
+ // 4. refresh lookup table
if (shouldRefreshLookupTable()) {
// Check if we should do full load (close and reopen table)
instead of incremental
// refresh
@@ -415,6 +443,10 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
@Override
public void close() throws IOException {
+ if (partitionRefresher != null) {
+ partitionRefresher.close();
+ }
+
if (lookupTable != null) {
lookupTable.close();
lookupTable = null;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 81af38ea8f..6b02084d8a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -414,5 +414,16 @@ public abstract class FullCacheLookupTable implements
LookupTable {
joinKey,
requiredCachedBucketIds);
}
+
+ public Context copy(File newTempPath) {
+ return new Context(
+ table.wrapped(),
+ projection,
+ tablePredicate,
+ projectedPredicate,
+ newTempPath,
+ joinKey,
+ requiredCachedBucketIds);
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
new file mode 100644
index 0000000000..cbf6b025bb
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PartitionRefresher.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.ExecutorUtils;
+import org.apache.paimon.utils.FileIOUtils;
+import org.apache.paimon.utils.Filter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
+
+/** Manages partition refresh logic for {@link FullCacheLookupTable}. */
+public class PartitionRefresher implements Closeable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PartitionRefresher.class);
+
+ private final boolean partitionRefreshAsync;
+ private final String tableName;
+ private final String tmpDirectory;
+ private volatile File path;
+ private ExecutorService partitionRefreshExecutor;
+ private AtomicReference<LookupTable> pendingLookupTable;
+ private AtomicReference<Exception> partitionRefreshException;
+
+ /** Current partitions being used for lookup. Updated when partition
refresh completes. */
+ private List<BinaryRow> currentPartitions;
+
+ public PartitionRefresher(
+ boolean partitionRefreshAsync,
+ String tableName,
+ String tmpDirectory,
+ List<BinaryRow> initialPartitions) {
+ this.partitionRefreshAsync = partitionRefreshAsync;
+ this.tableName = tableName;
+ this.tmpDirectory = tmpDirectory;
+ this.currentPartitions = initialPartitions;
+ if (!partitionRefreshAsync) {
+ return;
+ }
+ this.pendingLookupTable = new AtomicReference<>(null);
+ this.partitionRefreshException = new AtomicReference<>(null);
+ this.partitionRefreshExecutor =
+ Executors.newSingleThreadExecutor(
+ new ExecutorThreadFactory(
+ String.format(
+ "%s-lookup-refresh-partition",
+ Thread.currentThread().getName())));
+ }
+
+ /** Get the current partitions being used for lookup. */
+ public List<BinaryRow> currentPartitions() {
+ return currentPartitions;
+ }
+
+ /**
+ * Start partition refresh. Chooses sync or async mode based on
configuration.
+ *
+ * @param newPartitions the new partitions to refresh to
+ * @param partitionFilter the partition filter for the new partitions
+ * @param lookupTable the current lookup table to refresh
+ * @param cacheRowFilter the cache row filter, may be null
+ */
+ public void startRefresh(
+ List<BinaryRow> newPartitions,
+ @Nullable Predicate partitionFilter,
+ LookupTable lookupTable,
+ @Nullable Filter<InternalRow> cacheRowFilter)
+ throws Exception {
+ if (partitionRefreshAsync) {
+ asyncPartitionRefresh(
+ newPartitions,
+ partitionFilter,
+ ((FullCacheLookupTable) lookupTable).context,
+ cacheRowFilter);
+ } else {
+ syncPartitionRefresh(newPartitions, partitionFilter, lookupTable);
+ }
+ }
+
+ private void syncPartitionRefresh(
+ List<BinaryRow> newPartitions,
+ @Nullable Predicate partitionFilter,
+ LookupTable lookupTable)
+ throws Exception {
+ LOG.info(
+ "Synchronously refreshing partition for table {}, new
partitions detected.",
+ tableName);
+ lookupTable.close();
+ lookupTable.specifyPartitions(newPartitions, partitionFilter);
+ lookupTable.open();
+ this.currentPartitions = newPartitions;
+ LOG.info("Synchronous partition refresh completed for table {}.",
tableName);
+ }
+
+ private void asyncPartitionRefresh(
+ List<BinaryRow> newPartitions,
+ @Nullable Predicate partitionFilter,
+ FullCacheLookupTable.Context context,
+ @Nullable Filter<InternalRow> cacheRowFilter) {
+
+ LOG.info(
+ "Starting async partition refresh for table {}, new partitions
detected.",
+ tableName);
+
+ partitionRefreshExecutor.submit(
+ () -> {
+ try {
+ this.path = new File(tmpDirectory, "lookup-" +
UUID.randomUUID());
+ if (!path.mkdirs()) {
+ throw new RuntimeException("Failed to create dir:
" + path);
+ }
+ FullCacheLookupTable.Context newContext =
context.copy(path);
+ Options options =
Options.fromMap(context.table.options());
+ FullCacheLookupTable newTable =
+ FullCacheLookupTable.create(
+ newContext,
options.get(LOOKUP_CACHE_ROWS));
+ if (cacheRowFilter != null) {
+ newTable.specifyCacheRowFilter(cacheRowFilter);
+ }
+ newTable.specifyPartitions(newPartitions,
partitionFilter);
+ newTable.open();
+
+ pendingLookupTable.set(newTable);
+ LOG.info("Async partition refresh completed for table
{}.", tableName);
+ } catch (Exception e) {
+ LOG.error("Async partition refresh failed for table
{}.", tableName, e);
+ partitionRefreshException.set(e);
+ if (path != null) {
+ FileIOUtils.deleteDirectoryQuietly(path);
+ }
+ }
+ });
+ }
+
+ /**
+ * Check if an async partition refresh has completed.
+ *
+ * @param newPartitions the new partitions to update after refresh
completes
+     * @return the new lookup table if the async refresh has completed, or
null if no switch is needed
+ */
+ @Nullable
+ public LookupTable getNewLookupTable(List<BinaryRow> newPartitions) throws
Exception {
+ if (!partitionRefreshAsync) {
+ return null;
+ }
+
+ Exception asyncException = partitionRefreshException.getAndSet(null);
+ if (asyncException != null) {
+ LOG.error(
+ "Async partition refresh failed for table {}, will stop
running.",
+ tableName,
+ asyncException);
+ throw asyncException;
+ }
+
+ LookupTable newTable = pendingLookupTable.getAndSet(null);
+ if (newTable == null) {
+ return null;
+ }
+
+ this.currentPartitions = newPartitions;
+ LOG.info("Switched to new lookup table for table {} with new
partitions.", tableName);
+ return newTable;
+ }
+
+ /** Close partition refresh resources. */
+ @Override
+ public void close() throws IOException {
+ if (partitionRefreshExecutor != null) {
+ ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES,
partitionRefreshExecutor);
+ }
+ if (pendingLookupTable != null) {
+ LookupTable pending = pendingLookupTable.getAndSet(null);
+ if (pending != null) {
+ pending.close();
+ }
+ }
+ }
+
+ public boolean isPartitionRefreshAsync() {
+ return partitionRefreshAsync;
+ }
+
+ public File path() {
+ return path;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index 408d74ea0c..32a4948501 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1250,4 +1250,317 @@ public class LookupJoinITCase extends CatalogITCaseBase
{
iterator.close();
}
+
+ @ParameterizedTest
+ @EnumSource(LookupCacheMode.class)
+ public void testAsyncPartitionRefresh(LookupCacheMode mode) throws
Exception {
+ // This test verifies asynchronous partition refresh:
+ // when max_pt() changes, the lookup table is refreshed in a
background thread,
+ // old partition data continues serving queries until the new
partition is fully loaded.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT,
PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ // insert data into partition '1'
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ // verify initial lookup returns partition '1' data
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2,
200));
+
+ // insert data into a new partition '2', which will trigger async
partition refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2,
2000)");
+ Thread.sleep(500); // wait for async refresh to complete
+ // trigger a lookup to check async completion and switch to new
partition
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000),
Row.of(2, 2000));
+
+ // insert another new partition '3' and verify switch again
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('3', 1, 10000), ('3', 2,
20000)");
+ Thread.sleep(500); // wait for async refresh to complete
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(2);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 10000),
Row.of(2, 20000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void
testAsyncPartitionRefreshServesOldDataDuringRefresh(LookupCacheMode mode)
+ throws Exception {
+ // Verify that during async refresh, queries still return old
partition data
+ // until the new partition is fully loaded and switched.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT,
PRIMARY KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 100), ('1', 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2,
200));
+
+ // insert new partition '2' to trigger async refresh
+ sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 1000), ('2', 2,
2000)");
+
+ // immediately query before async refresh completes — should still
return old partition data
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ // old partition data (100, 200) should still be served
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2,
200));
+
+ // now wait for async refresh to complete and trigger switch
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ // after switch, new partition data should be returned
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1000),
Row.of(2, 2000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithMultiPartitionKeys() throws
Exception {
+ // Verify async partition refresh works correctly with multi-level
partition keys.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt1 STRING, pt2 INT, k INT, v
INT, PRIMARY KEY (pt1, pt2, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt1`, `pt2`) WITH ("
+ + "'bucket' = '1', "
+ + "'scan.partitions' = 'pt1=max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ sql(
+ "INSERT INTO PARTITIONED_DIM VALUES "
+ + "('2024', 1, 1, 100), ('2024', 1, 2, 200), "
+ + "('2024', 2, 1, 300), ('2024', 2, 2, 400)");
+
+ String query =
+ "SELECT D.pt1, D.pt2, T.i, D.v FROM T LEFT JOIN
PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2024", 1, 1, 100),
+ Row.of("2024", 1, 2, 200),
+ Row.of("2024", 2, 1, 300),
+ Row.of("2024", 2, 2, 400));
+
+ // insert new max partition '2025' with sub-partitions
+ sql(
+ "INSERT INTO PARTITIONED_DIM VALUES "
+ + "('2025', 1, 1, 1000), ('2025', 1, 2, 2000), "
+ + "('2025', 2, 1, 3000), ('2025', 2, 2, 4000)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2025", 1, 1, 1000),
+ Row.of("2025", 1, 2, 2000),
+ Row.of("2025", 2, 1, 3000),
+ Row.of("2025", 2, 2, 4000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithOverwrite() throws Exception {
+ // Verify async partition refresh works correctly when a new max
partition
+ // is created via INSERT OVERWRITE.
+ sql(
+ "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT, PRIMARY
KEY (pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ sql("INSERT INTO PARTITIONED_DIM VALUES (1, 1, 100), (1, 2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2,
200));
+
+ // overwrite current max partition with new data
+ sql("INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1,
150), (2, 250)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 150), Row.of(2,
250));
+
+ // overwrite to create a new max partition
+ sql(
+ "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 2) VALUES
(1, 1000), (2, 2000), (3, 3000)");
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ iterator.collect(3);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(2, 2000),
Row.of(3, 3000));
+
+ iterator.close();
+ }
+
+ @Test
+ public void testAsyncPartitionRefreshWithMaxTwoPt() throws Exception {
+ // Verify async partition refresh works correctly with max_two_pt()
strategy.
+ sql(
+ "CREATE TABLE TWO_PT_DIM (pt STRING, k INT, v INT, PRIMARY KEY
(pt, k) NOT ENFORCED)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'bucket' = '1', "
+ + "'lookup.dynamic-partition' = 'max_two_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ LookupCacheMode.FULL);
+
+ // insert data into partitions '1' and '2'
+ sql(
+ "INSERT INTO TWO_PT_DIM VALUES "
+ + "('1', 1, 100), ('1', 2, 200), "
+ + "('2', 1, 300), ('2', 2, 400)");
+
+ String query =
+ "SELECT D.pt, T.i, D.v FROM T LEFT JOIN TWO_PT_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("1", 1, 100),
+ Row.of("1", 2, 200),
+ Row.of("2", 1, 300),
+ Row.of("2", 2, 400));
+
+ // insert new partition '3', now max_two_pt should be '2' and '3'
+ sql("INSERT INTO TWO_PT_DIM VALUES " + "('3', 1, 1000), ('3', 2,
2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ // should now see data from partitions '2' and '3'
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("2", 1, 300),
+ Row.of("2", 2, 400),
+ Row.of("3", 1, 1000),
+ Row.of("3", 2, 2000));
+
+ // insert another partition '4', max_two_pt should be '3' and '4'
+ sql("INSERT INTO TWO_PT_DIM VALUES " + "('4', 1, 10000), ('4', 2,
20000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(4);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of("3", 1, 1000),
+ Row.of("3", 2, 2000),
+ Row.of("4", 1, 10000),
+ Row.of("4", 2, 20000));
+
+ iterator.close();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = LookupCacheMode.class,
+ names = {"FULL", "MEMORY"})
+ public void testAsyncPartitionRefreshWithNonPkTable(LookupCacheMode mode)
throws Exception {
+ // Verify async partition refresh works correctly with non-primary-key
append tables.
+ sql(
+ "CREATE TABLE NON_PK_DIM (pt STRING, k INT, v INT)"
+ + "PARTITIONED BY (`pt`) WITH ("
+ + "'lookup.dynamic-partition' = 'max_pt()', "
+ + "'lookup.dynamic-partition.refresh-interval' = '1
ms', "
+ + "'lookup.dynamic-partition.refresh.async' = 'true', "
+ + "'lookup.cache' = '%s', "
+ + "'continuous.discovery-interval'='1 ms')",
+ mode);
+
+ sql("INSERT INTO NON_PK_DIM VALUES ('1', 1, 100), ('1', 1, 101), ('1',
2, 200)");
+
+ String query =
+ "SELECT T.i, D.v FROM T LEFT JOIN NON_PK_DIM "
+ + "for system_time as of T.proctime AS D ON T.i = D.k";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2)");
+ List<Row> result = iterator.collect(3);
+ // non-pk table may return multiple matches
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(1, 101),
Row.of(2, 200));
+
+ // insert new partition '2' to trigger async refresh
+ sql("INSERT INTO NON_PK_DIM VALUES ('2', 1, 1000), ('2', 1, 1001),
('2', 2, 2000)");
+ sql("INSERT INTO T VALUES (1), (2)");
+ iterator.collect(3);
+ Thread.sleep(500);
+ sql("INSERT INTO T VALUES (1), (2)");
+ result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 1000), Row.of(1, 1001),
Row.of(2, 2000));
+
+ iterator.close();
+ }
}