This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7814d2b651 [Fix](Oracle External Table) fix that oracle external table can not insert batch values (#16117) 7814d2b651 is described below commit 7814d2b6510501459190a60971612bb9346488fd Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Sat Jan 21 07:57:12 2023 +0800 [Fix](Oracle External Table) fix that oracle external table can not insert batch values (#16117) Issue Number: close #xxx This PR fixes two bugs: _jdbc_scanner may be nullptr in vjdbc_connector.cpp, so we use another method to collect JDBC statistics. close [Enhencement](jdbc scanner) add profile for jdbc scanner #15914 In the batch insertion scenario, the Oracle database does not support the syntax insert into tables values (...),(...); instead, it supports: insert all into table(col1,col2) values(c1v1, c2v1) into table(col1,col2) values(c1v2, c2v2) SELECT 1 FROM DUAL; --- be/src/exec/table_connector.cpp | 41 +++++++++++++++++++++- be/src/exec/table_connector.h | 9 +++++ be/src/vec/exec/scan/new_jdbc_scanner.cpp | 15 +++++++- be/src/vec/exec/scan/new_jdbc_scanner.h | 2 ++ be/src/vec/exec/vjdbc_connector.cpp | 20 ++++------- be/src/vec/exec/vjdbc_connector.h | 16 +++++++-- .../java/org/apache/doris/udf/JdbcExecutor.java | 1 - 7 files changed, 85 insertions(+), 19 deletions(-) diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp index b27d949aea..920fad3ccc 100644 --- a/be/src/exec/table_connector.cpp +++ b/be/src/exec/table_connector.cpp @@ -57,7 +57,14 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block* TOdbcTableType::type table_type) { _insert_stmt_buffer.clear(); std::u16string insert_stmt; - { + if (table_type == TOdbcTableType::ORACLE) { + SCOPED_TIMER(_convert_tuple_timer); + oracle_type_append(table_name, block, output_vexpr_ctxs, start_send_row, num_rows_sent, + table_type); + // Translate utf8 string to utf16 to 
use unicode encoding + insert_stmt = 
utf8_to_u16string(_insert_stmt_buffer.data(), + _insert_stmt_buffer.data() + _insert_stmt_buffer.size()); + } else { SCOPED_TIMER(_convert_tuple_timer); fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name); @@ -94,6 +101,38 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block* return Status::OK(); } +Status TableConnector::oracle_type_append( + const std::string& table_name, vectorized::Block* block, + const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, uint32_t start_send_row, + uint32_t* num_rows_sent, TOdbcTableType::type table_type) { + fmt::format_to(_insert_stmt_buffer, "INSERT ALL "); + int num_rows = block->rows(); + int num_columns = block->columns(); + for (int i = start_send_row; i < num_rows; ++i) { + (*num_rows_sent)++; + fmt::format_to(_insert_stmt_buffer, "INTO {} VALUES (", table_name); + // Construct insert statement of odbc/jdbc table + for (int j = 0; j < num_columns; ++j) { + if (j != 0) { + fmt::format_to(_insert_stmt_buffer, "{}", ", "); + } + auto& column_ptr = block->get_by_position(j).column; + auto& type_ptr = block->get_by_position(j).type; + RETURN_IF_ERROR(convert_column_data( + column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type)); + } + + if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) { + fmt::format_to(_insert_stmt_buffer, "{}", ") "); + } else { + // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt + fmt::format_to(_insert_stmt_buffer, "{}", ") SELECT 1 FROM DUAL"); + break; + } + } + return Status::OK(); +} + Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr, const vectorized::DataTypePtr& type_ptr, const TypeDescriptor& type, int row, diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h index 6ba0c26b8b..88f396d086 100644 --- a/be/src/exec/table_connector.h +++ b/be/src/exec/table_connector.h @@ -82,6 +82,15 @@ protected: 
RuntimeProfile::Counter* _result_send_timer = nullptr; // number of sent rows RuntimeProfile::Counter* _sent_rows_counter = nullptr; + +private: + // Because Oracle database do not support + // insert into tables values (...),(...); + // Here we do something special for Oracle. + Status oracle_type_append(const std::string& table_name, vectorized::Block* block, + const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, + uint32_t start_send_row, uint32_t* num_rows_sent, + TOdbcTableType::type table_type); }; } // namespace doris diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 80fc3669c8..0091efc7cd 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -18,6 +18,7 @@ #include "new_jdbc_scanner.h" #include "util/runtime_profile.h" +#include "vec/exec/vjdbc_connector.h" namespace doris::vectorized { NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, @@ -76,7 +77,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx _jdbc_param.query_string = std::move(_query_string); _jdbc_param.table_type = _table_type; - _jdbc_connector.reset(new (std::nothrow) JdbcConnector(this, _jdbc_param)); + _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param)); if (_jdbc_connector == nullptr) { return Status::InternalError("new a jdbc scanner failed."); } @@ -113,6 +114,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* if (_jdbc_eos == true) { *eof = true; + _update_profile(); return Status::OK(); } @@ -138,6 +140,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* if (_jdbc_eos == true) { if (block->rows() == 0) { + _update_profile(); *eof = true; } break; @@ -160,6 +163,16 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* return Status::OK(); } +void NewJdbcScanner::_update_profile() { + 
JdbcConnector::JdbcStatistic& jdbc_statistic = _jdbc_connector->get_jdbc_statistic(); + COUNTER_UPDATE(_load_jar_timer, jdbc_statistic._load_jar_timer); + COUNTER_UPDATE(_init_connector_timer, jdbc_statistic._init_connector_timer); + COUNTER_UPDATE(_check_type_timer, jdbc_statistic._check_type_timer); + COUNTER_UPDATE(_get_data_timer, jdbc_statistic._get_data_timer); + COUNTER_UPDATE(_execte_read_timer, jdbc_statistic._execte_read_timer); + COUNTER_UPDATE(_connector_close_timer, jdbc_statistic._connector_close_timer); +} + Status NewJdbcScanner::close(RuntimeState* state) { RETURN_IF_ERROR(VScanner::close(state)); RETURN_IF_ERROR(_jdbc_connector->close()); diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h index 4f869d0f41..da5a3e7faf 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -48,6 +48,8 @@ protected: RuntimeProfile::Counter* _connector_close_timer = nullptr; private: + void _update_profile(); + bool _is_init; bool _jdbc_eos; diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 4a3ef8a2c4..adc1ac4cd2 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -53,12 +53,6 @@ JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) _conn_param(param), _closed(false) {} -JdbcConnector::JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param) - : TableConnector(param.tuple_desc, param.query_string), - _jdbc_scanner(jdbc_scanner), - _conn_param(param), - _closed(false) {} - JdbcConnector::~JdbcConnector() { if (!_closed) { close(); @@ -71,7 +65,7 @@ JdbcConnector::~JdbcConnector() { #define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz); Status JdbcConnector::close() { - SCOPED_TIMER(_jdbc_scanner->_connector_close_timer); + SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer); _closed = true; if (!_is_open) { return 
Status::OK(); @@ -132,12 +126,12 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { if (_conn_param.resource_name.empty()) { // for jdbcExternalTable, _conn_param.resource_name == "" // so, we use _conn_param.driver_path as key of jarpath - SCOPED_TIMER(_jdbc_scanner->_load_jar_timer); + SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer); RETURN_IF_ERROR(function_cache->get_jarpath( std::abs((int64_t)hash_str(_conn_param.driver_path)), _conn_param.driver_path, _conn_param.driver_checksum, &local_location)); } else { - SCOPED_TIMER(_jdbc_scanner->_load_jar_timer); + SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer); RETURN_IF_ERROR(function_cache->get_jarpath( std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path, _conn_param.driver_checksum, &local_location)); @@ -158,7 +152,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { RETURN_IF_ERROR(jni_frame.push(env)); RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); { - SCOPED_TIMER(_jdbc_scanner->_init_connector_timer); + SCOPED_RAW_TIMER(&_jdbc_statistic._init_connector_timer); _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes); } jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); @@ -186,7 +180,7 @@ Status JdbcConnector::query() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); { - SCOPED_TIMER(_jdbc_scanner->_execte_read_timer); + SCOPED_RAW_TIMER(&_jdbc_statistic._execte_read_timer); jint colunm_count = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id); RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); @@ -201,7 +195,7 @@ Status JdbcConnector::query() { } Status JdbcConnector::_check_column_type() { - SCOPED_TIMER(_jdbc_scanner->_check_type_timer); + SCOPED_RAW_TIMER(&_jdbc_statistic._check_type_timer); JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); jobject type_lists = @@ -350,7 +344,7 @@ Status 
JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns if (!_is_open) { return Status::InternalError("get_next before open of jdbc connector."); } - SCOPED_TIMER(_jdbc_scanner->_get_data_timer); + SCOPED_RAW_TIMER(&_jdbc_statistic._get_data_timer); JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); jboolean has_next = diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index ee99be8ec5..c1d416783c 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -47,9 +47,16 @@ struct JdbcConnectorParam { class JdbcConnector : public TableConnector { public: - JdbcConnector(const JdbcConnectorParam& param); + struct JdbcStatistic { + int64_t _load_jar_timer = 0; + int64_t _init_connector_timer = 0; + int64_t _get_data_timer = 0; + int64_t _check_type_timer = 0; + int64_t _execte_read_timer = 0; + int64_t _connector_close_timer = 0; + }; - JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param); + JdbcConnector(const JdbcConnectorParam& param); ~JdbcConnector() override; @@ -68,6 +75,8 @@ public: Status abort_trans() override; // should be call after transaction abort Status finish_trans() override; // should be call after transaction commit + JdbcStatistic& get_jdbc_statistic() { return _jdbc_statistic; } + Status close() override; private: @@ -89,7 +98,6 @@ private: Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); - NewJdbcScanner* _jdbc_scanner; const JdbcConnectorParam& _conn_param; //java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER std::map<int, PrimitiveType> _arr_jdbc_map { @@ -126,6 +134,8 @@ private: std::vector<MutableColumnPtr> str_array_cols; // for array type to save data like big string [1,2,3] + JdbcStatistic _jdbc_statistic; + #define FUNC_VARI_DECLARE(RETURN_TYPE) \ RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \ 
jclass _executor_##RETURN_TYPE##_clazz; diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index 0c9894ad18..d90aa4055a 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -272,4 +272,3 @@ public class JdbcExecutor { } } } - --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org