This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 46d88ede02 [Refactor](Metadata tvf) Reconstruct Metadata table-value function into a more general framework. (#17590) 46d88ede02 is described below commit 46d88ede024d1cb8c31837c54c1ac721b35e6966 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Fri Mar 17 19:54:50 2023 +0800 [Refactor](Metadata tvf) Reconstruct Metadata table-value function into a more general framework. (#17590) --- be/src/vec/exec/scan/vmeta_scanner.cpp | 62 +++--- be/src/vec/exec/scan/vmeta_scanner.h | 5 +- .../org/apache/doris/planner/DataGenScanNode.java | 5 +- .../doris/planner/external/MetadataScanNode.java | 30 +-- .../apache/doris/service/FrontendServiceImpl.java | 211 +----------------- .../tablefunction/DataGenTableValuedFunction.java | 8 + .../tablefunction/IcebergTableValuedFunction.java | 57 +++-- .../doris/tablefunction/MetadataGenerator.java | 248 +++++++++++++++++++++ .../tablefunction/MetadataTableValuedFunction.java | 18 +- .../tablefunction/NumbersTableValuedFunction.java | 9 - gensrc/thrift/FrontendService.thrift | 10 +- gensrc/thrift/PlanNodes.thrift | 15 +- gensrc/thrift/Types.thrift | 8 + 13 files changed, 368 insertions(+), 318 deletions(-) diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 86e60edf11..50877d5558 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -31,7 +31,6 @@ VMetaScanner::VMetaScanner(RuntimeState* state, VMetaScanNode* parent, int64_t t const TScanRangeParams& scan_range, int64_t limit, RuntimeProfile* profile) : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), - _parent(parent), _meta_eos(false), _tuple_id(tuple_id), _scan_range(scan_range.scan_range) {} @@ -46,11 +45,7 @@ Status VMetaScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_p VLOG_CRITICAL << "VMetaScanner::prepare"; RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); - if (_scan_range.meta_scan_range.__isset.iceberg_params) { - RETURN_IF_ERROR(_fetch_iceberg_metadata_batch()); - } else { - _meta_eos = true; - } + RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); return Status::OK(); } @@ -158,37 +153,28 @@ Status VMetaScanner::_fill_block_with_remote_data(const std::vector<MutableColum return Status::OK(); } -Status VMetaScanner::_fetch_iceberg_metadata_batch() { - VLOG_CRITICAL << "VMetaScanner::_fetch_iceberg_metadata_batch"; +Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { + VLOG_CRITICAL << "VMetaScanner::_fetch_metadata"; TFetchSchemaTableDataRequest request; - request.cluster_name = ""; - request.__isset.cluster_name = true; - request.schema_table_name = TSchemaTableName::ICEBERG_TABLE_META; - request.__isset.schema_table_name = true; - auto scan_params = _parent->scan_params(); - TMetadataTableRequestParams meta_table_params = TMetadataTableRequestParams(); - meta_table_params.catalog = scan_params.catalog; - meta_table_params.__isset.catalog = true; - meta_table_params.database = scan_params.database; - meta_table_params.__isset.database = true; - meta_table_params.table = scan_params.table; - meta_table_params.__isset.table = true; - - meta_table_params.iceberg_metadata_params = _scan_range.meta_scan_range.iceberg_params; - meta_table_params.__isset.iceberg_metadata_params = true; - - request.metada_table_params = meta_table_params; - request.__isset.metada_table_params = true; + switch (meta_scan_range.metadata_type) { + case TMetadataType::ICEBERG: + RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, &request)); + break; + default: + _meta_eos = true; + return Status::OK(); + } + // _state->query_timeout() is seconds, change to milliseconds + int time_out = _state->query_timeout() * 1000; TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; TFetchSchemaTableDataResult result; - RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( master_addr.hostname, master_addr.port, [&request, &result](FrontendServiceConnection& client) { client->fetchSchemaTableData(result, request); }, - config::txn_commit_rpc_timeout_ms)); + time_out)); Status status(result.status); if (!status.ok()) { @@ -199,6 +185,26 @@ Status VMetaScanner::_fetch_iceberg_metadata_batch() { return Status::OK(); } +Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request"; + if (!meta_scan_range.__isset.iceberg_params) { + return Status::InternalError("Can not find TIcebergMetadataParams from meta_scan_range."); + } + + // create request + request->__set_cluster_name(""); + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG); + metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + Status VMetaScanner::close(RuntimeState* state) { VLOG_CRITICAL << "VMetaScanner::close"; RETURN_IF_ERROR(VScanner::close(state)); diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 63cd50fe10..ffb6ff8d6c 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -35,10 +35,11 @@ public: protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns); - Status _fetch_iceberg_metadata_batch(); + Status _fetch_metadata(const TMetaScanRange& meta_scan_range); + Status _build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); private: - VMetaScanNode* _parent; bool _meta_eos; TupleId _tuple_id; const TupleDescriptor* _tuple_desc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java index 94c5c07028..cee9007b27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java @@ -48,9 +48,8 @@ public class DataGenScanNode extends ScanNode { private DataGenTableValuedFunction tvf; private boolean isFinalized = false; - public DataGenScanNode(PlanNodeId id, TupleDescriptor desc, - String planNodeName, DataGenTableValuedFunction tvf) { - super(id, desc, planNodeName, StatisticalType.TABLE_VALUED_FUNCTION_NODE); + public DataGenScanNode(PlanNodeId id, TupleDescriptor desc, DataGenTableValuedFunction tvf) { + super(id, desc, "DataGenScanNode", StatisticalType.TABLE_VALUED_FUNCTION_NODE); this.tvf = tvf; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java index aa9c840197..bcb34b93ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java @@ -24,12 +24,8 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; -import org.apache.doris.tablefunction.IcebergTableValuedFunction; import org.apache.doris.tablefunction.MetadataTableValuedFunction; -import org.apache.doris.thrift.TIcebergMetadataParams; -import org.apache.doris.thrift.TIcebergMetadataType; import org.apache.doris.thrift.TMetaScanNode; -import org.apache.doris.thrift.TMetaScanRange; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -64,10 +60,8 @@ public class MetadataScanNode extends ScanNode { protected void toThrift(TPlanNode planNode) { planNode.setNodeType(TPlanNodeType.META_SCAN_NODE); TMetaScanNode metaScanNode = new TMetaScanNode(); - metaScanNode.setCatalog(tvf.getMetadataTableName().getCtl()); - metaScanNode.setDatabase(tvf.getMetadataTableName().getDb()); - metaScanNode.setTable(tvf.getMetadataTableName().getTbl()); metaScanNode.setTupleId(desc.getId().asInt()); + metaScanNode.setMetadataType(this.tvf.getMetadataType()); planNode.setMetaScanNode(metaScanNode); } @@ -83,28 +77,18 @@ public class MetadataScanNode extends ScanNode { @Override public boolean needToCheckColumnPriv() { - return super.needToCheckColumnPriv(); + return false; } private void buildScanRanges() { - if (tvf.getMetaType() == MetadataTableValuedFunction.MetaType.ICEBERG) { - IcebergTableValuedFunction icebergTvf = (IcebergTableValuedFunction) tvf; - // todo: split - TScanRangeLocations locations = createIcebergTvfLocations(icebergTvf); - scanRangeLocations.add(locations); - } + // todo: split + TScanRangeLocations locations = createMetaDataTvfLocations(); + scanRangeLocations.add(locations); } - private TScanRangeLocations createIcebergTvfLocations(IcebergTableValuedFunction icebergTvf) { + private TScanRangeLocations createMetaDataTvfLocations() { TScanRange scanRange = new TScanRange(); - TMetaScanRange metaScanRange = new TMetaScanRange(); - // set iceberg metadata params - TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams(); - int metadataType = icebergTvf.getMetaQueryType().ordinal(); - icebergMetadataParams.setMetadataType(TIcebergMetadataType.findByValue(metadataType)); - - metaScanRange.setIcebergParams(icebergMetadataParams); - scanRange.setMetaScanRange(metaScanRange); + scanRange.setMetaScanRange(tvf.getMetaScanRange()); // set location TScanRangeLocation location = new TScanRangeLocation(); Backend backend = backendPolicy.getNextBe(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 911f2b5f15..58c2eb479e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -17,7 +17,6 @@ package org.apache.doris.service; -import org.apache.doris.alter.DecommissionType; import org.apache.doris.alter.SchemaChangeHandler; import org.apache.doris.analysis.AddColumnsClause; import org.apache.doris.analysis.ColumnDef; @@ -29,7 +28,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.HMSResource; import org.apache.doris.catalog.Index; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Replica; @@ -39,7 +37,6 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.catalog.external.ExternalDatabase; -import org.apache.doris.cluster.Cluster; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuthenticationException; @@ -56,11 +53,9 @@ import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.annotation.LogException; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.cooldown.CooldownDelete; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; -import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.master.MasterImpl; import org.apache.doris.mysql.privilege.AccessControllerManager; @@ -71,15 +66,14 @@ import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.VariableMgr; -import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.tablefunction.MetadataGenerator; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.FrontendServiceVersion; import org.apache.doris.thrift.TAddColumnsRequest; import org.apache.doris.thrift.TAddColumnsResult; -import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TCheckAuthRequest; import org.apache.doris.thrift.TCheckAuthResult; import org.apache.doris.thrift.TColumn; @@ -104,7 +98,6 @@ import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; import org.apache.doris.thrift.TGetTablesParams; import org.apache.doris.thrift.TGetTablesResult; -import org.apache.doris.thrift.TIcebergMetadataType; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; import org.apache.doris.thrift.TListPrivilegesResult; @@ -120,7 +113,6 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TMasterResult; -import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPrivilegeCtrl; @@ -130,7 +122,6 @@ import org.apache.doris.thrift.TPrivilegeType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TReportRequest; -import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TShowVariableRequest; import org.apache.doris.thrift.TShowVariableResult; import org.apache.doris.thrift.TSnapshotLoaderReportRequest; @@ -150,22 +141,14 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TxnCommitAttachment; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.gson.Gson; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import org.jetbrains.annotations.NotNull; import java.time.Instant; -import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; @@ -1352,197 +1335,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { public TFetchSchemaTableDataResult fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException { switch (request.getSchemaTableName()) { case BACKENDS: - return getBackendsSchemaTable(request); - case ICEBERG_TABLE_META: - return getIcebergMetadataTable(request); + return MetadataGenerator.getBackendsSchemaTable(request); + case METADATA_TABLE: + return MetadataGenerator.getMetadataTable(request); default: break; } - TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); - result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR)); - return result; - } - - private TFetchSchemaTableDataResult getIcebergMetadataTable(TFetchSchemaTableDataRequest request) { - if (!request.isSetMetadaTableParams()) { - return errorResult("Metadata table params is not set. "); - } - TMetadataTableRequestParams params = request.getMetadaTableParams(); - if (!params.isSetIcebergMetadataParams()) { - return errorResult("Iceberg metadata params is not set. "); - } - - HMSExternalCatalog catalog = (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr() - .getCatalog(params.getCatalog()); - org.apache.iceberg.Table table; - try { - table = getIcebergTable(catalog, params.getDatabase(), params.getTable()); - } catch (MetaNotFoundException e) { - return errorResult(e.getMessage()); - } - TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); - List<TRow> dataBatch = Lists.newArrayList(); - TIcebergMetadataType metadataType = params.getIcebergMetadataParams().getMetadataType(); - switch (metadataType) { - case SNAPSHOTS: - for (Snapshot snapshot : table.snapshots()) { - TRow trow = new TRow(); - LocalDateTime committedAt = LocalDateTime.ofInstant(Instant.ofEpochMilli( - snapshot.timestampMillis()), TimeUtils.getTimeZone().toZoneId()); - long encodedDatetime = convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(), - committedAt.getDayOfMonth(), committedAt.getHour(), - committedAt.getMinute(), committedAt.getSecond()); - - trow.addToColumnValue(new TCell().setLongVal(encodedDatetime)); - trow.addToColumnValue(new TCell().setLongVal(snapshot.snapshotId())); - if (snapshot.parentId() == null) { - trow.addToColumnValue(new TCell().setLongVal(-1L)); - } else { - trow.addToColumnValue(new TCell().setLongVal(snapshot.parentId())); - } - trow.addToColumnValue(new TCell().setStringVal(snapshot.operation())); - trow.addToColumnValue(new TCell().setStringVal(snapshot.manifestListLocation())); - dataBatch.add(trow); - } - break; - default: - return errorResult("Unsupported metadata inspect type: " + metadataType); - } - result.setDataBatch(dataBatch); - result.setStatus(new TStatus(TStatusCode.OK)); - return result; - } - - public static long convertToDateTimeV2(int year, int month, int day, int hour, int minute, int second) { - return (long) second << 20 | (long) minute << 26 | (long) hour << 32 - | (long) day << 37 | (long) month << 42 | (long) year << 46; - } - - @NotNull - private TFetchSchemaTableDataResult errorResult(String msg) { - TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); - result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR)); - result.status.addToErrorMsgs(msg); - return result; - } - - private org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog catalog, String db, String tbl) - throws MetaNotFoundException { - org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); - Configuration conf = new HdfsConfiguration(); - Map<String, String> properties = catalog.getCatalogProperty().getHadoopProperties(); - for (Map.Entry<String, String> entry : properties.entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - hiveCatalog.setConf(conf); - Map<String, String> catalogProperties = new HashMap<>(); - catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, catalog.getHiveMetastoreUris()); - catalogProperties.put("uri", catalog.getHiveMetastoreUris()); - hiveCatalog.initialize("hive", catalogProperties); - return hiveCatalog.loadTable(TableIdentifier.of(db, tbl)); - } - - private TFetchSchemaTableDataResult getBackendsSchemaTable(TFetchSchemaTableDataRequest request) { - final SystemInfoService clusterInfoService = Env.getCurrentSystemInfo(); - TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); - List<Long> backendIds = null; - if (!Strings.isNullOrEmpty(request.cluster_name)) { - final Cluster cluster = Env.getCurrentEnv().getCluster(request.cluster_name); - // root not in any cluster - if (null == cluster) { - return result; - } - backendIds = cluster.getBackendIdList(); - } else { - backendIds = clusterInfoService.getBackendIds(false); - if (backendIds == null) { - return result; - } - } - - long start = System.currentTimeMillis(); - Stopwatch watch = Stopwatch.createUnstarted(); - - List<TRow> dataBatch = Lists.newArrayList(); - for (long backendId : backendIds) { - Backend backend = clusterInfoService.getBackend(backendId); - if (backend == null) { - continue; - } - - watch.start(); - Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId); - watch.stop(); - - TRow trow = new TRow(); - trow.addToColumnValue(new TCell().setLongVal(backendId)); - trow.addToColumnValue(new TCell().setStringVal(backend.getOwnerClusterName())); - trow.addToColumnValue(new TCell().setStringVal(backend.getIp())); - if (Strings.isNullOrEmpty(request.cluster_name)) { - trow.addToColumnValue(new TCell().setIntVal(backend.getHeartbeatPort())); - trow.addToColumnValue(new TCell().setIntVal(backend.getBePort())); - trow.addToColumnValue(new TCell().setIntVal(backend.getHttpPort())); - trow.addToColumnValue(new TCell().setIntVal(backend.getBrpcPort())); - } - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime()))); - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs()))); - trow.addToColumnValue(new TCell().setStringVal(String.valueOf(backend.isAlive()))); - if (backend.isDecommissioned() && backend.getDecommissionType() == DecommissionType.ClusterDecommission) { - trow.addToColumnValue(new TCell().setStringVal("false")); - trow.addToColumnValue(new TCell().setStringVal("true")); - } else if (backend.isDecommissioned() - && backend.getDecommissionType() == DecommissionType.SystemDecommission) { - trow.addToColumnValue(new TCell().setStringVal("true")); - trow.addToColumnValue(new TCell().setStringVal("false")); - } else { - trow.addToColumnValue(new TCell().setStringVal("false")); - trow.addToColumnValue(new TCell().setStringVal("false")); - } - trow.addToColumnValue(new TCell().setLongVal(tabletNum)); - - // capacity - // data used - trow.addToColumnValue(new TCell().setLongVal(backend.getDataUsedCapacityB())); - - // available - long availB = backend.getAvailableCapacityB(); - trow.addToColumnValue(new TCell().setLongVal(availB)); - - // total - long totalB = backend.getTotalCapacityB(); - trow.addToColumnValue(new TCell().setLongVal(totalB)); - - // used percent - double used = 0.0; - if (totalB <= 0) { - used = 0.0; - } else { - used = (double) (totalB - availB) * 100 / totalB; - } - trow.addToColumnValue(new TCell().setDoubleVal(used)); - trow.addToColumnValue(new TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100)); - - // remote used capacity - trow.addToColumnValue(new TCell().setLongVal(backend.getRemoteUsedCapacityB())); - - // tags - trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString())); - // err msg - trow.addToColumnValue(new TCell().setStringVal(backend.getHeartbeatErrMsg())); - // version - trow.addToColumnValue(new TCell().setStringVal(backend.getVersion())); - // status - trow.addToColumnValue(new TCell().setStringVal(new Gson().toJson(backend.getBackendStatus()))); - dataBatch.add(trow); - } - - // backends proc node get result too slow, add log to observer. - LOG.debug("backends proc get tablet num cost: {}, total cost: {}", - watch.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - start)); - - result.setDataBatch(dataBatch); - result.setStatus(new TStatus(TStatusCode.OK)); - return result; + return MetadataGenerator.errorResult("Fetch schema table name is not set"); } private TNetworkAddress getClientAddr() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java index ab3579e9d8..fc2a6d6dd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java @@ -17,7 +17,11 @@ package org.apache.doris.tablefunction; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.AnalysisException; +import org.apache.doris.planner.DataGenScanNode; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; import org.apache.doris.thrift.TDataGenFunctionName; import java.util.List; @@ -27,4 +31,8 @@ public abstract class DataGenTableValuedFunction extends TableValuedFunctionIf { public abstract TDataGenFunctionName getDataGenFunctionName(); + @Override + public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { + return new DataGenScanNode(id, desc, this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java index 9b58119ddd..38d11496e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java @@ -26,6 +26,10 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TIcebergMetadataParams; +import org.apache.doris.thrift.TIcebergQueryType; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -40,8 +44,6 @@ import java.util.Map; */ public class IcebergTableValuedFunction extends MetadataTableValuedFunction { - public enum MetadataType { SNAPSHOTS } - public static final String NAME = "iceberg_meta"; private static final String TABLE = "table"; private static final String QUERY_TYPE = "query_type"; @@ -51,54 +53,71 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction { .add(QUERY_TYPE) .build(); - private final MetadataType queryType; - private final TableName tableName; + private TIcebergQueryType queryType; + + // here tableName represents the name of a table in Iceberg. + private final TableName icebergTableName; public IcebergTableValuedFunction(Map<String, String> params) throws AnalysisException { - super(MetaType.ICEBERG); Map<String, String> validParams = Maps.newHashMap(); for (String key : params.keySet()) { if (!PROPERTIES_SET.contains(key.toLowerCase())) { throw new AnalysisException("'" + key + "' is invalid property"); } - // check ctl db tbl + // check ctl, db, tbl validParams.put(key.toLowerCase(), params.get(key)); } String tableName = validParams.get(TABLE); - String queryType = validParams.get(QUERY_TYPE); - if (tableName == null || queryType == null) { + String queryTypeString = validParams.get(QUERY_TYPE); + if (tableName == null || queryTypeString == null) { throw new AnalysisException("Invalid iceberg metadata query"); } String[] names = tableName.split("\\."); if (names.length != 3) { throw new AnalysisException("The iceberg table name contains the catalogName, databaseName, and tableName"); } - this.tableName = new TableName(names[0], names[1], names[2]); + this.icebergTableName = new TableName(names[0], names[1], names[2]); // check auth if (!Env.getCurrentEnv().getAccessManager() - .checkTblPriv(ConnectContext.get(), this.tableName, PrivPredicate.SELECT)) { + .checkTblPriv(ConnectContext.get(), this.icebergTableName, PrivPredicate.SELECT)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT", ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - this.tableName.getDb() + ": " + this.tableName.getTbl()); + this.icebergTableName.getDb() + ": " + this.icebergTableName.getTbl()); } try { - this.queryType = MetadataType.valueOf(queryType.toUpperCase()); + // TODO(ftw): check here + this.queryType = TIcebergQueryType.valueOf(queryTypeString.toUpperCase()); } catch (IllegalArgumentException e) { throw new AnalysisException("Unsupported iceberg metadata query type: " + queryType); } } + public TIcebergQueryType getIcebergQueryType() { + return queryType; + } + @Override - public String getTableName() { - return "IcebergMetadataTableValuedFunction"; + public TMetadataType getMetadataType() { + return TMetadataType.ICEBERG; } - public TableName getMetadataTableName() { - return tableName; + @Override + public TMetaScanRange getMetaScanRange() { + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.ICEBERG); + // set iceberg metadata params + TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams(); + icebergMetadataParams.setIcebergQueryType(queryType); + icebergMetadataParams.setCatalog(icebergTableName.getCtl()); + icebergMetadataParams.setDatabase(icebergTableName.getDb()); + icebergMetadataParams.setTable(icebergTableName.getTbl()); + metaScanRange.setIcebergParams(icebergMetadataParams); + return metaScanRange; } - public MetadataType getMetaQueryType() { - return queryType; + @Override + public String getTableName() { + return "IcebergMetadataTableValuedFunction"; } /** @@ -110,7 +129,7 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction { @Override public List<Column> getTableColumns() throws AnalysisException { List<Column> resColumns = new ArrayList<>(); - if (queryType == MetadataType.SNAPSHOTS) { + if (queryType == TIcebergQueryType.SNAPSHOTS) { resColumns.add(new Column("committed_at", PrimitiveType.DATETIMEV2, false)); resColumns.add(new Column("snapshot_id", PrimitiveType.BIGINT, false)); resColumns.add(new Column("parent_id", PrimitiveType.BIGINT, false)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java new file mode 100644 index 0000000000..20a5cd3a00 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.alter.DecommissionType; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HMSResource; +import org.apache.doris.cluster.Cluster; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TFetchSchemaTableDataRequest; +import org.apache.doris.thrift.TFetchSchemaTableDataResult; +import org.apache.doris.thrift.TIcebergMetadataParams; +import org.apache.doris.thrift.TIcebergQueryType; +import org.apache.doris.thrift.TMetadataTableRequestParams; +import org.apache.doris.thrift.TRow; +import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.base.Stopwatch; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class MetadataGenerator { + private static final Logger LOG = LogManager.getLogger(MetadataGenerator.class); + + public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) { + if (!request.isSetMetadaTableParams()) { + return errorResult("Metadata table params is not set. "); + } + switch (request.getMetadaTableParams().getMetadataType()) { + case ICEBERG: + return icebergMetadataResult(request.getMetadaTableParams()); + default: + break; + } + return errorResult("Metadata table params is not set. "); + } + + public static TFetchSchemaTableDataResult getBackendsSchemaTable(TFetchSchemaTableDataRequest request) { + final SystemInfoService clusterInfoService = Env.getCurrentSystemInfo(); + List<Long> backendIds = null; + if (!Strings.isNullOrEmpty(request.cluster_name)) { + final Cluster cluster = Env.getCurrentEnv().getCluster(request.cluster_name); + // root not in any cluster + if (null == cluster) { + return errorResult("Cluster is not existed."); + } + backendIds = cluster.getBackendIdList(); + } else { + backendIds = clusterInfoService.getBackendIds(false); + } + + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + long start = System.currentTimeMillis(); + Stopwatch watch = Stopwatch.createUnstarted(); + + List<TRow> dataBatch = Lists.newArrayList(); + for (long backendId : backendIds) { + Backend backend = clusterInfoService.getBackend(backendId); + if (backend == null) { + continue; + } + + watch.start(); + Integer tabletNum = Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId); + watch.stop(); + + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setLongVal(backendId)); + trow.addToColumnValue(new TCell().setStringVal(backend.getOwnerClusterName())); + trow.addToColumnValue(new TCell().setStringVal(backend.getIp())); + if (Strings.isNullOrEmpty(request.cluster_name)) { + trow.addToColumnValue(new TCell().setIntVal(backend.getHeartbeatPort())); + trow.addToColumnValue(new TCell().setIntVal(backend.getBePort())); + trow.addToColumnValue(new TCell().setIntVal(backend.getHttpPort())); + trow.addToColumnValue(new TCell().setIntVal(backend.getBrpcPort())); + } + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(backend.isAlive()))); + if (backend.isDecommissioned() && backend.getDecommissionType() == DecommissionType.ClusterDecommission) { + trow.addToColumnValue(new TCell().setStringVal("false")); + trow.addToColumnValue(new TCell().setStringVal("true")); + } else if (backend.isDecommissioned() + && backend.getDecommissionType() == DecommissionType.SystemDecommission) { + trow.addToColumnValue(new TCell().setStringVal("true")); + trow.addToColumnValue(new TCell().setStringVal("false")); + } else { + trow.addToColumnValue(new TCell().setStringVal("false")); + trow.addToColumnValue(new TCell().setStringVal("false")); + } + trow.addToColumnValue(new TCell().setLongVal(tabletNum)); + + // capacity + // data used + trow.addToColumnValue(new TCell().setLongVal(backend.getDataUsedCapacityB())); + + // available + long availB = backend.getAvailableCapacityB(); + trow.addToColumnValue(new TCell().setLongVal(availB)); + + // total + long totalB = backend.getTotalCapacityB(); + trow.addToColumnValue(new TCell().setLongVal(totalB)); + + // used percent + double used = 0.0; + if (totalB <= 0) { + used = 0.0; + } else { + used = (double) (totalB - availB) * 100 / totalB; + } + trow.addToColumnValue(new TCell().setDoubleVal(used)); + trow.addToColumnValue(new TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100)); + + // remote used capacity + trow.addToColumnValue(new TCell().setLongVal(backend.getRemoteUsedCapacityB())); + + // tags + trow.addToColumnValue(new TCell().setStringVal(backend.getTagMapString())); + // err msg + trow.addToColumnValue(new TCell().setStringVal(backend.getHeartbeatErrMsg())); + // version + trow.addToColumnValue(new TCell().setStringVal(backend.getVersion())); + // status + trow.addToColumnValue(new TCell().setStringVal(new Gson().toJson(backend.getBackendStatus()))); + dataBatch.add(trow); + } + + // backends proc node get result too slow, add log to observer. + LOG.debug("backends proc get tablet num cost: {}, total cost: {}", + watch.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - start)); + + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + @NotNull + public static TFetchSchemaTableDataResult errorResult(String msg) { + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR)); + result.status.addToErrorMsgs(msg); + return result; + } + + private static TFetchSchemaTableDataResult icebergMetadataResult(TMetadataTableRequestParams params) { + if (!params.isSetIcebergMetadataParams()) { + return errorResult("Iceberg metadata params is not set. "); + } + TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams(); + HMSExternalCatalog catalog = (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr() + .getCatalog(icebergMetadataParams.getCatalog()); + org.apache.iceberg.Table table; + try { + table = getIcebergTable(catalog, icebergMetadataParams.getDatabase(), icebergMetadataParams.getTable()); + } catch (MetaNotFoundException e) { + return errorResult(e.getMessage()); + } + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + List<TRow> dataBatch = Lists.newArrayList(); + TIcebergQueryType icebergQueryType = icebergMetadataParams.getIcebergQueryType(); + switch (icebergQueryType) { + case SNAPSHOTS: + for (Snapshot snapshot : table.snapshots()) { + TRow trow = new TRow(); + LocalDateTime committedAt = LocalDateTime.ofInstant(Instant.ofEpochMilli( + snapshot.timestampMillis()), TimeUtils.getTimeZone().toZoneId()); + long encodedDatetime = convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(), + committedAt.getDayOfMonth(), committedAt.getHour(), + committedAt.getMinute(), committedAt.getSecond()); + + trow.addToColumnValue(new TCell().setLongVal(encodedDatetime)); + trow.addToColumnValue(new TCell().setLongVal(snapshot.snapshotId())); + if (snapshot.parentId() == null) { + trow.addToColumnValue(new TCell().setLongVal(-1L)); + } else { + trow.addToColumnValue(new TCell().setLongVal(snapshot.parentId())); + } + trow.addToColumnValue(new TCell().setStringVal(snapshot.operation())); + trow.addToColumnValue(new TCell().setStringVal(snapshot.manifestListLocation())); + dataBatch.add(trow); + } + break; + default: + return errorResult("Unsupported iceberg inspect type: " + icebergQueryType); + } + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + private static org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog catalog, String db, String tbl) + throws MetaNotFoundException { + org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + Configuration conf = new HdfsConfiguration(); + Map<String, String> properties = catalog.getCatalogProperty().getHadoopProperties(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + hiveCatalog.setConf(conf); + Map<String, String> catalogProperties = new HashMap<>(); + catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, catalog.getHiveMetastoreUris()); + catalogProperties.put("uri", catalog.getHiveMetastoreUris()); + hiveCatalog.initialize("hive", catalogProperties); + return hiveCatalog.loadTable(TableIdentifier.of(db, tbl)); + } + + private static long convertToDateTimeV2(int year, int month, int day, int hour, int minute, int second) { + return (long) second << 20 | (long) minute << 26 | (long) hour << 32 + | (long) day << 37 | (long) month << 42 | (long) year << 46; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index fd83c59957..0b30ea18c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -17,27 +17,17 @@ package org.apache.doris.tablefunction; -import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.external.MetadataScanNode; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf { + public abstract TMetadataType getMetadataType(); - public enum MetaType { ICEBERG } - - private final MetaType metaType; - - public MetadataTableValuedFunction(MetaType metaType) { - this.metaType = metaType; - } - - public MetaType getMetaType() { - return metaType; - } - - public abstract TableName getMetadataTableName(); + public abstract TMetaScanRange getMetaScanRange(); @Override public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java index 3e0e92b5b5..639dfeef35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java @@ -17,14 +17,10 @@ package org.apache.doris.tablefunction; -import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; -import org.apache.doris.planner.DataGenScanNode; -import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.planner.ScanNode; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDataGenFunctionName; import org.apache.doris.thrift.TDataGenScanRange; @@ -141,9 +137,4 @@ public class NumbersTableValuedFunction extends DataGenTableValuedFunction { } return res; } - - @Override - public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { - return new DataGenScanNode(id, desc, "DataGenScanNode", this); - } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 54b12fd8ac..1336d5e81d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -715,16 +715,14 @@ struct TInitExternalCtlMetaResult { 2: optional string status; } -enum TSchemaTableName{ +enum TSchemaTableName { BACKENDS = 0, - ICEBERG_TABLE_META = 1, + METADATA_TABLE = 1, } struct TMetadataTableRequestParams { - 1: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params - 2: optional string catalog - 3: optional string database - 4: optional string table + 1: optional Types.TMetadataType metadata_type + 2: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params } struct TFetchSchemaTableDataRequest { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 978c90e103..e80372eb69 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -377,16 +377,17 @@ struct TDataGenScanRange { 1: optional TTVFNumbersScanRange numbers_params } -enum TIcebergMetadataType { - SNAPSHOTS = 0, -} struct TIcebergMetadataParams { - 1: optional TIcebergMetadataType metadata_type + 1: optional Types.TIcebergQueryType iceberg_query_type + 2: optional string catalog + 3: optional string database + 4: optional string table } struct TMetaScanRange { - 1: optional TIcebergMetadataParams iceberg_params + 1: optional Types.TMetadataType metadata_type + 2: optional TIcebergMetadataParams iceberg_params } // Specification of an individual data range which is held in its entirety @@ -532,9 +533,7 @@ struct TSchemaScanNode { struct TMetaScanNode { 1: required Types.TTupleId tuple_id - 2: optional string catalog - 3: optional string database - 4: optional string table + 2: optional Types.TMetadataType metadata_type } struct TTestExternalScanNode { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index aa1e00de98..e81f7fd002 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -657,6 +657,14 @@ enum TSortType { ZORDER, } +enum TMetadataType { + ICEBERG +} + +enum TIcebergQueryType { + SNAPSHOTS +} + // represent a user identity struct TUserIdentity { 1: optional string username --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org