This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 6cbde0c4fdb [opt](paimon)Upgrade the Paimon version to 1.0.0 and Iceberg to 1.6.1 (#46990)
6cbde0c4fdb is described below

commit 6cbde0c4fdbcabca947a82245486268c236ced22
Author: wuwenchi <wuwen...@selectdb.com>
AuthorDate: Sat Feb 1 22:34:18 2025 +0800

    [opt](paimon)Upgrade the Paimon version to 1.0.0 and Iceberg to 1.6.1 (#46990)

    ### What problem does this PR solve?

    Problem Summary:

    Upgrade the Paimon version to 1.0.0.

    By default, Paimon uses a caching catalog that caches metadata to improve read performance.
    FYI: https://paimon.apache.org/docs/1.0/maintenance/configurations/#catalogoptions

    If you do not want this cache, add the configuration `paimon.cache-enabled` to turn it off:

    ```
    CREATE CATALOG `c1` PROPERTIES (
        "type" = "paimon",
        "paimon.catalog.type" = "xxx",
        "paimon.cache-enabled" = "false",
        "warehouse" = "xxx"
    );
    ```

    If you want to tune cache-related parameters instead, add the `paimon.` prefix to any
    parameter supported by Paimon, for example:

    ```
    CREATE CATALOG `c1` PROPERTIES (
        "type" = "paimon",
        "paimon.catalog.type" = "xxx",
        "warehouse" = "xxx",
        "paimon.cache.expiration-interval" = "20 min",
        "paimon.cache.manifest.small-file-memory" = "10 mb"
    );
    ```

    Note: during the Doris upgrade process an error may occur (error screenshot omitted).
    This is because Doris upgrades the BE nodes before the FE nodes, so the Paimon version
    on the BE side may temporarily be newer than on the FE side. This is expected: the newer
    Paimon version adds a bucket-key check that the older version does not have. Once the FE
    upgrade completes, the error no longer occurs.
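For reference, these catalog properties, minus the `paimon.` prefix, are ordinary Paimon
catalog options. The standalone Java sketch below (not the Doris code path; the warehouse
path, option values, and class name are placeholders) shows the same `cache-enabled`
switch applied directly through Paimon's public catalog API:

```
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

public class PaimonCacheOptionsSketch {
    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        conf.put("warehouse", "file:///tmp/paimon");  // placeholder warehouse
        conf.put("cache-enabled", "false");           // "paimon.cache-enabled" minus the prefix
        // Or keep the caching catalog and tune it instead:
        // conf.put("cache.expiration-interval", "20 min");
        try (Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(conf)))) {
            System.out.println(catalog.listDatabases());
        }
    }
}
```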
---
 .../datasource/iceberg/source/IcebergScanNode.java | 84 ++++++++++++----------
 .../datasource/paimon/PaimonExternalCatalog.java   | 11 ++-
 .../datasource/paimon/PaimonExternalTable.java     |  2 +-
 .../datasource/paimon/PaimonMetadataCache.java     |  2 +-
 .../apache/doris/datasource/paimon/PaimonUtil.java |  2 +-
 .../datasource/paimon/source/PaimonScanNode.java   |  7 +-
 fe/pom.xml                                         |  6 +-
 7 files changed, 70 insertions(+), 44 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 756c9024cdc..87392d0ff38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -196,7 +196,12 @@ public class IcebergScanNode extends FileQueryScanNode {
         try {
             return preExecutionAuthenticator.execute(() -> doGetSplits(numBackends));
         } catch (Exception e) {
-            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
+            Optional<NotSupportedException> opt = checkNotSupportedException(e);
+            if (opt.isPresent()) {
+                throw opt.get();
+            } else {
+                throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
+            }
         }
     }
 
@@ -232,7 +237,12 @@ public class IcebergScanNode extends FileQueryScanNode {
                 );
                 splitAssignment.finishSchedule();
             } catch (Exception e) {
-                splitAssignment.setException(new UserException(e.getMessage(), e));
+                Optional<NotSupportedException> opt = checkNotSupportedException(e);
+                if (opt.isPresent()) {
+                    splitAssignment.setException(new UserException(opt.get().getMessage(), opt.get()));
+                } else {
+                    splitAssignment.setException(new UserException(e.getMessage(), e));
+                }
             }
         });
     }
@@ -266,40 +276,7 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
         long targetSplitSize = getRealFileSplitSize(0);
-        CloseableIterable<FileScanTask> splitFiles;
-        try {
-            splitFiles = TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
-        } catch (NullPointerException e) {
-            /*
-            Caused by: java.lang.NullPointerException: Type cannot be null
-            at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
-                (Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
-            at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) ~[iceberg-api-1.4.3.jar:?]
-            at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?]
-            at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?]
-            at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
-                (RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?]
-            at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?]
-            at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
-                (IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
-            EXAMPLE:
-            CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2));
-            INSERT INTO iceberg_tb VALUES( ... );
-            ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
-            ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
-            Link: https://github.com/apache/iceberg/pull/10755
-            */
-            LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e);
-            throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column.");
-        }
-        return splitFiles;
+        return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
     }
 
     private Split createIcebergSplit(FileScanTask fileScanTask) {
@@ -529,4 +506,39 @@ public class IcebergScanNode extends FileQueryScanNode {
     public int numApproximateSplits() {
         return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ? partitionPathSet.size() : 1;
     }
+
+    private Optional<NotSupportedException> checkNotSupportedException(Exception e) {
+        if (e instanceof NullPointerException) {
+            /*
+            Caused by: java.lang.NullPointerException: Type cannot be null
+            at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
+                (Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
+            at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) ~[iceberg-api-1.4.3.jar:?]
+            at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?]
+            at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?]
+            at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
+                (RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?]
+            at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?]
+            at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
+                (IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
+            EXAMPLE:
+            CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2));
+            INSERT INTO iceberg_tb VALUES( ... );
+            ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
+            ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
+            Link: https://github.com/apache/iceberg/pull/10755
+            */
+            LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e);
+            return Optional.of(
+                    new NotSupportedException("Unable to read Iceberg table with dropped old partition column."));
+        }
+        return Optional.empty();
+    }
 }
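The refactor above moves the NPE handling out of `planFileScanTask` into a single
`checkNotSupportedException` classifier, consulted by both the synchronous split-planning
path and the asynchronous split-assignment path, so both surface the same
`NotSupportedException`. A condensed, self-contained sketch of that shape, with
`UnsupportedOperationException` standing in for Doris's `NotSupportedException`:

```
import java.util.Optional;
import java.util.concurrent.Callable;

final class NotSupportedClassifier {
    // Condensed checkNotSupportedException: map the known Iceberg NPE
    // (dropped partition column, apache/iceberg#10755) to a user-facing error.
    static Optional<UnsupportedOperationException> classify(Exception e) {
        if (e instanceof NullPointerException) {
            return Optional.of(new UnsupportedOperationException(
                    "Unable to read Iceberg table with dropped old partition column."));
        }
        return Optional.empty();
    }

    // Call-site shape shared by both paths: rethrow the classified error if
    // present, otherwise fall back to the generic wrapper.
    static <T> T run(Callable<T> task) {
        try {
            return task.call();
        } catch (Exception e) {
            throw classify(e).map(RuntimeException.class::cast)
                    .orElseGet(() -> new RuntimeException(e.getMessage(), e));
        }
    }
}
```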
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index eb25336ab0b..7d857524f2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
@@ -94,7 +95,15 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
     public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
         makeSureInitialized();
         try {
-            return hadoopAuthenticator.doAs(() -> catalog.tableExists(Identifier.create(dbName, tblName)));
+            return hadoopAuthenticator.doAs(() -> {
+                try {
+                    catalog.getTable(Identifier.create(dbName, tblName));
+                    return true;
+                } catch (TableNotExistException e) {
+                    return false;
+                }
+            });
+
         } catch (IOException e) {
             throw new RuntimeException("Failed to check table existence, catalog name: " + getName(), e);
         }
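This change suggests the old `Catalog.tableExists` probe is no longer available in
Paimon 1.0: existence is now inferred from `getTable`, which reports a missing table via
`Catalog.TableNotExistException`. A minimal standalone sketch of the same pattern,
stripped of Doris's authenticator wrapping (`PaimonTableExistence` is an illustrative
name, not Doris code):

```
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;

final class PaimonTableExistence {
    // Probe existence via getTable: Paimon signals a missing table by exception.
    static boolean tableExists(Catalog catalog, String db, String tbl) {
        try {
            catalog.getTable(Identifier.create(db, tbl));
            return true;
        } catch (Catalog.TableNotExistException e) {
            return false;
        }
    }
}
```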
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 347e8df5c8f..622d6dbdfaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -273,7 +273,7 @@ public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTab
         PredicateBuilder builder = new PredicateBuilder(table.rowType());
         Predicate predicate = builder.equal(0, key.getSchemaId());
         // Adding predicates will also return excess data
-        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate);
+        List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1, 2}, predicate);
         for (InternalRow row : rows) {
             PaimonSchema schema = PaimonUtil.rowToSchema(row);
             if (schema.getSchemaId() == key.getSchemaId()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
index 5b711e07066..109394fabde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -97,7 +97,7 @@ public class PaimonMetadataCache {
         Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(),
                 key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS);
         // snapshotId and schemaId
-        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null);
+        List<InternalRow> rows = PaimonUtil.read(table, new int[] {0, 1}, null);
         long latestSnapshotId = 0L;
         long latestSchemaId = 0L;
         for (InternalRow row : rows) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index b3df41bc5ce..bbb1eaf5096 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -62,7 +62,7 @@ public class PaimonUtil {
     private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
 
     public static List<InternalRow> read(
-            Table table, @Nullable int[][] projection, @Nullable Predicate predicate,
+            Table table, @Nullable int[] projection, @Nullable Predicate predicate,
             Pair<ConfigOption<?>, String>... dynamicOptions)
             throws IOException {
         Map<String, String> options = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 28efbc58f51..0e9a8042a65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -208,7 +208,12 @@ public class PaimonScanNode extends FileQueryScanNode {
                 .valueOf(sessionVariable.getIgnoreSplitType());
         List<Split> splits = new ArrayList<>();
         int[] projected = desc.getSlots().stream().mapToInt(
-                slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
+                slot -> source.getPaimonTable().rowType()
+                        .getFieldNames()
+                        .stream()
+                        .map(String::toLowerCase)
+                        .collect(Collectors.toList())
+                        .indexOf(slot.getColumn().getName()))
                 .toArray();
         ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
         List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.withFilter(predicates)
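The recurring `int[][]` → `int[]` edits track Paimon 1.0's read API, where a projection is
a flat array of top-level field indexes; PaimonScanNode additionally lower-cases Paimon's
field names before the index lookup so Doris's lower-cased column names still resolve. The
sketch below shows a projected read against Paimon's public `ReadBuilder` (the class and
method names are illustrative, not Doris code):

```
import java.util.ArrayList;
import java.util.List;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;

final class FlatProjectionRead {
    // Read only the given top-level columns; Paimon 1.0 takes a flat index array.
    static List<InternalRow> readProjected(Table table, int[] projection) throws Exception {
        ReadBuilder readBuilder = table.newReadBuilder().withProjection(projection);
        List<Split> splits = readBuilder.newScan().plan().splits();
        List<InternalRow> rows = new ArrayList<>();
        try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(splits)) {
            reader.forEachRemaining(rows::add);
        }
        return rows;
    }
}
```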
diff --git a/fe/pom.xml b/fe/pom.xml
index 2c61fe32299..8e59c80c89a 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -222,7 +222,7 @@ under the License.
         <module>be-java-extensions</module>
     </modules>
     <properties>
-        <doris.hive.catalog.shade.version>2.1.1</doris.hive.catalog.shade.version>
+        <doris.hive.catalog.shade.version>2.1.3</doris.hive.catalog.shade.version>
         <avro.version>1.11.4</avro.version>
         <parquet.version>1.13.1</parquet.version>
         <spark.version>3.4.3</spark.version>
@@ -318,7 +318,7 @@ under the License.
         <!-- ATTN: avro version must be consistent with Iceberg version -->
         <!-- Please modify iceberg.version and avro.version together,
         you can find avro version info in iceberg mvn repository -->
-        <iceberg.version>1.4.3</iceberg.version>
+        <iceberg.version>1.6.1</iceberg.version>
         <maxcompute.version>0.49.0-public</maxcompute.version>
         <arrow.version>17.0.0</arrow.version>
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
@@ -365,7 +365,7 @@ under the License.
         <quartz.version>2.3.2</quartz.version>
         <aircompressor.version>0.27</aircompressor.version>
         <!-- paimon -->
-        <paimon.version>0.8.1</paimon.version>
+        <paimon.version>1.0.0</paimon.version>
         <disruptor.version>3.4.4</disruptor.version>
         <!-- arrow flight sql -->
         <arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>