morningman commented on code in PR #13959: URL: https://github.com/apache/doris/pull/13959#discussion_r1013701332
########## fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java: ########## @@ -40,15 +40,15 @@ /** * This scan node is used for table valued function. Review Comment: Change the comment ########## be/src/service/internal_service.cpp: ########## @@ -407,6 +410,78 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); } +void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, + const PFetchTableSchemaRequest* request, + PFetchTableSchemaResult* result, + google::protobuf::Closure* done) { + VLOG_RPC << "fetch table schema"; + brpc::ClosureGuard closure_guard(done); + TFileScanRange file_scan_range; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); + uint32_t len = request->file_scan_range().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); + st.to_protobuf(result->mutable_status()); + return; + } + } + + if (file_scan_range.__isset.ranges == false) { + st = Status::InternalError("can not get TFileRangeDesc."); + st.to_protobuf(result->mutable_status()); + return; + } + if (file_scan_range.__isset.params == false) { + st = Status::InternalError("can not get TFileScanRangeParams."); + st.to_protobuf(result->mutable_status()); + return; + } + const TFileRangeDesc& range = file_scan_range.ranges.at(0); + const TFileScanRangeParams& params = file_scan_range.params; + // file_slots is no use + std::vector<SlotDescriptor*> file_slots; + std::unique_ptr<vectorized::GenericReader> reader(nullptr); + std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema")); + switch (params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_BZ2: + case 
TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_DEFLATE: { + reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots)); + break; + } + default: + st = Status::InternalError("Not supported file format in fetch table schema: {}", + params.format_type); + st.to_protobuf(result->mutable_status()); + return; + } + std::unordered_map<std::string, TypeDescriptor> name_to_col_type; + std::vector<std::string> col_names; + std::vector<TypeDescriptor> col_types; + st = reader->get_parsered_schema(&col_names, &col_types); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg(); + st.to_protobuf(result->mutable_status()); + return; + } + result->set_column_nums(col_names.size()); + for (size_t idx = 0; idx < col_names.size(); ++idx) { + result->add_column_names(col_names[idx]); + } + for (size_t idx = 0; idx < col_types.size(); ++idx) { + PTypeDesc* type_desc = result->add_column_types(); + col_types[idx].to_protobuf(type_desc); + } + LOG(INFO) << "complete parse, status: " << st; Review Comment: Remove this log, meaningless ########## be/src/service/internal_service.cpp: ########## @@ -407,6 +410,78 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); } +void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller, + const PFetchTableSchemaRequest* request, + PFetchTableSchemaResult* result, + google::protobuf::Closure* done) { + VLOG_RPC << "fetch table schema"; + brpc::ClosureGuard closure_guard(done); + TFileScanRange file_scan_range; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data()); + uint32_t len = request->file_scan_range().size(); + st = deserialize_thrift_msg(buf, &len, false, &file_scan_range); + if (!st.ok()) { + LOG(WARNING) << "fetch table schema failed, 
errmsg=" << st.get_error_msg(); + st.to_protobuf(result->mutable_status()); + return; + } + } + + if (file_scan_range.__isset.ranges == false) { + st = Status::InternalError("can not get TFileRangeDesc."); + st.to_protobuf(result->mutable_status()); + return; + } + if (file_scan_range.__isset.params == false) { + st = Status::InternalError("can not get TFileScanRangeParams."); + st.to_protobuf(result->mutable_status()); + return; + } + const TFileRangeDesc& range = file_scan_range.ranges.at(0); + const TFileScanRangeParams& params = file_scan_range.params; + // file_slots is no use + std::vector<SlotDescriptor*> file_slots; + std::unique_ptr<vectorized::GenericReader> reader(nullptr); + std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema")); + switch (params.format_type) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_CSV_DEFLATE: { Review Comment: Missing Parquet and ORC? 
########## fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java: ########## @@ -202,6 +187,39 @@ public void init(Analyzer analyzer) throws UserException { initParamCreateContexts(analyzer); } + private void initHMSExternalTable(HMSExternalTable hmsTable) throws UserException { + Preconditions.checkNotNull(hmsTable); + + if (hmsTable.isView()) { + throw new AnalysisException( + String.format("Querying external view '[%s].%s.%s' is not supported", hmsTable.getDlaType(), + hmsTable.getDbName(), hmsTable.getName())); + } + + FileScanProviderIf scanProvider; + switch (hmsTable.getDlaType()) { + case HUDI: + scanProvider = new HudiScanProvider(hmsTable, desc); + break; + case ICEBERG: + scanProvider = new IcebergScanProvider(hmsTable, desc); + break; + case HIVE: + scanProvider = new HiveScanProvider(hmsTable, desc); + break; + default: + throw new UserException("Unknown table type: " + hmsTable.getDlaType()); + } + this.scanProviders.add(scanProvider); + } + + private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { + Preconditions.checkNotNull(table); + FileScanProviderIf scanProvider; Review Comment: ```suggestion FileScanProviderIf scanProvider = new TVFScanProvider(table, desc, tvf); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org