This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 48ea35b464f [opt](paimon)Use the API instead of reading from the meta table (#47544) 48ea35b464f is described below commit 48ea35b464feff1f8f38d5c5326db95fc5de311b Author: wuwenchi <wuwen...@selectdb.com> AuthorDate: Thu Feb 20 11:07:45 2025 +0800 [opt](paimon)Use the API instead of reading from the meta table (#47544) ### What problem does this PR solve? 1. Use `latestSnapshotId` to get the latest snapshot id. 2. If there are no additional options, no copy operation is performed on the table. 3. Use `DataTable.schemaManager().schema()` to get schema. 4. Use `Catalog.listPartitions` to get partitions. --- .../datasource/paimon/PaimonExternalCatalog.java | 19 ++++ .../datasource/paimon/PaimonExternalTable.java | 42 ++------ .../datasource/paimon/PaimonMetadataCache.java | 46 ++++----- .../datasource/paimon/PaimonPartitionInfo.java | 11 ++- .../doris/datasource/paimon/PaimonSchema.java | 46 --------- .../doris/datasource/paimon/PaimonSnapshot.java | 11 ++- .../apache/doris/datasource/paimon/PaimonUtil.java | 106 +++++---------------- .../datasource/paimon/source/PaimonScanNode.java | 5 + .../java/org/apache/doris/mtmv/PaimonUtilTest.java | 71 -------------- 9 files changed, 91 insertions(+), 266 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index e87994ecdd3..fd332f8216f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -40,6 +40,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import java.io.IOException; import java.util.ArrayList; @@ -137,6 +138,24 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog { } } + public List<Partition> getPaimonPartitions(String dbName, String tblName) { + makeSureInitialized(); + try { + return hadoopAuthenticator.doAs(() -> { + List<Partition> partitions = new ArrayList<>(); + try { + partitions = catalog.listPartitions(Identifier.create(dbName, tblName)); + } catch (Catalog.TableNotExistException e) { + LOG.warn("TableNotExistException", e); + } + return partitions; + }); + } catch (IOException e) { + throw new RuntimeException("Failed to get Paimon table partitions:" + getName() + "." + + dbName + "." + tblName + ", because " + e.getMessage(), e); + } + } + protected String getPaimonCatalogType(String catalogType) { if (PAIMON_HMS.equalsIgnoreCase(catalogType)) { return PaimonProperties.PAIMON_HMS_CATALOG; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 992f469906e..7cf965f4eef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -50,17 +50,13 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.partition.Partition; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.DataTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.system.SchemasTable; import org.apache.paimon.types.DataField; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -93,9 +89,7 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab } public Table getPaimonTable(Optional<MvccSnapshot> snapshot) { - return paimonTable.copy( - Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), - String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); + return getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getTable(); } public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { @@ -194,12 +188,12 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) throws AnalysisException { - PaimonPartition paimonPartition = getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition() + Partition paimonPartition = getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition() .get(partitionName); if (paimonPartition == null) { throw new AnalysisException("can not find partition: " + partitionName); } - return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime()); + return new MTMVTimestampSnapshot(paimonPartition.lastFileCreationTime()); } @Override @@ -244,10 +238,11 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab makeSureInitialized(); PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key; try { - PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey); - List<DataField> columns = schema.getFields(); + Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), name); + TableSchema tableSchema = ((DataTable) table).schemaManager().schema(paimonSchemaCacheKey.getSchemaId()); + List<DataField> columns = tableSchema.fields(); List<Column> dorisColumns = Lists.newArrayListWithCapacity(columns.size()); - Set<String> partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); + Set<String> partitionColumnNames = Sets.newHashSet(tableSchema.partitionKeys()); List<Column> partitionColumns = Lists.newArrayList(); for (DataField field : columns) { Column column = new Column(field.name().toLowerCase(), @@ -267,23 +262,6 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab } - private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { - Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), - name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); - PredicateBuilder builder = new PredicateBuilder(table.rowType()); - Predicate predicate = builder.equal(0, key.getSchemaId()); - // Adding predicates will also return excess data - List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1, 2}, predicate); - for (InternalRow row : rows) { - PaimonSchema schema = PaimonUtil.rowToSchema(row); - if (schema.getSchemaId() == key.getSchemaId()) { - return schema; - } - } - throw new CacheException("failed to initSchema for: %s.%s.%s.%s", - null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId()); - } - private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) { PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index 109394fabde..e6023743f3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -26,17 +26,15 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalMetaCacheMgr; import com.github.benmanes.caffeine.cache.LoadingCache; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.data.InternalRow; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.partition.Partition; import org.apache.paimon.table.Table; -import org.apache.paimon.table.system.PartitionsTable; -import org.apache.paimon.table.system.SnapshotsTable; import org.jetbrains.annotations.NotNull; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -75,39 +73,27 @@ public class PaimonMetadataCache { private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List<Column> partitionColumns) throws IOException, AnalysisException { if (CollectionUtils.isEmpty(partitionColumns)) { - return new PaimonPartitionInfo(); + return PaimonPartitionInfo.EMPTY; } - List<PaimonPartition> paimonPartitions = loadPartitions(key); + List<Partition> paimonPartitions = ((PaimonExternalCatalog) key.getCatalog()) + .getPaimonPartitions(key.getDbName(), key.getTableName()); return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); } - private List<PaimonPartition> loadPartitions(PaimonSnapshotCacheKey key) - throws IOException { - Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), - key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); - List<InternalRow> rows = PaimonUtil.read(table, null, null); - List<PaimonPartition> res = Lists.newArrayListWithCapacity(rows.size()); - for (InternalRow row : rows) { - res.add(PaimonUtil.rowToPartition(row)); - } - return res; - } - private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { - Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), - key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), key.getTableName()); + Table snapshotTable = table; // snapshotId and schemaId - List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1}, null); - long latestSnapshotId = 0L; + Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID; long latestSchemaId = 0L; - for (InternalRow row : rows) { - long snapshotId = row.getLong(0); - if (snapshotId > latestSnapshotId) { - latestSnapshotId = snapshotId; - latestSchemaId = row.getLong(1); - } + OptionalLong optionalSnapshotId = table.latestSnapshotId(); + if (optionalSnapshotId.isPresent()) { + latestSnapshotId = optionalSnapshotId.getAsLong(); + latestSchemaId = table.snapshot(latestSnapshotId).schemaId(); + snapshotTable = + table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString())); } - return new PaimonSnapshot(latestSnapshotId, latestSchemaId); + return new PaimonSnapshot(latestSnapshotId, latestSchemaId, snapshotTable); } public void invalidateCatalogCache(long catalogId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java index 88515a2510d..a6339ef5155 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -20,20 +20,23 @@ package org.apache.doris.datasource.paimon; import org.apache.doris.catalog.PartitionItem; import com.google.common.collect.Maps; +import org.apache.paimon.partition.Partition; import java.util.Map; public class PaimonPartitionInfo { + public static final PaimonPartitionInfo EMPTY = new PaimonPartitionInfo(); + private final Map<String, PartitionItem> nameToPartitionItem; - private final Map<String, PaimonPartition> nameToPartition; + private final Map<String, Partition> nameToPartition; - public PaimonPartitionInfo() { + private PaimonPartitionInfo() { this.nameToPartitionItem = Maps.newHashMap(); this.nameToPartition = Maps.newHashMap(); } public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem, - Map<String, PaimonPartition> nameToPartition) { + Map<String, Partition> nameToPartition) { this.nameToPartitionItem = nameToPartitionItem; this.nameToPartition = nameToPartition; } @@ -42,7 +45,7 @@ public class PaimonPartitionInfo { return nameToPartitionItem; } - public Map<String, PaimonPartition> getNameToPartition() { + public Map<String, Partition> getNameToPartition() { return nameToPartition; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java deleted file mode 100644 index ef26e1ed208..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.datasource.paimon; - -import org.apache.paimon.types.DataField; - -import java.util.List; - -public class PaimonSchema { - private final long schemaId; - private final List<DataField> fields; - private final List<String> partitionKeys; - - public PaimonSchema(long schemaId, List<DataField> fields, List<String> partitionKeys) { - this.schemaId = schemaId; - this.fields = fields; - this.partitionKeys = partitionKeys; - } - - public long getSchemaId() { - return schemaId; - } - - public List<DataField> getFields() { - return fields; - } - - public List<String> getPartitionKeys() { - return partitionKeys; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java index 4a536dd72cc..96f32370d99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java @@ -17,13 +17,18 @@ package org.apache.doris.datasource.paimon; +import org.apache.paimon.table.Table; + public class PaimonSnapshot { + public static long INVALID_SNAPSHOT_ID = -1; private final long snapshotId; private final long schemaId; + private final Table table; - public PaimonSnapshot(long snapshotId, long schemaId) { + public PaimonSnapshot(long snapshotId, long schemaId, Table table) { this.snapshotId = snapshotId; this.schemaId = schemaId; + this.table = table; } public long getSnapshotId() { @@ -33,4 +38,8 @@ public class PaimonSnapshot { public long getSchemaId() { return schemaId; } + + public Table getTable() { + return table; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index bbb1eaf5096..4119f978d24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -36,9 +36,9 @@ import org.apache.logging.log4j.Logger; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.partition.Partition; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.ArrayType; @@ -46,7 +46,6 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Projection; @@ -69,7 +68,9 @@ public class PaimonUtil { for (Pair<ConfigOption<?>, String> pair : dynamicOptions) { options.put(pair.getKey().key(), pair.getValue()); } - table = table.copy(options); + if (!options.isEmpty()) { + table = table.copy(options); + } ReadBuilder readBuilder = table.newReadBuilder(); if (projection != null) { readBuilder.withProjection(projection); @@ -89,71 +90,40 @@ public class PaimonUtil { return rows; } + public static PaimonPartitionInfo generatePartitionInfo(List<Column> partitionColumns, + List<Partition> paimonPartitions) { - /* - https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table - +---------------+----------------+--------------------+--------------------+------------------------+ - | partition | record_count | file_size_in_bytes| file_count| last_update_time| - +---------------+----------------+--------------------+--------------------+------------------------+ - | [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400| - +---------------+----------------+--------------------+--------------------+------------------------+ - org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE - public static final RowType TABLE_TYPE = - new RowType( - Arrays.asList( - new DataField(0, "partition", SerializationUtils.newStringType(true)), - new DataField(1, "record_count", new BigIntType(false)), - new DataField(2, "file_size_in_bytes", new BigIntType(false)), - new DataField(3, "file_count", new BigIntType(false)), - new DataField(4, "last_update_time", DataTypes.TIMESTAMP_MILLIS()))); - */ - public static PaimonPartition rowToPartition(InternalRow row) { - String partition = row.getString(0).toString(); - long recordCount = row.getLong(1); - long fileSizeInBytes = row.getLong(2); - long fileCount = row.getLong(3); - long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond(); - return new PaimonPartition(partition, recordCount, fileSizeInBytes, fileCount, lastUpdateTime); - } + if (CollectionUtils.isEmpty(partitionColumns) || paimonPartitions.isEmpty()) { + return PaimonPartitionInfo.EMPTY; + } - public static PaimonPartitionInfo generatePartitionInfo(List<Column> partitionColumns, - List<PaimonPartition> paimonPartitions) { Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap(); - Map<String, PaimonPartition> nameToPartition = Maps.newHashMap(); + Map<String, Partition> nameToPartition = Maps.newHashMap(); PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition); - if (CollectionUtils.isEmpty(partitionColumns)) { - return partitionInfo; - } - for (PaimonPartition paimonPartition : paimonPartitions) { - String partitionName = getPartitionName(partitionColumns, paimonPartition.getPartitionValues()); - nameToPartition.put(partitionName, paimonPartition); + + for (Partition partition : paimonPartitions) { + Map<String, String> spec = partition.spec(); + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, String> entry : spec.entrySet()) { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append("/"); + } + if (sb.length() > 0) { + sb.deleteCharAt(sb.length() - 1); + } + String partitionName = sb.toString(); + nameToPartition.put(partitionName, partition); try { // partition values return by paimon api, may have problem, // to avoid affecting the query, we catch exceptions here nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns)); } catch (Exception e) { - LOG.warn("toListPartitionItem failed, partitionColumns: {}, partitionValues: {}", partitionColumns, - paimonPartition.getPartitionValues(), e); + LOG.warn("toListPartitionItem failed, partitionColumns: {}, partitionValues: {}", + partitionColumns, partition.spec(), e); } } return partitionInfo; } - private static String getPartitionName(List<Column> partitionColumns, String partitionValueStr) { - Preconditions.checkNotNull(partitionValueStr); - String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") - .split(","); - Preconditions.checkState(partitionColumns.size() == partitionValues.length); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < partitionColumns.size(); ++i) { - if (i != 0) { - sb.append("/"); - } - sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); - } - return sb.toString(); - } - public static ListPartitionItem toListPartitionItem(String partitionName, List<Column> partitionColumns) throws AnalysisException { List<Type> types = partitionColumns.stream() @@ -251,32 +221,4 @@ public class PaimonUtil { public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { return paimonPrimitiveTypeToDorisType(type); } - - /** - * https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table - * demo: - * 0 - * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"}, - * {"id":1,"name":"item_id","type":"BIGINT"}, - * {"id":2,"name":"behavior","type":"STRING"}, - * {"id":3,"name":"dt","type":"STRING NOT NULL"}, - * {"id":4,"name":"hh","type":"STRING NOT NULL"}] - * ["dt"] - * ["dt","hh","user_id"] - * {"owner":"hadoop","provider":"paimon"} - * 2024-12-03 15:38:14.734 - * - * @param row - * @return - */ - public static PaimonSchema rowToSchema(InternalRow row) { - long schemaId = row.getLong(0); - String fieldsStr = row.getString(1).toString(); - String partitionKeysStr = row.getString(2).toString(); - List<DataField> fields = JsonSerdeUtil.fromJson(fieldsStr, new TypeReference<List<DataField>>() { - }); - List<String> partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, new TypeReference<List<String>>() { - }); - return new PaimonSchema(schemaId, fields, partitionKeys); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 0e9a8042a65..59e7eed5d42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -44,6 +44,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.source.DataSplit; @@ -207,6 +208,10 @@ public class PaimonScanNode extends FileQueryScanNode { SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType .valueOf(sessionVariable.getIgnoreSplitType()); List<Split> splits = new ArrayList<>(); + if (!source.getPaimonTable().options().containsKey(CoreOptions.SCAN_SNAPSHOT_ID.key())) { + // an empty table in PaimonSnapshotCacheValue + return splits; + } int[] projected = desc.getSlots().stream().mapToInt( slot -> source.getPaimonTable().rowType() .getFieldNames() diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java deleted file mode 100644 index 789af7bf835..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.mtmv; - -import org.apache.doris.analysis.LiteralExpr; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PartitionItem; -import org.apache.doris.catalog.PartitionKey; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.datasource.paimon.PaimonPartition; -import org.apache.doris.datasource.paimon.PaimonPartitionInfo; -import org.apache.doris.datasource.paimon.PaimonUtil; - -import com.google.common.collect.Lists; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.Timestamp; -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; - -public class PaimonUtilTest { - - @Test - public void testGeneratePartitionInfo() throws AnalysisException { - Column k1 = new Column("k1", PrimitiveType.INT); - Column k2 = new Column("k2", PrimitiveType.VARCHAR); - List<Column> partitionColumns = Lists.newArrayList(k1, k2); - PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5); - List<PaimonPartition> paimonPartitions = Lists.newArrayList(p1); - PaimonPartitionInfo partitionInfo = PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); - String expectPartitionName = "k1=1/k2=aa"; - Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName)); - PartitionItem partitionItem = partitionInfo.getNameToPartitionItem().get(expectPartitionName); - List<PartitionKey> keys = partitionItem.getItems(); - Assert.assertEquals(1, keys.size()); - PartitionKey partitionKey = keys.get(0); - List<LiteralExpr> exprs = partitionKey.getKeys(); - Assert.assertEquals(2, exprs.size()); - Assert.assertEquals(1, exprs.get(0).getLongValue()); - Assert.assertEquals("aa", exprs.get(1).getStringValue()); - } - - @Test - public void testRowToPartition() { - GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L, 3L, 4L, Timestamp.fromEpochMillis(5L)); - PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row); - Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues()); - Assert.assertEquals(2L, paimonPartition.getRecordCount()); - Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes()); - Assert.assertEquals(4L, paimonPartition.getFileCount()); - Assert.assertEquals(5L, paimonPartition.getLastUpdateTime()); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org