This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b41748efa1 [feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848) b41748efa1 is described below commit b41748efa13dfb8628492c7450c26be263f359a8 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Fri Oct 7 09:55:17 2022 +0800 [feature-wip](new-scan)Add new jdbc scanner and new jdbc scan node (#12848) Related pr: #11582 This pr is the new jdbc scan node and scanner. --- be/src/exec/exec_node.cpp | 13 ++- be/src/runtime/plan_fragment_executor.cpp | 7 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 62 +++++++++++ be/src/vec/exec/scan/new_jdbc_scan_node.h | 46 ++++++++ be/src/vec/exec/scan/new_jdbc_scanner.cpp | 156 ++++++++++++++++++++++++++++ be/src/vec/exec/scan/new_jdbc_scanner.h | 58 +++++++++++ 7 files changed, 341 insertions(+), 3 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4a9d648d1c..ff22f26b84 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -62,6 +62,7 @@ #include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vaggregation_node.h" @@ -462,7 +463,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::JDBC_SCAN_NODE: if (state->enable_vectorized_exec()) { #ifdef LIBJVM - *node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, descs)); + if (config::enable_new_scan_node) { + *node = pool->add(new vectorized::NewJdbcScanNode(pool, tnode, descs)); + } else { + *node = pool->add(new vectorized::VJdbcScanNode(pool, tnode, descs)); + } #else return Status::InternalError("Jdbc scan node is disabled since no libjvm is found!"); #endif @@ -731,7 +736,11 @@ void ExecNode::try_do_aggregate_serde_improve() { ExecNode* child0 = agg_node[0]->_children[0]; if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) || typeid(*child0) == typeid(vectorized::NewFileScanNode) || - typeid(*child0) == typeid(vectorized::NewOdbcScanNode)) { + typeid(*child0) == typeid(vectorized::NewOdbcScanNode) +#ifdef LIBJVM + || typeid(*child0) == typeid(vectorized::NewJdbcScanNode) +#endif + ) { vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]); scan_node->set_no_agg_finalize(); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 2fe491a374..cff6630c23 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -47,6 +47,7 @@ #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vexchange_node.h" @@ -170,7 +171,11 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, ExecNode* node = scan_nodes[i]; if (typeid(*node) == typeid(vectorized::NewOlapScanNode) || typeid(*node) == typeid(vectorized::NewFileScanNode) || - typeid(*node) == typeid(vectorized::NewOdbcScanNode)) { + typeid(*node) == typeid(vectorized::NewOdbcScanNode) +#ifdef LIBJVM + || typeid(*node) == typeid(vectorized::NewJdbcScanNode) +#endif + ) { vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]); const std::vector<TScanRangeParams>& scan_ranges = find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 9632fa13d3..c69cdeb01d 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -254,6 +254,8 @@ set(VEC_FILES exec/scan/vfile_scanner.cpp exec/scan/new_odbc_scanner.cpp exec/scan/new_odbc_scan_node.cpp + exec/scan/new_jdbc_scanner.cpp + exec/scan/new_jdbc_scan_node.cpp ) add_library(Vec STATIC diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp new file mode 100644 index 0000000000..da76859915 --- /dev/null +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_jdbc_scan_node.h" +#ifdef LIBJVM + +#include "vec/exec/scan/new_jdbc_scanner.h" +#include "vec/exec/scan/vscanner.h" +namespace doris::vectorized { +NewJdbcScanNode::NewJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : VScanNode(pool, tnode, descs), + _table_name(tnode.jdbc_scan_node.table_name), + _tuple_id(tnode.jdbc_scan_node.tuple_id), + _query_string(tnode.jdbc_scan_node.query_string) { + _output_tuple_id = tnode.jdbc_scan_node.tuple_id; +} + +std::string NewJdbcScanNode::get_name() { + return fmt::format("VNewJdbcScanNode({0})", _table_name); +} + +Status NewJdbcScanNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VNewJdbcScanNode::Prepare"; + RETURN_IF_ERROR(VScanNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + _scanner_mem_tracker = std::make_unique<MemTracker>("NewJdbcScanners"); + return Status::OK(); +} + +Status NewJdbcScanNode::_init_profile() { + RETURN_IF_ERROR(VScanNode::_init_profile()); + return Status::OK(); +} + +Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) { + if (_eos == true) { + return Status::OK(); + } + NewJdbcScanner* scanner = new NewJdbcScanner( + _state, this, _limit_per_scanner, _scanner_mem_tracker.get(), _tuple_id, _query_string); + _scanner_pool.add(scanner); + RETURN_IF_ERROR(scanner->prepare(_state)); + scanners->push_back(static_cast<VScanner*>(scanner)); + return Status::OK(); +} +} // namespace doris::vectorized +#endif diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h b/be/src/vec/exec/scan/new_jdbc_scan_node.h new file mode 100644 index 0000000000..287522fc0d --- /dev/null +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#ifdef LIBJVM + +#include "runtime/runtime_state.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris { +namespace vectorized { +class NewJdbcScanNode : public VScanNode { +public: + NewJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + Status prepare(RuntimeState* state) override; + std::string get_name() override; + +protected: + Status _init_profile() override; + Status _init_scanners(std::list<VScanner*>* scanners) override; + +private: + std::string _table_name; + TupleId _tuple_id; + std::string _query_string; + + std::unique_ptr<MemTracker> _scanner_mem_tracker; +}; +} // namespace vectorized +} // namespace doris +#endif diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp new file mode 100644 index 0000000000..c571714e94 --- /dev/null +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "new_jdbc_scanner.h" + +#ifdef LIBJVM + +namespace doris::vectorized { +NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, + MemTracker* mem_tracker, TupleId tuple_id, std::string query_string) + : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker), + _is_init(false), + _jdbc_eos(false), + _tuple_id(tuple_id), + _query_string(query_string), + _tuple_desc(nullptr) {} + +Status NewJdbcScanner::prepare(RuntimeState* state) { + VLOG_CRITICAL << "NewJdbcScanner::Prepare"; + if (_is_init) { + return Status::OK(); + } + + if (state == nullptr) { + return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare."); + } + + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + // get tuple desc + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + return Status::InternalError("Failed to get tuple descriptor."); + } + + // get jdbc table info + const JdbcTableDescriptor* jdbc_table = + static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc()); + if (jdbc_table == nullptr) { + return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare."); + } + _jdbc_param.driver_class = jdbc_table->jdbc_driver_class(); + _jdbc_param.driver_path = jdbc_table->jdbc_driver_url(); + _jdbc_param.resource_name = jdbc_table->jdbc_resource_name(); + _jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum(); + _jdbc_param.jdbc_url = jdbc_table->jdbc_url(); + _jdbc_param.user = jdbc_table->jdbc_user(); + _jdbc_param.passwd = jdbc_table->jdbc_passwd(); + _jdbc_param.tuple_desc = _tuple_desc; + _jdbc_param.query_string = std::move(_query_string); + + _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param)); + if (_jdbc_connector == nullptr) { + return Status::InternalError("new a jdbc scanner failed."); + } + + _is_init = true; + return Status::OK(); +} + +Status NewJdbcScanner::open(RuntimeState* state) { + VLOG_CRITICAL << "NewJdbcScanner::open"; + if (state == nullptr) { + return Status::InternalError("input pointer is NULL of VJdbcScanNode::open."); + } + + if (!_is_init) { + return Status::InternalError("used before initialize of VJdbcScanNode::open."); + } + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(VScanner::open(state)); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + RETURN_IF_ERROR(_jdbc_connector->open()); + RETURN_IF_ERROR(_jdbc_connector->query()); + return Status::OK(); +} + +Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { + VLOG_CRITICAL << "NewJdbcScanner::_get_block_impl"; + if (nullptr == state || nullptr == block || nullptr == eof) { + return Status::InternalError("input is NULL pointer"); + } + + if (!_is_init) { + return Status::InternalError("used before initialize of VJdbcScanNode::get_next."); + } + + if (_jdbc_eos == true) { + *eof = true; + return Status::OK(); + } + + auto column_size = _tuple_desc->slots().size(); + std::vector<MutableColumnPtr> columns(column_size); + bool mem_reuse = block->mem_reuse(); + // only empty block should be here + DCHECK(block->rows() == 0); + + do { + RETURN_IF_CANCELLED(state); + + columns.resize(column_size); + for (auto i = 0; i < column_size; i++) { + if (mem_reuse) { + columns[i] = std::move(*block->get_by_position(i).column).mutate(); + } else { + columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); + } + } + + RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, state->batch_size())); + + if (_jdbc_eos == true) { + if (block->rows() == 0) { + *eof = true; + } + break; + } + + // Before really use the Block, must clear other ptr of column in block + // So here need do std::move and clear in `columns` + if (!mem_reuse) { + int column_index = 0; + for (const auto slot_desc : _tuple_desc->slots()) { + block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } else { + columns.clear(); + } + VLOG_ROW << "NewJdbcScanNode output rows: " << block->rows(); + } while (block->rows() == 0 && !(*eof)); + return Status::OK(); +} + +Status NewJdbcScanner::close(RuntimeState* state) { + RETURN_IF_ERROR(VScanner::close(state)); + return Status::OK(); +} +} // namespace doris::vectorized +#endif diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h new file mode 100644 index 0000000000..75dabcacfa --- /dev/null +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#ifdef LIBJVM + +#include "runtime/runtime_state.h" +#include "vec/exec/scan/new_jdbc_scan_node.h" +#include "vec/exec/scan/vscanner.h" +#include "vec/exec/vjdbc_connector.h" +namespace doris { +namespace vectorized { +class NewJdbcScanner : public VScanner { +public: + NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, + MemTracker* mem_tracker, TupleId tuple_id, std::string query_string); + + Status open(RuntimeState* state) override; + Status close(RuntimeState* state) override; + +public: + Status prepare(RuntimeState* state); + +protected: + Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + +private: + bool _is_init; + + bool _jdbc_eos; + + // Tuple id resolved in prepare() to set _tuple_desc; + TupleId _tuple_id; + // SQL + std::string _query_string; + // Descriptor of tuples read from JDBC table. + const TupleDescriptor* _tuple_desc; + // Scanner of JDBC. + std::unique_ptr<JdbcConnector> _jdbc_connector; + JdbcConnectorParam _jdbc_param; +}; +} // namespace vectorized +} // namespace doris +#endif --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org