This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 87b96cf [feature](iceberg) Step3: Support query iceberg external table (#8179) 87b96cf is described below commit 87b96cfcd6c3cdecb7441f89e208b1f77177182c Author: qiye <jianliang5...@gmail.com> AuthorDate: Sat Feb 26 17:04:11 2022 +0800 [feature](iceberg) Step3: Support query iceberg external table (#8179) 1. Add Iceberg scan node 2. Add Iceberg/Hive table type in thrift 3. Support querying Iceberg tables of format types `parquet` and `orc` --- be/src/runtime/descriptors.cpp | 28 ++++ be/src/runtime/descriptors.h | 18 +++ fe/fe-core/pom.xml | 8 ++ .../java/org/apache/doris/catalog/HiveTable.java | 2 +- .../org/apache/doris/catalog/IcebergTable.java | 159 ++++++++++++++++++--- .../org/apache/doris/load/BrokerFileGroup.java | 11 ++ .../java/org/apache/doris/load/LoadChecker.java | 2 +- .../org/apache/doris/planner/BrokerScanNode.java | 28 ++-- .../apache/doris/planner/DistributedPlanner.java | 2 +- .../org/apache/doris/planner/HiveScanNode.java | 2 +- .../org/apache/doris/planner/IcebergScanNode.java | 109 ++++++++++++++ .../apache/doris/planner/SingleNodePlanner.java | 4 + .../java/org/apache/doris/qe/AuditLogBuilder.java | 5 +- .../apache/doris/catalog/CreateTableLikeTest.java | 7 +- .../org/apache/doris/plugin/PluginMgrTest.java | 3 +- fe/pom.xml | 12 ++ gensrc/thrift/Types.thrift | 4 +- 17 files changed, 362 insertions(+), 42 deletions(-) diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 162545a..4225a75 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -147,6 +147,28 @@ std::string BrokerTableDescriptor::debug_string() const { return out.str(); } +HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc) {} + +HiveTableDescriptor::~HiveTableDescriptor() {} + +std::string HiveTableDescriptor::debug_string() const { + std::stringstream out; + out << "HiveTable(" << TableDescriptor::debug_string() << ")"; + return out.str(); +} + +IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc) {} + +IcebergTableDescriptor::~IcebergTableDescriptor() {} + +std::string IcebergTableDescriptor::debug_string() const { + std::stringstream out; + out << "IcebergTable(" << TableDescriptor::debug_string() << ")"; + return out.str(); +} + EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {} EsTableDescriptor::~EsTableDescriptor() {} @@ -523,6 +545,12 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb case TTableType::ES_TABLE: desc = pool->add(new EsTableDescriptor(tdesc)); break; + case TTableType::HIVE_TABLE: + desc = pool->add(new HiveTableDescriptor(tdesc)); + break; + case TTableType::ICEBERG_TABLE: + desc = pool->add(new IcebergTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 0bdc22b..cdbc3d7 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -190,6 +190,24 @@ public: private: }; +class HiveTableDescriptor : public TableDescriptor { +public: + HiveTableDescriptor(const TTableDescriptor& tdesc); + virtual ~HiveTableDescriptor(); + virtual std::string debug_string() const; + +private: +}; + +class IcebergTableDescriptor : public TableDescriptor { +public: + IcebergTableDescriptor(const TTableDescriptor& tdesc); + virtual ~IcebergTableDescriptor(); + virtual std::string debug_string() const; + +private: +}; + class EsTableDescriptor : public TableDescriptor { public: EsTableDescriptor(const TTableDescriptor& tdesc); diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 9b11108..a71545c 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -599,6 +599,14 @@ under the License. <artifactId>iceberg-hive-metastore</artifactId> <scope>provided</scope> </dependency> + + <!-- For Iceberg, must be consistent with Iceberg version --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> <build> <finalName>palo-fe</finalName> diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java index 236d266..ff4adc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java @@ -134,7 +134,7 @@ public class HiveTable extends Table { @Override public TTableDescriptor toThrift() { THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties()); - TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setHiveTable(tHiveTable); return tTableDescriptor; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java index c164936..41a0458 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java @@ -17,22 +17,38 @@ package org.apache.doris.catalog; - +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; +import org.apache.doris.external.iceberg.IcebergCatalog; +import org.apache.doris.external.iceberg.IcebergCatalogMgr; +import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TIcebergTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import com.google.common.collect.Maps; - +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * External Iceberg table @@ -44,10 +60,23 @@ public class IcebergTable extends Table { private String icebergDb; // remote Iceberg table name private String icebergTbl; + // remote Iceberg table location + private String location; + // Iceberg table file format + private String fileFormat; + // Iceberg storage type + private StorageBackend.StorageType storageType; + // Iceberg remote host uri + private String hostUri; + // location analyze flag + private boolean isAnalyzed = false; private Map<String, String> icebergProperties = Maps.newHashMap(); private org.apache.iceberg.Table icebergTable; + private final byte[] loadLock = new byte[0]; + private final AtomicBoolean isLoaded = new AtomicBoolean(false); + public IcebergTable() { super(TableType.ICEBERG); } @@ -73,28 +102,126 @@ public class IcebergTable extends Table { return icebergDb; } - public void setIcebergDb(String icebergDb) { - this.icebergDb = icebergDb; - } - public String getIcebergTbl() { return icebergTbl; } - public void setIcebergTbl(String icebergTbl) { - this.icebergTbl = icebergTbl; - } - public Map<String, String> getIcebergProperties() { return icebergProperties; } - public void setIcebergProperties(Map<String, String> icebergProperties) { - this.icebergProperties = icebergProperties; + private void getLocation() throws UserException { + if (Strings.isNullOrEmpty(location)) { + try { + getTable(); + } catch (Exception e) { + throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage()); + } + location = icebergTable.location(); + } + analyzeLocation(); + } + + private void analyzeLocation() throws UserException { + if (isAnalyzed) { + return; + } + String[] strings = StringUtils.split(location, "/"); + + // analyze storage type + String storagePrefix = strings[0].split(":")[0]; + if (storagePrefix.equalsIgnoreCase("s3")) { + this.storageType = StorageBackend.StorageType.S3; + } else if (storagePrefix.equalsIgnoreCase("hdfs")) { + this.storageType = StorageBackend.StorageType.HDFS; + } else { + throw new UserException("Not supported storage type: " + storagePrefix); + } + + // analyze host uri + // eg: hdfs://host:port + // s3://host:port + String host = strings[1]; + this.hostUri = storagePrefix + "://" + host; + this.isAnalyzed = true; + } + + public String getHostUri() throws UserException { + if (!isAnalyzed) { + getLocation(); + } + return hostUri; + } + + public StorageBackend.StorageType getStorageType() throws UserException { + if (!isAnalyzed) { + getLocation(); + } + return storageType; + } + + public String getFileFormat() throws UserException { + if (Strings.isNullOrEmpty(fileFormat)) { + try { + getTable(); + } catch (Exception e) { + throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage()); + } + fileFormat = icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT); + } + return fileFormat; + } + + // get the iceberg table instance, if table is not loaded, load it. + private org.apache.iceberg.Table getTable() throws Exception { + if (isLoaded.get()) { + Preconditions.checkNotNull(icebergTable); + return icebergTable; + } + synchronized (loadLock) { + if (icebergTable != null) { + return icebergTable; + } + + IcebergProperty icebergProperty = getIcebergProperty(); + IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); + try { + this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl)); + LOG.info("finished to load iceberg table: {}", name); + } catch (Exception e) { + LOG.warn("failed to load iceberg table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e); + throw e; + } + + isLoaded.set(true); + return icebergTable; + } } - public org.apache.iceberg.Table getIcebergTable() { - return icebergTable; + private IcebergProperty getIcebergProperty() { + Map<String, String> properties = Maps.newHashMap(icebergProperties); + properties.put(IcebergProperty.ICEBERG_DATABASE, icebergDb); + properties.put(IcebergProperty.ICEBERG_TABLE, icebergTbl); + return new IcebergProperty(properties); + } + + /** + * Get iceberg data file by file system table location and iceberg predicates + * @throws Exception + */ + public List<TBrokerFileStatus> getIcebergDataFiles(List<Expression> predicates) throws Exception { + org.apache.iceberg.Table table = getTable(); + TableScan scan = table.newScan(); + for (Expression predicate : predicates) { + scan = scan.filter(predicate); + } + List<TBrokerFileStatus> relatedFiles = Lists.newArrayList(); + for (FileScanTask task : scan.planFiles()) { + Path path = Paths.get(task.file().path().toString()); + String relativePath = "/" + path.subpath(2, path.getNameCount()); + relatedFiles.add(new TBrokerFileStatus(relativePath, false, task.file().fileSizeInBytes(), false)); + } + return relatedFiles; } @Override @@ -128,7 +255,7 @@ public class IcebergTable extends Table { @Override public TTableDescriptor toThrift() { TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties()); - TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setIcebergTable(tIcebergTable); return tTableDescriptor; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 95dbc12..b5312df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.HiveTable; +import org.apache.doris.catalog.IcebergTable; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; @@ -37,6 +38,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.load.loadv2.LoadTask; @@ -137,6 +139,15 @@ public class BrokerFileGroup implements Writable { this.columnExprList = columnExprList; } + // Used for iceberg table, no need to parse + public BrokerFileGroup(IcebergTable table) throws UserException { + this.tableId = table.getId(); + this.isNegative = false; + this.valueSeparator = "|"; + this.lineDelimiter = "\n"; + this.fileFormat = table.getFileFormat(); + } + public BrokerFileGroup(DataDescription dataDescription) { this.fileFieldNames = dataDescription.getFileFieldNames(); this.columnsFromPath = dataDescription.getColumnsFromPath(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java index f87cf93..0ee4fd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java @@ -50,6 +50,7 @@ import org.apache.doris.transaction.TabletQuorumFailedException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -63,7 +64,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import avro.shaded.com.google.common.collect.Lists; public class LoadChecker extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(LoadChecker.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 9f75814..bfab512 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -401,7 +401,7 @@ public class BrokerScanNode extends LoadScanNode { Collections.shuffle(backends, random); } - private TFileFormatType formatType(String fileFormat, String path) { + private TFileFormatType formatType(String fileFormat, String path) throws UserException { if (fileFormat != null) { if (fileFormat.toLowerCase().equals("parquet")) { return TFileFormatType.FORMAT_PARQUET; @@ -411,6 +411,8 @@ public class BrokerScanNode extends LoadScanNode { return TFileFormatType.FORMAT_JSON; } else if (fileFormat.toLowerCase().equals("csv")) { return TFileFormatType.FORMAT_CSV_PLAIN; + } else { + throw new UserException("Not supported file format: " + fileFormat); } } @@ -432,6 +434,10 @@ public class BrokerScanNode extends LoadScanNode { } } + public String getHostUri() throws UserException { + return ""; + } + // If fileFormat is not null, we use fileFormat instead of check file's suffix private void processFileGroup( ParamCreateContext context, @@ -440,11 +446,11 @@ public class BrokerScanNode extends LoadScanNode { if (fileStatuses == null || fileStatuses.isEmpty()) { return; } + // set hdfs params, used to Hive and Iceberg scan THdfsParams tHdfsParams = new THdfsParams(); - if (this instanceof HiveScanNode) { - String fsName = ((HiveScanNode) this).getHdfsUri(); - tHdfsParams.setFsName(fsName); - } + String fsName = getHostUri(); + tHdfsParams.setFsName(fsName); + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc); long curInstanceBytes = 0; long curFileOffset = 0; @@ -477,10 +483,8 @@ public class BrokerScanNode extends LoadScanNode { } else { TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc); - if (this instanceof HiveScanNode) { - rangeDesc.setHdfsParams(tHdfsParams); - rangeDesc.setReadByColumnDef(true); - } + rangeDesc.setHdfsParams(tHdfsParams); + rangeDesc.setReadByColumnDef(true); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; i++; @@ -502,10 +506,8 @@ public class BrokerScanNode extends LoadScanNode { rangeDesc.setNumAsString(context.fileGroup.isNumAsString()); rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine()); } - if (this instanceof HiveScanNode) { - rangeDesc.setHdfsParams(tHdfsParams); - rangeDesc.setReadByColumnDef(true); - } + rangeDesc.setHdfsParams(tHdfsParams); + rangeDesc.setReadByColumnDef(true); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 8c209a6..eb099c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,7 +51,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import avro.shaded.com.google.common.collect.Maps; /** * The distributed planner is responsible for creating an executable, distributed plan diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java index 2619079..a67f539 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java @@ -70,7 +70,7 @@ public class HiveScanNode extends BrokerScanNode { private List<String> partitionKeys = new ArrayList<>(); /* hive table properties */ - public String getHdfsUri() { + public String getHostUri() { return hdfsUri; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java new file mode 100644 index 0000000..5428c9e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.IcebergProperty; +import org.apache.doris.catalog.IcebergTable; +import org.apache.doris.common.UserException; +import org.apache.doris.external.iceberg.util.IcebergUtils; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TExplainLevel; + +import org.apache.iceberg.expressions.Expression; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +public class IcebergScanNode extends BrokerScanNode { + private static final Logger LOG = LogManager.getLogger(IcebergScanNode.class); + + private IcebergTable icebergTable; + private final List<Expression> icebergPredicates = new ArrayList<>(); + + public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, + List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) { + super(id, desc, planNodeName, fileStatusesList, filesAdded); + icebergTable = (IcebergTable) desc.getTable(); + } + + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + } + + @Override + protected void initFileGroup() throws UserException { + fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable)); + brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(), + icebergTable.getIcebergProperties()); + targetTable = icebergTable; + } + + @Override + public String getHostUri() throws UserException { + return icebergTable.getHostUri(); + } + + @Override + protected void getFileStatus() throws UserException { + // extract iceberg conjuncts + ListIterator<Expr> it = conjuncts.listIterator(); + while (it.hasNext()) { + Expression expression = IcebergUtils.convertToIcebergExpr(it.next()); + if (expression != null) { + icebergPredicates.add(expression); + } + } + // get iceberg file status + List<TBrokerFileStatus> fileStatuses; + try { + fileStatuses = icebergTable.getIcebergDataFiles(icebergPredicates); + } catch (Exception e) { + LOG.warn("errors while load iceberg table {} data files.", icebergTable.getName(), e); + throw new UserException("errors while load Iceberg table [" + + icebergTable.getName() + "] data files."); + } + fileStatusesList.add(fileStatuses); + filesAdded += fileStatuses.size(); + for (TBrokerFileStatus fstatus : fileStatuses) { + LOG.debug("Add file status is {}", fstatus); + } + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + if (!isLoad()) { + output.append(prefix).append("TABLE: ").append(icebergTable.getName()).append("\n"); + output.append(prefix).append("PATH: ") + .append(icebergTable.getIcebergProperties().get(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS)) + .append("\n"); + } + return output.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index b3aaf2a..8414da0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1698,6 +1698,10 @@ public class SingleNodePlanner { scanNode = new HiveScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "HiveScanNode", null, -1); break; + case ICEBERG: + scanNode = new IcebergScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode", + null, -1); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java index c940e0d..818e116 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java @@ -17,8 +17,6 @@ package org.apache.doris.qe; -import avro.shaded.com.google.common.collect.Maps; -import avro.shaded.com.google.common.collect.Sets; import org.apache.doris.common.AuditLog; import org.apache.doris.common.Config; import org.apache.doris.common.util.DigitalVersion; @@ -34,6 +32,9 @@ import org.apache.doris.plugin.PluginMgr; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import java.lang.reflect.Field; import java.util.Map; import java.util.Set; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java index 13a32d5..bda5ea6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java @@ -17,14 +17,11 @@ package org.apache.doris.catalog; -import avro.shaded.com.google.common.collect.Lists; -import org.apache.commons.collections.ListUtils; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableLikeStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; -import org.apache.doris.common.util.ListUtil; import org.apache.doris.qe.ConnectContext; import org.apache.doris.utframe.UtFrameUtils; @@ -33,12 +30,12 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Lists; + import java.io.File; import java.util.List; import java.util.UUID; -import avro.shaded.com.google.common.collect.Lists; - /** * @author wangcong * @version 1.0 diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java index e5a1885..80edee5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java @@ -30,6 +30,8 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Maps; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -38,7 +40,6 @@ import java.io.IOException; import java.nio.file.Files; import java.util.UUID; -import avro.shaded.com.google.common.collect.Maps; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; diff --git a/fe/pom.xml b/fe/pom.xml index 80903c6..8e13dcb 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -120,7 +120,12 @@ under the License. <log4j2.version>2.17.1</log4j2.version> <revision>0.15-SNAPSHOT</revision> <project.scm.id>github</project.scm.id> + <!-- ATTN: avro version must be consistent with Iceberg version --> + <!-- Please modify iceberg.version and avro.version together, + you can find avro version info in iceberg mvn repository --> <iceberg.version>0.12.0</iceberg.version> + <avro.version>1.10.1</avro.version> + <!-- ATTN: avro version must be consistent with Iceberg version --> </properties> <profiles> <!-- for custom internal repository --> @@ -706,6 +711,13 @@ under the License. <version>${iceberg.version}</version> </dependency> + <!-- For Iceberg, must be consistent with Iceberg version --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-column</artifactId> diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index c1c8487..88992a7 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -357,7 +357,9 @@ enum TTableType { KUDU_TABLE, // Deprecated BROKER_TABLE, ES_TABLE, - ODBC_TABLE + ODBC_TABLE, + HIVE_TABLE, + ICEBERG_TABLE } enum TOdbcTableType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org