This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f64ab1d597e [enhance](iceberg) Refactor Iceberg metadata cache structure and add table cache test cases (#59716)
f64ab1d597e is described below
commit f64ab1d597e687dbe7a1a7299b71d69d23a1047d
Author: Socrates <[email protected]>
AuthorDate: Thu Jan 29 14:38:01 2026 +0800
[enhance](iceberg) Refactor Iceberg metadata cache structure and add table cache test cases (#59716)
### What problem does this PR solve?
## Description
### Changes
This PR refactors the Iceberg metadata cache structure to improve code organization and adds comprehensive test cases for table cache behavior.
### Main Changes
#### 1. Refactored IcebergMetadataCache
- Introduced `IcebergTableCacheValue` to encapsulate table-related metadata
- Removed redundant `snapshotListCache` and `snapshotCache`
- Merged snapshot information into `IcebergTableCacheValue` with lazy loading
- Simplified cache structure from 3 separate caches to 2: `tableCache` and `viewCache`
**Before:**
```java
private LoadingCache<IcebergMetadataCacheKey, List<Snapshot>> snapshotListCache;
private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue> snapshotCache;
```
**After:**
```java
private LoadingCache<IcebergMetadataCacheKey, IcebergTableCacheValue> tableCache;
private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
```
#### 2. Lazy Loading for Snapshot Cache
- Snapshot information is now loaded on demand through `IcebergTableCacheValue.getSnapshotCacheValue()` (see the sketch below)
- Reduces unnecessary memory footprint for queries that don't require snapshot information
- Snapshot information is mainly used in MTMV scenarios
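The lazy load follows a double-checked locking pattern; the accessor below is condensed from the new `IcebergTableCacheValue` class included in this diff:

```java
// Condensed from IcebergTableCacheValue.java (full class in the diff below).
// The Iceberg Table is cached eagerly; the snapshot value is computed at most
// once, on first request, by whatever Supplier the caller passes in.
public IcebergSnapshotCacheValue getSnapshotCacheValue(Supplier<IcebergSnapshotCacheValue> loader) {
    if (!snapshotCacheLoaded) {                    // fast path: already loaded
        synchronized (this) {
            if (!snapshotCacheLoaded) {            // re-check under the lock
                snapshotCacheValue = loader.get(); // e.g. () -> loadSnapshot(dorisTable, icebergTable)
                snapshotCacheLoaded = true;
            }
        }
    }
    return snapshotCacheValue;
}
```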
#### 3. Simplified Cache API
- `getIcebergTable()`: returns the `Table` object directly from `IcebergTableCacheValue`
- `getSnapshotCache()`: returns the snapshot cache value, loading it lazily on first access
- `getSnapshotList()`: returns the snapshot list read from the `Table` object (usage sketch after this list)
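As a usage sketch (hypothetical call site, not part of this diff; `dorisTable` stands in for an `ExternalTable` handle — all three accessors exist in `IcebergMetadataCache` below):

```java
// Obtain the per-catalog metadata cache; the lookup path matches this diff.
IcebergMetadataCache cache = Env.getCurrentEnv()
        .getExtMetaCacheMgr()
        .getIcebergMetadataCache(dorisTable.getCatalog());

Table table = cache.getIcebergTable(dorisTable);                         // Table from IcebergTableCacheValue
List<Snapshot> snapshots = cache.getSnapshotList(dorisTable);            // read from the cached Table
IcebergSnapshotCacheValue snapshot = cache.getSnapshotCache(dorisTable); // loaded lazily on first access
```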
#### 4. Test Cases
- Added comprehensive test case `test_iceberg_table_cache` to verify cache behavior
- Tests cover both cache-enabled and cache-disabled scenarios
- Validated that external modifications (INSERT, DELETE, UPDATE, schema changes) are handled properly
### Benefits
| Aspect | Improvement |
|--------|-------------|
| **Memory Usage** | Reduced by eliminating duplicate caching of snapshot information |
| **Code Structure** | Cleaner with a single `IcebergTableCacheValue` instead of multiple separate caches |
| **Performance** | Better with lazy loading of the snapshot cache only when needed |
| **Maintainability** | Simpler cache management logic |
### Test Results
- Added regression test: `test_iceberg_table_cache.groovy`
- Tests validate cache behavior with TTL and external modifications (the TTL handling is sketched after this list)
- Verified cache invalidation works correctly with `REFRESH TABLE`
- Test scenarios include:
- DML operations (INSERT, DELETE, UPDATE, INSERT OVERWRITE)
- Schema changes (ADD/DROP/RENAME COLUMN, ALTER COLUMN TYPE)
- Partition evolution (ADD/DROP/REPLACE PARTITION FIELD)
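For reference, the TTL plumbing these tests exercise, condensed from `IcebergMetadataCache.init()` in this diff; a TTL of `0` effectively disables caching, which is what the cache-disabled test catalog sets via `iceberg.table.meta.cache.ttl-second`:

```java
// Condensed from IcebergMetadataCache.init() in the diff below.
long tableMetaCacheTtlSecond = NumberUtils.toLong(
        catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND),
        ExternalCatalog.CACHE_NO_TTL);

CacheFactory tableCacheFactory = new CacheFactory(
        // A valid explicit TTL (including 0, i.e. disabled) wins over the global default.
        OptionalLong.of(tableMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
                ? tableMetaCacheTtlSecond
                : Config.external_cache_expire_time_seconds_after_access),
        OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
        Config.max_external_table_cache_num,
        true,
        null);
this.tableCache = tableCacheFactory.buildCache(this::loadTableCacheValue, executor);
```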
### Related Files
**Core Changes:**
- `IcebergMetadataCache.java` - Refactored cache structure
- `IcebergTableCacheValue.java` - New class to encapsulate table metadata
- `IcebergExternalCatalog.java` - Updated cache-related configurations
**Tests:**
- `test_iceberg_table_cache.groovy` - Comprehensive cache behavior tests
- `Suite.groovy` - Updated `getSparkIcebergContainerName()` implementation
---
.../doris/datasource/hive/HMSExternalTable.java | 2 +-
.../doris/datasource/hive/IcebergDlaTable.java | 21 +-
.../datasource/iceberg/IcebergExternalCatalog.java | 13 +-
.../datasource/iceberg/IcebergExternalTable.java | 22 +-
.../datasource/iceberg/IcebergMetadataCache.java | 130 ++----
.../datasource/iceberg/IcebergTableCacheValue.java | 49 +++
.../doris/datasource/iceberg/IcebergUtils.java | 143 ++++---
.../iceberg/test_iceberg_table_cache.groovy | 443 +++++++++++++++++++++
8 files changed, 626 insertions(+), 197 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index d05980fadd3..aca6e8eb3a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -1150,7 +1150,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return HudiUtils.getHudiMvccSnapshot(tableSnapshot, this);
} else if (getDlaType() == DLAType.ICEBERG) {
return new IcebergMvccSnapshot(
- IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, this, scanParams));
+ IcebergUtils.getSnapshotCacheValue(tableSnapshot, this, scanParams));
} else {
return new EmptyMvccSnapshot();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
index 81ac23132e8..4868e0a5841 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
@@ -52,9 +51,7 @@ public class IcebergDlaTable extends HMSDlaTable {
@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
- return Maps.newHashMap(
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable)
- .getPartitionInfo().getNameToPartitionItem());
+ return Maps.newHashMap(IcebergUtils.getIcebergPartitionItems(snapshot, hmsTable));
}
@Override
@@ -69,19 +66,13 @@ public class IcebergDlaTable extends HMSDlaTable {
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
- IcebergSnapshotCacheValue snapshotValue =
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
- IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
- hmsTable,
- snapshotValue.getSnapshot().getSchemaId());
- return schemaValue.getPartitionColumns();
+ return IcebergUtils.getIcebergPartitionColumns(snapshot, hmsTable);
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
- IcebergSnapshotCacheValue snapshotValue =
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
+ IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
// If partition snapshot ID is unavailable (<= 0), fallback to table snapshot ID
// This can happen when last_updated_snapshot_id is null in Iceberg metadata
@@ -102,16 +93,14 @@ public class IcebergDlaTable extends HMSDlaTable {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException {
hmsTable.makeSureInitialized();
- IcebergSnapshotCacheValue snapshotValue =
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
+ IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}
@Override
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
hmsTable.makeSureInitialized();
- IcebergSnapshotCacheValue snapshotValue =
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, hmsTable);
+ IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, hmsTable);
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 6c03ac8225f..bdf0a73bb73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -54,7 +54,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String ICEBERG_S3_TABLES = "s3tables";
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second";
- public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND = "iceberg.snapshot.meta.cache.ttl-second";
public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "iceberg.manifest.cache.enable";
public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = "iceberg.manifest.cache.capacity-mb";
public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "iceberg.manifest.cache.ttl-second";
@@ -97,15 +96,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
+ tableMetaCacheTtlSecond);
}
- // check iceberg.snapshot.meta.cache.ttl-second parameter
- String partitionCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
- if (Objects.nonNull(partitionCacheTtlSecond) && NumberUtils.toInt(partitionCacheTtlSecond, CACHE_NO_TTL)
- < CACHE_TTL_DISABLE_CACHE) {
- throw new DdlException(
- "The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND + " is wrong, value is "
- + partitionCacheTtlSecond);
- }
-
String manifestCacheEnable = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
if (Objects.nonNull(manifestCacheEnable)
&& !(manifestCacheEnable.equalsIgnoreCase("true") || manifestCacheEnable.equalsIgnoreCase("false"))) {
@@ -135,8 +125,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
super.notifyPropertiesUpdated(updatedProps);
String tableMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
- String snapshotMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
- if (Objects.nonNull(tableMetaCacheTtl) || Objects.nonNull(snapshotMetaCacheTtl)) {
+ if (Objects.nonNull(tableMetaCacheTtl)) {
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
}
String manifestCacheEnable = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index b8b82463ea5..5f421199b86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -135,15 +135,12 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
- return Maps.newHashMap(
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this)
- .getPartitionInfo().getNameToPartitionItem());
+ return Maps.newHashMap(IcebergUtils.getIcebergPartitionItems(snapshot, this));
}
@Override
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
- return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this)
- .getPartitionInfo().getNameToPartitionItem();
+ return IcebergUtils.getIcebergPartitionItems(snapshot, this);
}
@Override
@@ -158,18 +155,13 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
- IcebergSnapshotCacheValue snapshotValue =
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
- IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
- this, snapshotValue.getSnapshot().getSchemaId());
- return schemaValue.getPartitionColumns();
+ return IcebergUtils.getIcebergPartitionColumns(snapshot, this);
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
- IcebergSnapshotCacheValue snapshotValue =
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
+ IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, this);
long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
// If partition snapshot ID is unavailable (<= 0), fallback to table snapshot ID
// This can happen when last_updated_snapshot_id is null in Iceberg metadata
@@ -195,13 +187,13 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
@Override
public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
makeSureInitialized();
- IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, this);
+ IcebergSnapshotCacheValue snapshotValue = IcebergUtils.getSnapshotCacheValue(snapshot, this);
return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
}
@Override
public long getNewestUpdateVersionOrTime() {
- return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), this, Optional.empty())
+ return IcebergUtils.getLatestSnapshotCacheValue(this)
.getPartitionInfo().getNameToIcebergPartition().values().stream()
.mapToLong(IcebergPartition::getLastUpdateTime).max().orElse(0);
}
@@ -256,7 +248,7 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
if (isView()) {
return new EmptyMvccSnapshot();
} else {
- return new IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
+ return new IcebergMvccSnapshot(IcebergUtils.getSnapshotCacheValue(
tableSnapshot, this, scanParams));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 7b256cbdd2e..5f0c0700efe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -18,7 +18,6 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
@@ -55,9 +54,7 @@ public class IcebergMetadataCache {
private static final Logger LOG = LogManager.getLogger(IcebergMetadataCache.class);
private final ExecutorService executor;
private final ExternalCatalog catalog;
- private LoadingCache<IcebergMetadataCacheKey, List<Snapshot>> snapshotListCache;
- private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
- private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue> snapshotCache;
+ private LoadingCache<IcebergMetadataCacheKey, IcebergTableCacheValue> tableCache;
private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
private IcebergManifestCache manifestCache;
@@ -72,19 +69,6 @@ public class IcebergMetadataCache {
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND),
ExternalCatalog.CACHE_NO_TTL);
- long snapshotMetaCacheTtlSecond = NumberUtils.toLong(
- catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND),
- ExternalCatalog.CACHE_NO_TTL);
-
- CacheFactory snapshotListCacheFactory = new CacheFactory(
- OptionalLong.of(snapshotMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
- ? snapshotMetaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access),
- OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
- Config.max_external_table_cache_num,
- true,
- null);
- this.snapshotListCache = snapshotListCacheFactory.buildCache(this::loadSnapshots, executor);
-
CacheFactory tableCacheFactory = new CacheFactory(
OptionalLong.of(tableMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
? tableMetaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access),
@@ -92,16 +76,7 @@ public class IcebergMetadataCache {
Config.max_external_table_cache_num,
true,
null);
- this.tableCache = tableCacheFactory.buildCache(this::loadTable, executor);
-
- CacheFactory snapshotCacheFactory = new CacheFactory(
- OptionalLong.of(snapshotMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
- ? snapshotMetaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access),
- OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
- Config.max_external_table_cache_num,
- true,
- null);
- this.snapshotCache = snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
+ this.tableCache = tableCacheFactory.buildCache(this::loadTableCacheValue, executor);
this.viewCache = tableCacheFactory.buildCache(this::loadView, executor);
long manifestCacheCapacityMb = NumberUtils.toLong(
@@ -116,32 +91,32 @@ public class IcebergMetadataCache {
public Table getIcebergTable(ExternalTable dorisTable) {
IcebergMetadataCacheKey key = new IcebergMetadataCacheKey(dorisTable.getOrBuildNameMapping());
- return tableCache.get(key);
+ return tableCache.get(key).getIcebergTable();
}
public Table getIcebergTable(IcebergMetadataCacheKey key) {
- return tableCache.get(key);
+ return tableCache.get(key).getIcebergTable();
}
public IcebergSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) {
IcebergMetadataCacheKey key = new IcebergMetadataCacheKey(dorisTable.getOrBuildNameMapping());
- return snapshotCache.get(key);
- }
-
- public IcebergManifestCache getManifestCache() {
- return manifestCache;
+ IcebergTableCacheValue tableCacheValue = tableCache.get(key);
+ return tableCacheValue.getSnapshotCacheValue(() -> loadSnapshot(dorisTable, tableCacheValue.getIcebergTable()));
}
- @NotNull
- private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
- Table icebergTable = getIcebergTable(key);
+ public List<Snapshot> getSnapshotList(ExternalTable dorisTable) {
+ Table icebergTable = getIcebergTable(dorisTable);
List<Snapshot> snaps = Lists.newArrayList();
Iterables.addAll(snaps, icebergTable.snapshots());
return snaps;
}
+ public IcebergManifestCache getManifestCache() {
+ return manifestCache;
+ }
+
@NotNull
- private Table loadTable(IcebergMetadataCacheKey key) {
+ private IcebergTableCacheValue loadTableCacheValue(IcebergMetadataCacheKey key) {
NameMapping nameMapping = key.nameMapping;
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(nameMapping.getCtlId());
if (catalog == null) {
@@ -160,8 +135,10 @@ public class IcebergMetadataCache {
if (LOG.isDebugEnabled()) {
LOG.debug("load iceberg table {}", nameMapping, new
Exception());
}
- return ((ExternalCatalog)
catalog).getExecutionAuthenticator().execute(()
- -> ops.loadTable(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName()));
+ Table table = ((ExternalCatalog)
catalog).getExecutionAuthenticator()
+ .execute(()
+ -> ops.loadTable(nameMapping.getRemoteDbName(),
nameMapping.getRemoteTblName()));
+ return new IcebergTableCacheValue(table);
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e),
e);
}
@@ -169,38 +146,33 @@ public class IcebergMetadataCache {
}
@NotNull
- private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException {
- NameMapping nameMapping = key.nameMapping;
- TableIf dorisTable = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(nameMapping.getCtlId())
- .getDbOrAnalysisException(nameMapping.getLocalDbName())
- .getTableOrAnalysisException(nameMapping.getLocalTblName());
-
+ private IcebergSnapshotCacheValue loadSnapshot(ExternalTable dorisTable, Table icebergTable) {
if (!(dorisTable instanceof MTMVRelatedTableIf)) {
- throw new AnalysisException(String.format("Table %s.%s is not a valid MTMV related table.",
- nameMapping.getLocalDbName(), nameMapping.getLocalTblName()));
+ throw new RuntimeException(String.format("Table %s.%s is not a valid MTMV related table.",
+ dorisTable.getDbName(), dorisTable.getName()));
}
- MTMVRelatedTableIf table = (MTMVRelatedTableIf) dorisTable;
- IcebergSnapshot lastedIcebergSnapshot = IcebergUtils.getLastedIcebergSnapshot((ExternalTable) table);
- IcebergPartitionInfo icebergPartitionInfo;
- if (!table.isValidRelatedTable()) {
- icebergPartitionInfo = IcebergPartitionInfo.empty();
- } else {
- icebergPartitionInfo = IcebergUtils.loadPartitionInfo((ExternalTable) table,
- lastedIcebergSnapshot.getSnapshotId());
+ try {
+ MTMVRelatedTableIf table = (MTMVRelatedTableIf) dorisTable;
+ IcebergSnapshot latestIcebergSnapshot = IcebergUtils.getLatestIcebergSnapshot(icebergTable);
+ IcebergPartitionInfo icebergPartitionInfo;
+ if (!table.isValidRelatedTable()) {
+ icebergPartitionInfo = IcebergPartitionInfo.empty();
+ } else {
+ icebergPartitionInfo = IcebergUtils.loadPartitionInfo(dorisTable, icebergTable,
+ latestIcebergSnapshot.getSnapshotId(), latestIcebergSnapshot.getSchemaId());
+ }
+ return new IcebergSnapshotCacheValue(icebergPartitionInfo, latestIcebergSnapshot);
+ } catch (AnalysisException e) {
+ throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
- return new IcebergSnapshotCacheValue(icebergPartitionInfo, lastedIcebergSnapshot);
}
public void invalidateCatalogCache(long catalogId) {
- snapshotListCache.asMap().keySet().stream()
- .filter(key -> key.nameMapping.getCtlId() == catalogId)
- .forEach(snapshotListCache::invalidate);
-
tableCache.asMap().entrySet().stream()
.filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId)
.forEach(entry -> {
- ManifestFiles.dropCache(entry.getValue().io());
+ ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
if (LOG.isDebugEnabled()) {
LOG.info("invalidate iceberg table cache {} when invalidating catalog cache",
entry.getKey().nameMapping, new Exception());
@@ -208,10 +180,6 @@ public class IcebergMetadataCache {
tableCache.invalidate(entry.getKey());
});
- snapshotCache.asMap().keySet().stream()
- .filter(key -> key.nameMapping.getCtlId() == catalogId)
- .forEach(snapshotCache::invalidate);
-
viewCache.asMap().entrySet().stream()
.filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId)
.forEach(entry -> viewCache.invalidate(entry.getKey()));
@@ -222,12 +190,6 @@ public class IcebergMetadataCache {
long catalogId = dorisTable.getCatalog().getId();
String dbName = dorisTable.getDbName();
String tblName = dorisTable.getName();
- snapshotListCache.asMap().keySet().stream()
- .filter(key -> key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName)
- && key.nameMapping.getLocalTblName().equals(tblName))
- .forEach(snapshotListCache::invalidate);
-
tableCache.asMap().entrySet().stream()
.filter(entry -> {
IcebergMetadataCacheKey key = entry.getKey();
@@ -236,19 +198,13 @@ public class IcebergMetadataCache {
&& key.nameMapping.getLocalTblName().equals(tblName);
})
.forEach(entry -> {
- ManifestFiles.dropCache(entry.getValue().io());
+ ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
if (LOG.isDebugEnabled()) {
LOG.info("invalidate iceberg table cache {}", entry.getKey().nameMapping, new Exception());
}
tableCache.invalidate(entry.getKey());
});
-
- snapshotCache.asMap().keySet().stream()
- .filter(key -> key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName)
- && key.nameMapping.getLocalTblName().equals(tblName))
- .forEach(snapshotCache::invalidate);
viewCache.asMap().entrySet().stream()
.filter(entry -> {
IcebergMetadataCacheKey key = entry.getKey();
@@ -260,11 +216,6 @@ public class IcebergMetadataCache {
}
public void invalidateDbCache(long catalogId, String dbName) {
- snapshotListCache.asMap().keySet().stream()
- .filter(key -> key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName))
- .forEach(snapshotListCache::invalidate);
-
tableCache.asMap().entrySet().stream()
.filter(entry -> {
IcebergMetadataCacheKey key = entry.getKey();
@@ -272,18 +223,13 @@ public class IcebergMetadataCache {
&& key.nameMapping.getLocalDbName().equals(dbName);
})
.forEach(entry -> {
- ManifestFiles.dropCache(entry.getValue().io());
+ ManifestFiles.dropCache(entry.getValue().getIcebergTable().io());
if (LOG.isDebugEnabled()) {
LOG.info("invalidate iceberg table cache {} when invalidating db cache",
entry.getKey().nameMapping, new Exception());
}
tableCache.invalidate(entry.getKey());
});
-
- snapshotCache.asMap().keySet().stream()
- .filter(key -> key.nameMapping.getCtlId() == catalogId
- && key.nameMapping.getLocalDbName().equals(dbName))
- .forEach(snapshotCache::invalidate);
viewCache.asMap().entrySet().stream()
.filter(entry -> {
IcebergMetadataCacheKey key = entry.getKey();
@@ -334,12 +280,8 @@ public class IcebergMetadataCache {
public Map<String, Map<String, String>> getCacheStats() {
Map<String, Map<String, String>> res = Maps.newHashMap();
- res.put("iceberg_snapshot_list_cache",
ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(),
- snapshotListCache.estimatedSize()));
res.put("iceberg_table_cache",
ExternalMetaCacheMgr.getCacheStats(tableCache.stats(),
tableCache.estimatedSize()));
- res.put("iceberg_snapshot_cache",
ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(),
- snapshotCache.estimatedSize()));
return res;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTableCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTableCacheValue.java
new file mode 100644
index 00000000000..b133a912522
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTableCacheValue.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.iceberg.Table;
+
+import java.util.function.Supplier;
+
+public class IcebergTableCacheValue {
+ private final Table icebergTable;
+
+ private volatile boolean snapshotCacheLoaded;
+ private volatile IcebergSnapshotCacheValue snapshotCacheValue;
+
+ public IcebergTableCacheValue(Table icebergTable) {
+ this.icebergTable = icebergTable;
+ }
+
+ public Table getIcebergTable() {
+ return icebergTable;
+ }
+
+ public IcebergSnapshotCacheValue getSnapshotCacheValue(Supplier<IcebergSnapshotCacheValue> loader) {
+ if (!snapshotCacheLoaded) {
+ synchronized (this) {
+ if (!snapshotCacheLoaded) {
+ snapshotCacheValue = loader.get();
+ snapshotCacheLoaded = true;
+ }
+ }
+ }
+ return snapshotCacheValue;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index a4475392a2d..e14a79cf3ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -732,9 +732,16 @@ public class IcebergUtils {
}
public static Table getIcebergTable(ExternalTable dorisTable) {
- return Env.getCurrentEnv()
- .getExtMetaCacheMgr()
- .getIcebergMetadataCache(dorisTable.getCatalog()).getIcebergTable(dorisTable);
+ return icebergMetadataCache(dorisTable.getCatalog()).getIcebergTable(dorisTable);
+ }
+
+ // Centralize cache access to keep call sites consistent and easy to understand.
+ private static IcebergMetadataCache icebergMetadataCache(ExternalCatalog catalog) {
+ return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(catalog);
+ }
+
+ private static ExternalSchemaCache schemaCache(ExternalCatalog catalog) {
+ return Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
}
public static org.apache.iceberg.types.Type dorisTypeToIcebergType(Type type) {
@@ -923,7 +930,8 @@ public class IcebergUtils {
/**
* Get iceberg schema from catalog and convert them to doris schema
*/
- private static List<Column> getSchema(ExternalTable dorisTable, long schemaId, boolean isView) {
+ private static List<Column> getSchema(ExternalTable dorisTable, long schemaId, boolean isView,
+ Table icebergTable) {
try {
return dorisTable.getCatalog().getExecutionAuthenticator().execute(() -> {
Schema schema;
@@ -935,11 +943,11 @@ public class IcebergUtils {
schema = icebergView.schemas().get((int) schemaId);
}
} else {
- Table icebergTable = getIcebergTable(dorisTable);
- if (schemaId == NEWEST_SCHEMA_ID || icebergTable.currentSnapshot() == null) {
- schema = icebergTable.schema();
+ Table table = icebergTable != null ? icebergTable : getIcebergTable(dorisTable);
+ if (schemaId == NEWEST_SCHEMA_ID || table.currentSnapshot() == null) {
+ schema = table.schema();
} else {
- schema = icebergTable.schemas().get((int) schemaId);
+ schema = table.schemas().get((int) schemaId);
}
}
String type = isView ? "view" : "table";
@@ -989,10 +997,7 @@ public class IcebergUtils {
// the table may be null when the iceberg metadata cache is not loaded.But I don't think it's a problem,
// because the NPE would be caught in the caller and return the default value -1.
// Meanwhile, it will trigger iceberg metadata cache to load the table, so we can get it next time.
- Table icebergTable = Env.getCurrentEnv()
- .getExtMetaCacheMgr()
- .getIcebergMetadataCache(tbl.getCatalog())
- .getIcebergTable(tbl);
+ Table icebergTable = getIcebergTable(tbl);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
LOG.info("Iceberg table {}.{}.{} is empty, return -1.",
@@ -1215,8 +1220,7 @@ public class IcebergUtils {
// read schema from external schema cache
public static IcebergSchemaCacheValue getSchemaCacheValue(ExternalTable dorisTable, long schemaId) {
- ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(dorisTable.getCatalog());
- Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+ Optional<SchemaCacheValue> schemaCacheValue = schemaCache(dorisTable.getCatalog()).getSchemaValue(
new IcebergSchemaCacheKey(dorisTable.getOrBuildNameMapping(), schemaId));
if (!schemaCacheValue.isPresent()) {
throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
@@ -1225,20 +1229,29 @@ public class IcebergUtils {
return (IcebergSchemaCacheValue) schemaCacheValue.get();
}
- public static IcebergSnapshot getLastedIcebergSnapshot(ExternalTable dorisTable) {
- Table table = IcebergUtils.getIcebergTable(dorisTable);
+ public static IcebergSnapshot getLatestIcebergSnapshot(Table table) {
Snapshot snapshot = table.currentSnapshot();
long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : snapshot.snapshotId();
- return new IcebergSnapshot(snapshotId, table.schema().schemaId());
+ // Use the latest table schema even if the current snapshot doesn't advance its schemaId,
+ // e.g. schema-only changes without a new snapshot.
+ long schemaId = table.schema().schemaId();
+ return new IcebergSnapshot(snapshotId, schemaId);
}
- public static IcebergPartitionInfo loadPartitionInfo(ExternalTable dorisTable, long snapshotId)
+ public static IcebergPartitionInfo loadPartitionInfo(ExternalTable dorisTable, Table table, long snapshotId)
throws AnalysisException {
// snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet.
if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
return IcebergPartitionInfo.empty();
}
- Table table = getIcebergTable(dorisTable);
+ return loadPartitionInfo(dorisTable, table, snapshotId, table.snapshot(snapshotId).schemaId());
+ }
+
+ public static IcebergPartitionInfo loadPartitionInfo(ExternalTable dorisTable, Table table, long snapshotId,
+ long schemaId) throws AnalysisException {
+ if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+ return IcebergPartitionInfo.empty();
+ }
List<IcebergPartition> icebergPartitions;
try {
icebergPartitions = dorisTable.getCatalog().getExecutionAuthenticator()
@@ -1252,8 +1265,7 @@ public class IcebergUtils {
Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
- List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
- dorisTable, table.snapshot(snapshotId).schemaId()).getPartitionColumns();
+ List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(dorisTable, schemaId).getPartitionColumns();
for (IcebergPartition partition : icebergPartitions) {
nameToPartition.put(partition.getPartitionName(), partition);
String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
@@ -1479,13 +1491,28 @@ public class IcebergUtils {
}
}
- public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+ public static IcebergSchemaCacheValue getSchemaCacheValue(ExternalTable dorisTable, IcebergSnapshotCacheValue sv) {
+ return getSchemaCacheValue(dorisTable, sv.getSnapshot().getSchemaId());
+ }
+
+ public static IcebergSnapshotCacheValue getLatestSnapshotCacheValue(ExternalTable dorisTable) {
+ return icebergMetadataCache(dorisTable.getCatalog()).getSnapshotCache(dorisTable);
+ }
+
+ public static IcebergSnapshotCacheValue getSnapshotCacheValue(Optional<MvccSnapshot> snapshot,
+ ExternalTable dorisTable) {
+ if (snapshot.isPresent() && snapshot.get() instanceof IcebergMvccSnapshot) {
+ return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
+ }
+ return getLatestSnapshotCacheValue(dorisTable);
+ }
+
+ public static IcebergSnapshotCacheValue getSnapshotCacheValue(
Optional<TableSnapshot> tableSnapshot,
ExternalTable dorisTable,
Optional<TableScanParams> scanParams) {
if (tableSnapshot.isPresent() || IcebergUtils.isIcebergBranchOrTag(scanParams)) {
- // If a snapshot is specified,
- // use the specified snapshot and the corresponding schema(not the latest schema).
+ // If a snapshot is specified, use the specified snapshot and the corresponding schema (not latest).
Table icebergTable = getIcebergTable(dorisTable);
IcebergTableQueryInfo info;
try {
@@ -1496,53 +1523,54 @@ public class IcebergUtils {
return new IcebergSnapshotCacheValue(
IcebergPartitionInfo.empty(),
new IcebergSnapshot(info.getSnapshotId(), info.getSchemaId()));
- } else {
- // Otherwise, use the latest snapshot and the latest schema.
- return Env.getCurrentEnv().getExtMetaCacheMgr()
- .getIcebergMetadataCache(dorisTable.getCatalog())
- .getSnapshotCache(dorisTable);
}
+ return getLatestSnapshotCacheValue(dorisTable);
}
public static List<Column> getIcebergSchema(ExternalTable dorisTable) {
Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(dorisTable);
- IcebergSnapshotCacheValue cacheValue =
- IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext, dorisTable);
- return IcebergUtils.getSchemaCacheValue(dorisTable, cacheValue.getSnapshot().getSchemaId())
- .getSchema();
+ IcebergSnapshotCacheValue cacheValue = IcebergUtils.getSnapshotCacheValue(snapshotFromContext, dorisTable);
+ return IcebergUtils.getSchemaCacheValue(dorisTable, cacheValue).getSchema();
+ }
+
+ public static List<Column> getIcebergPartitionColumns(Optional<MvccSnapshot> snapshot, ExternalTable dorisTable) {
+ IcebergSnapshotCacheValue snapshotValue = getSnapshotCacheValue(snapshot, dorisTable);
+ return getSchemaCacheValue(dorisTable, snapshotValue).getPartitionColumns();
}
- public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
- Optional<MvccSnapshot> snapshot,
+ public static Map<String, PartitionItem> getIcebergPartitionItems(Optional<MvccSnapshot> snapshot,
ExternalTable dorisTable) {
- if (snapshot.isPresent()) {
- return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
- } else {
- return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), dorisTable, Optional.empty());
- }
+ return getSnapshotCacheValue(snapshot, dorisTable).getPartitionInfo().getNameToPartitionItem();
}
public static View getIcebergView(ExternalTable dorisTable) {
- IcebergMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getIcebergMetadataCache(dorisTable.getCatalog());
- return metadataCache.getIcebergView(dorisTable);
+ return icebergMetadataCache(dorisTable.getCatalog()).getIcebergView(dorisTable);
}
public static Optional<SchemaCacheValue> loadSchemaCacheValue(
ExternalTable dorisTable, long schemaId, boolean isView) {
- List<Column> schema = IcebergUtils.getSchema(dorisTable, schemaId, isView);
+ return isView
+ ? loadViewSchemaCacheValue(dorisTable, schemaId)
+ : loadTableSchemaCacheValue(dorisTable, schemaId);
+ }
+
+ private static Optional<SchemaCacheValue> loadViewSchemaCacheValue(ExternalTable dorisTable, long schemaId) {
+ List<Column> schema = IcebergUtils.getSchema(dorisTable, schemaId, true, null);
+ return Optional.of(new IcebergSchemaCacheValue(schema, Lists.newArrayList()));
+ }
+
+ private static Optional<SchemaCacheValue> loadTableSchemaCacheValue(ExternalTable dorisTable, long schemaId) {
+ Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+ List<Column> schema = IcebergUtils.getSchema(dorisTable, schemaId, false, icebergTable);
+ // get table partition column info
List<Column> tmpColumns = Lists.newArrayList();
- if (!isView) {
- // get table partition column info
- Table table = IcebergUtils.getIcebergTable(dorisTable);
- PartitionSpec spec = table.spec();
- for (PartitionField field : spec.fields()) {
- Types.NestedField col = table.schema().findField(field.sourceId());
- for (Column c : schema) {
- if (c.getName().equalsIgnoreCase(col.name())) {
- tmpColumns.add(c);
- break;
- }
+ PartitionSpec spec = icebergTable.spec();
+ for (PartitionField field : spec.fields()) {
+ Types.NestedField col = icebergTable.schema().findField(field.sourceId());
+ for (Column c : schema) {
+ if (c.getName().equalsIgnoreCase(col.name())) {
+ tmpColumns.add(c);
+ break;
}
}
}
@@ -1556,10 +1584,7 @@ public class IcebergUtils {
}
public static IcebergManifestCache getManifestCache(ExternalCatalog catalog) {
- return Env.getCurrentEnv()
- .getExtMetaCacheMgr()
- .getIcebergMetadataCache(catalog)
- .getManifestCache();
+ return icebergMetadataCache(catalog).getManifestCache();
}
public static boolean isManifestCacheEnabled(ExternalCatalog catalog) {
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
new file mode 100644
index 00000000000..7cc9f6af0b7
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_table_cache",
"p0,external,doris,external_docker,external_docker_doris") {
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ String catalogWithCache = "test_iceberg_table_cache_with_cache"
+ String catalogNoCache = "test_iceberg_table_cache_no_cache"
+ String testDb = "cache_test_db"
+
+ // Create catalogs
+ sql """drop catalog if exists ${catalogWithCache}"""
+ sql """drop catalog if exists ${catalogNoCache}"""
+
+ // Catalog with cache enabled (default)
+ sql """
+ CREATE CATALOG ${catalogWithCache} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${restPort}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
+ "s3.region" = "us-east-1"
+ );
+ """
+
+ // Catalog with cache disabled (TTL=0)
+ sql """
+ CREATE CATALOG ${catalogNoCache} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${restPort}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
+ "s3.region" = "us-east-1",
+ "iceberg.table.meta.cache.ttl-second" = "0"
+ );
+ """
+
+ try {
+ // Create test database via Spark
+ spark_iceberg "CREATE DATABASE IF NOT EXISTS demo.${testDb}"
+
+ // ==================== Test 1: DML Operations ====================
+ logger.info("========== Test 1: DML Operations ==========")
+
+ // Test 1.1: INSERT
+ logger.info("--- Test 1.1: External INSERT ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_insert"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_insert (id INT, name
STRING) USING iceberg"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_insert VALUES (1,
'initial')"
+
+ // Query from Doris to cache the data
+ sql """switch ${catalogWithCache}"""
+ def result1 = sql """select * from ${testDb}.test_insert order by id"""
+ logger.info("Initial data (with cache): ${result1}")
+ assertEquals(1, result1.size())
+
+ sql """switch ${catalogNoCache}"""
+ def result1_no_cache = sql """select * from ${testDb}.test_insert order by id"""
+ logger.info("Initial data (no cache): ${result1_no_cache}")
+ assertEquals(1, result1_no_cache.size())
+
+ // External INSERT via Spark
+ spark_iceberg "INSERT INTO demo.${testDb}.test_insert VALUES (2,
'external_insert')"
+
+ // Query without refresh - cached catalog should see old data
+ sql """switch ${catalogWithCache}"""
+ def result2 = sql """select * from ${testDb}.test_insert order by id"""
+ logger.info("After external INSERT (with cache, no refresh):
${result2}")
+ assertEquals(1, result2.size()) // Should still see 1 row due to cache
+
+ // Query without refresh - no-cache catalog should see new data
+ sql """switch ${catalogNoCache}"""
+ def result2_no_cache = sql """select * from ${testDb}.test_insert order by id"""
+ logger.info("After external INSERT (no cache): ${result2_no_cache}")
+ assertEquals(2, result2_no_cache.size()) // Should see 2 rows
+
+ // Refresh and verify
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_insert"""
+ def result3 = sql """select * from ${testDb}.test_insert order by id"""
+ logger.info("After REFRESH TABLE (with cache): ${result3}")
+ assertEquals(2, result3.size()) // Should now see 2 rows
+
+ // Test 1.2: DELETE
+ logger.info("--- Test 1.2: External DELETE ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_delete"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_delete (id INT, name
STRING) USING iceberg TBLPROPERTIES ('format-version'='2')"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_delete VALUES (1,
'row1'), (2, 'row2'), (3, 'row3')"
+
+ // Cache the data
+ sql """switch ${catalogWithCache}"""
+ def del_result1 = sql """select * from ${testDb}.test_delete order by id"""
+ assertEquals(3, del_result1.size())
+
+ sql """switch ${catalogNoCache}"""
+ def del_result1_nc = sql """select * from ${testDb}.test_delete order by id"""
+ assertEquals(3, del_result1_nc.size())
+
+ // External DELETE via Spark
+ spark_iceberg "DELETE FROM demo.${testDb}.test_delete WHERE id = 2"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def del_result2 = sql """select * from ${testDb}.test_delete order by id"""
+ logger.info("After external DELETE (with cache, no refresh): ${del_result2}")
+ assertEquals(3, del_result2.size()) // Should still see 3 rows
+
+ sql """switch ${catalogNoCache}"""
+ def del_result2_nc = sql """select * from ${testDb}.test_delete order by id"""
+ logger.info("After external DELETE (no cache): ${del_result2_nc}")
+ assertEquals(2, del_result2_nc.size()) // Should see 2 rows
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_delete"""
+ def del_result3 = sql """select * from ${testDb}.test_delete order by id"""
+ assertEquals(2, del_result3.size())
+
+ // Test 1.3: UPDATE
+ logger.info("--- Test 1.3: External UPDATE ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_update"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_update (id INT, value
INT) USING iceberg TBLPROPERTIES ('format-version'='2')"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_update VALUES (1, 100),
(2, 200)"
+
+ // Cache the data
+ sql """switch ${catalogWithCache}"""
+ def upd_result1 = sql """select * from ${testDb}.test_update order by id"""
+ assertEquals(2, upd_result1.size())
+ assertEquals(100, upd_result1[0][1])
+
+ // External UPDATE via Spark
+ spark_iceberg "UPDATE demo.${testDb}.test_update SET value = 999 WHERE
id = 1"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def upd_result2 = sql """select * from ${testDb}.test_update order by id"""
+ logger.info("After external UPDATE (with cache, no refresh): ${upd_result2}")
+ assertEquals(100, upd_result2[0][1]) // Should still see old value
+
+ sql """switch ${catalogNoCache}"""
+ def upd_result2_nc = sql """select * from ${testDb}.test_update order by id"""
+ logger.info("After external UPDATE (no cache): ${upd_result2_nc}")
+ assertEquals(999, upd_result2_nc[0][1]) // Should see new value
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_update"""
+ def upd_result3 = sql """select * from ${testDb}.test_update order by id"""
+ assertEquals(999, upd_result3[0][1])
+
+ // Test 1.4: INSERT OVERWRITE
+ logger.info("--- Test 1.4: External INSERT OVERWRITE ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_overwrite"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_overwrite (id INT,
name STRING) USING iceberg"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_overwrite VALUES (1,
'old1'), (2, 'old2')"
+
+ // Cache the data
+ sql """switch ${catalogWithCache}"""
+ def ow_result1 = sql """select * from ${testDb}.test_overwrite order by id"""
+ assertEquals(2, ow_result1.size())
+
+ // External INSERT OVERWRITE via Spark
+ spark_iceberg "INSERT OVERWRITE demo.${testDb}.test_overwrite SELECT
10, 'new'"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def ow_result2 = sql """select * from ${testDb}.test_overwrite order by id"""
+ logger.info("After external INSERT OVERWRITE (with cache, no refresh): ${ow_result2}")
+ assertEquals(2, ow_result2.size()) // Should still see old data
+
+ sql """switch ${catalogNoCache}"""
+ def ow_result2_nc = sql """select * from ${testDb}.test_overwrite order by id"""
+ logger.info("After external INSERT OVERWRITE (no cache): ${ow_result2_nc}")
+ assertEquals(1, ow_result2_nc.size()) // Should see new data
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_overwrite"""
+ def ow_result3 = sql """select * from ${testDb}.test_overwrite order by id"""
+ assertEquals(1, ow_result3.size())
+
+ // ==================== Test 2: Schema Change Operations ====================
+ logger.info("========== Test 2: Schema Change Operations ==========")
+
+ // Test 2.1: ADD COLUMN
+ logger.info("--- Test 2.1: External ADD COLUMN ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_add_column"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_add_column (id INT,
name STRING) USING iceberg"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_add_column VALUES (1,
'test')"
+
+ // Cache the schema
+ sql """switch ${catalogWithCache}"""
+ def add_col_desc1 = sql """desc ${testDb}.test_add_column"""
+ logger.info("Initial schema (with cache): ${add_col_desc1}")
+ assertEquals(2, add_col_desc1.size())
+
+ sql """switch ${catalogNoCache}"""
+ def add_col_desc1_nc = sql """desc ${testDb}.test_add_column"""
+ assertEquals(2, add_col_desc1_nc.size())
+
+ // External ADD COLUMN via Spark
+ spark_iceberg "ALTER TABLE demo.${testDb}.test_add_column ADD COLUMN
new_col INT"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def add_col_desc2 = sql """desc ${testDb}.test_add_column"""
+ logger.info("After external ADD COLUMN (with cache, no refresh):
${add_col_desc2}")
+ assertEquals(2, add_col_desc2.size()) // Should still see 2 columns
+
+ sql """switch ${catalogNoCache}"""
+ def add_col_desc2_nc = sql """desc ${testDb}.test_add_column"""
+ logger.info("After external ADD COLUMN (no cache):
${add_col_desc2_nc}")
+ assertEquals(3, add_col_desc2_nc.size()) // Should see 3 columns
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_add_column"""
+ def add_col_desc3 = sql """desc ${testDb}.test_add_column"""
+ assertEquals(3, add_col_desc3.size())
+
+ // Test 2.2: DROP COLUMN
+ logger.info("--- Test 2.2: External DROP COLUMN ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_drop_column"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_drop_column (id INT,
name STRING, to_drop INT) USING iceberg"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_drop_column VALUES (1,
'test', 100)"
+
+ // Cache the schema
+ sql """switch ${catalogWithCache}"""
+ def drop_col_desc1 = sql """desc ${testDb}.test_drop_column"""
+ assertEquals(3, drop_col_desc1.size())
+
+ // External DROP COLUMN via Spark
+ spark_iceberg "ALTER TABLE demo.${testDb}.test_drop_column DROP COLUMN
to_drop"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def drop_col_desc2 = sql """desc ${testDb}.test_drop_column"""
+ logger.info("After external DROP COLUMN (with cache, no refresh):
${drop_col_desc2}")
+ assertEquals(3, drop_col_desc2.size()) // Should still see 3 columns
+
+ sql """switch ${catalogNoCache}"""
+ def drop_col_desc2_nc = sql """desc ${testDb}.test_drop_column"""
+ logger.info("After external DROP COLUMN (no cache):
${drop_col_desc2_nc}")
+ assertEquals(2, drop_col_desc2_nc.size()) // Should see 2 columns
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_drop_column"""
+ def drop_col_desc3 = sql """desc ${testDb}.test_drop_column"""
+ assertEquals(2, drop_col_desc3.size())
+
+ // Test 2.3: RENAME COLUMN
+ logger.info("--- Test 2.3: External RENAME COLUMN ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_rename_column"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_rename_column (id INT,
old_name STRING) USING iceberg"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_rename_column VALUES
(1, 'test')"
+
+ // Cache the schema
+ sql """switch ${catalogWithCache}"""
+ def rename_col_desc1 = sql """desc ${testDb}.test_rename_column"""
+ logger.info("Initial schema: ${rename_col_desc1}")
+ assertTrue(rename_col_desc1.toString().contains("old_name"))
+
+ // External RENAME COLUMN via Spark
+ spark_iceberg "ALTER TABLE demo.${testDb}.test_rename_column RENAME
COLUMN old_name TO new_name"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def rename_col_desc2 = sql """desc ${testDb}.test_rename_column"""
+ logger.info("After external RENAME COLUMN (with cache, no refresh):
${rename_col_desc2}")
+ assertTrue(rename_col_desc2.toString().contains("old_name")) //
Should still see old name
+
+ sql """switch ${catalogNoCache}"""
+ def rename_col_desc2_nc = sql """desc ${testDb}.test_rename_column"""
+ logger.info("After external RENAME COLUMN (no cache):
${rename_col_desc2_nc}")
+ assertTrue(rename_col_desc2_nc.toString().contains("new_name")) //
Should see new name
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_rename_column"""
+ def rename_col_desc3 = sql """desc ${testDb}.test_rename_column"""
+ assertTrue(rename_col_desc3.toString().contains("new_name"))
+
+ // Test 2.4: ALTER COLUMN TYPE
+ logger.info("--- Test 2.4: External ALTER COLUMN TYPE ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_alter_type"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_alter_type (id INT,
value INT) USING iceberg"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_alter_type VALUES (1,
100)"
+
+ // Cache the schema
+ sql """switch ${catalogWithCache}"""
+ def alter_type_desc1 = sql """desc ${testDb}.test_alter_type"""
+ logger.info("Initial schema: ${alter_type_desc1}")
+ // value column should be INT
+ assertTrue(alter_type_desc1[1][1].toString().toLowerCase().contains("int"))
+
+ // External ALTER COLUMN TYPE via Spark (INT -> BIGINT)
+ spark_iceberg "ALTER TABLE demo.${testDb}.test_alter_type ALTER COLUMN
value TYPE BIGINT"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def alter_type_desc2 = sql """desc ${testDb}.test_alter_type"""
+ logger.info("After external ALTER TYPE (with cache, no refresh):
${alter_type_desc2}")
+
assertTrue(alter_type_desc2[1][1].toString().toLowerCase().contains("int")) //
Should still see INT
+
+ sql """switch ${catalogNoCache}"""
+ def alter_type_desc2_nc = sql """desc ${testDb}.test_alter_type"""
+ logger.info("After external ALTER TYPE (no cache):
${alter_type_desc2_nc}")
+
assertTrue(alter_type_desc2_nc[1][1].toString().toLowerCase().contains("bigint"))
// Should see BIGINT
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_alter_type"""
+ def alter_type_desc3 = sql """desc ${testDb}.test_alter_type"""
+ assertTrue(alter_type_desc3[1][1].toString().toLowerCase().contains("bigint"))
+
+ // ==================== Test 3: Partition Evolution ====================
+ logger.info("========== Test 3: Partition Evolution ==========")
+
+ // Test 3.1: ADD PARTITION FIELD
+ logger.info("--- Test 3.1: External ADD PARTITION FIELD ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_add_partition"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_add_partition (id INT,
dt DATE, value INT) USING iceberg"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_add_partition VALUES
(1, DATE'2024-01-15', 100)"
+
+ // Cache the partition spec by querying data (show partitions is not supported for Iceberg tables)
+ sql """switch ${catalogWithCache}"""
+ def add_part_result_initial = sql """select count(*) from ${testDb}.test_add_partition"""
+ logger.info("Initial data count (with cache): ${add_part_result_initial}")
+
+ // External ADD PARTITION FIELD via Spark
+ spark_iceberg "ALTER TABLE demo.${testDb}.test_add_partition ADD
PARTITION FIELD month(dt)"
+
+ // Insert data after partition evolution
+ spark_iceberg "INSERT INTO demo.${testDb}.test_add_partition VALUES
(2, DATE'2024-02-20', 200)"
+
+ // Verify cache behavior - check data count as partition spec is harder to verify directly
+ sql """switch ${catalogWithCache}"""
+ def add_part_result1 = sql """select count(*) from ${testDb}.test_add_partition"""
+ logger.info("After external ADD PARTITION FIELD (with cache, no refresh): ${add_part_result1}")
+ assertEquals(1, add_part_result1[0][0]) // Should still see 1 row
+
+ sql """switch ${catalogNoCache}"""
+ def add_part_result1_nc = sql """select count(*) from ${testDb}.test_add_partition"""
+ logger.info("After external ADD PARTITION FIELD (no cache): ${add_part_result1_nc}")
+ assertEquals(2, add_part_result1_nc[0][0]) // Should see 2 rows
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_add_partition"""
+ def add_part_result2 = sql """select count(*) from ${testDb}.test_add_partition"""
+ assertEquals(2, add_part_result2[0][0])
+
+ // Test 3.2: DROP PARTITION FIELD
+ logger.info("--- Test 3.2: External DROP PARTITION FIELD ---")
+ spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_drop_partition"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_drop_partition (id
INT, category STRING, value INT) USING iceberg PARTITIONED BY (category)"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_drop_partition VALUES
(1, 'A', 100), (2, 'B', 200)"
+
+ // Cache the partition spec
+ sql """switch ${catalogWithCache}"""
+ def drop_part_result1 = sql """select * from ${testDb}.test_drop_partition order by id"""
+ assertEquals(2, drop_part_result1.size())
+
+ // External DROP PARTITION FIELD via Spark
+ spark_iceberg "ALTER TABLE demo.${testDb}.test_drop_partition DROP
PARTITION FIELD category"
+
+ // Insert data after partition evolution
+ spark_iceberg "INSERT INTO demo.${testDb}.test_drop_partition VALUES
(3, 'C', 300)"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def drop_part_result2 = sql """select count(*) from ${testDb}.test_drop_partition"""
+ logger.info("After external DROP PARTITION FIELD (with cache, no refresh): ${drop_part_result2}")
+ assertEquals(2, drop_part_result2[0][0]) // Should still see 2 rows
+
+ sql """switch ${catalogNoCache}"""
+ def drop_part_result2_nc = sql """select count(*) from ${testDb}.test_drop_partition"""
+ logger.info("After external DROP PARTITION FIELD (no cache): ${drop_part_result2_nc}")
+ assertEquals(3, drop_part_result2_nc[0][0]) // Should see 3 rows
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_drop_partition"""
+ def drop_part_result3 = sql """select count(*) from ${testDb}.test_drop_partition"""
+ assertEquals(3, drop_part_result3[0][0])
+
+ // Test 3.3: REPLACE PARTITION FIELD
+ logger.info("--- Test 3.3: External REPLACE PARTITION FIELD ---")
+ spark_iceberg "DROP TABLE IF EXISTS
demo.${testDb}.test_replace_partition"
+ spark_iceberg "CREATE TABLE demo.${testDb}.test_replace_partition (id
INT, ts TIMESTAMP, value INT) USING iceberg PARTITIONED BY (days(ts))"
+ spark_iceberg "INSERT INTO demo.${testDb}.test_replace_partition
VALUES (1, TIMESTAMP'2024-01-15 10:00:00', 100)"
+
+ // Cache the partition spec
+ sql """switch ${catalogWithCache}"""
+ def replace_part_result1 = sql """select * from ${testDb}.test_replace_partition order by id"""
+ assertEquals(1, replace_part_result1.size())
+
+ // External REPLACE PARTITION FIELD via Spark (days -> months)
+ spark_iceberg "ALTER TABLE demo.${testDb}.test_replace_partition
REPLACE PARTITION FIELD days(ts) WITH months(ts)"
+
+ // Insert data after partition evolution
+ spark_iceberg "INSERT INTO demo.${testDb}.test_replace_partition
VALUES (2, TIMESTAMP'2024-02-20 15:00:00', 200)"
+
+ // Verify cache behavior
+ sql """switch ${catalogWithCache}"""
+ def replace_part_result2 = sql """select count(*) from ${testDb}.test_replace_partition"""
+ logger.info("After external REPLACE PARTITION FIELD (with cache, no refresh): ${replace_part_result2}")
+ assertEquals(1, replace_part_result2[0][0]) // Should still see 1 row
+
+ sql """switch ${catalogNoCache}"""
+ def replace_part_result2_nc = sql """select count(*) from ${testDb}.test_replace_partition"""
+ logger.info("After external REPLACE PARTITION FIELD (no cache): ${replace_part_result2_nc}")
+ assertEquals(2, replace_part_result2_nc[0][0]) // Should see 2 rows
+
+ sql """switch ${catalogWithCache}"""
+ sql """refresh table ${testDb}.test_replace_partition"""
+ def replace_part_result3 = sql """select count(*) from ${testDb}.test_replace_partition"""
+ assertEquals(2, replace_part_result3[0][0])
+
+ logger.info("All tests passed!")
+
+ } finally {
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]