This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 82b3f0598930d4d0dfb8eb448360d7f5c38dfe66 Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Sat Feb 4 22:51:55 2023 +0800 [improvement](jdbc) refactor jdbc to copy the result set in batches (#16337) Tested reading a jdbc external table: 10%+ performance improvement after this optimization. --- be/src/util/CMakeLists.txt | 1 + be/src/util/jni-util.cpp | 33 ++ be/src/util/jni-util.h | 1 + be/src/util/jni_native_method.cpp | 30 ++ be/src/util/jni_native_method.h | 28 ++ be/src/vec/exec/vjdbc_connector.cpp | 225 +++++++++- be/src/vec/exec/vjdbc_connector.h | 25 +- fe/java-udf/pom.xml | 6 + .../main/java/org/apache/doris/udf/FakeDriver.java | 70 --- .../java/org/apache/doris/udf/JNINativeMethod.java | 22 + .../java/org/apache/doris/udf/JdbcDataSource.java | 44 ++ .../java/org/apache/doris/udf/JdbcExecutor.java | 471 +++++++++++++++++++-- .../main/java/org/apache/doris/udf/UdfUtils.java | 2 + 13 files changed, 828 insertions(+), 130 deletions(-) diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 6fe40c4bcd..bcd0a0089b 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -115,6 +115,7 @@ set(UTIL_FILES jni-util.cpp exception.cpp libjvm_loader.cpp + jni_native_method.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp index 7327664b89..b2a65eb69b 100644 --- a/be/src/util/jni-util.cpp +++ b/be/src/util/jni-util.cpp @@ -28,6 +28,7 @@ #include "common/config.h" #include "gutil/once.h" #include "gutil/strings/substitute.h" +#include "jni_native_method.h" #include "libjvm_loader.h" using std::string; @@ -108,6 +109,7 @@ bool JniUtil::jvm_inited_ = false; __thread JNIEnv* JniUtil::tls_env_ = nullptr; jclass JniUtil::internal_exc_cl_ = NULL; jclass JniUtil::jni_util_cl_ = NULL; +jclass JniUtil::jni_native_method_exc_cl_ = nullptr; jmethodID JniUtil::throwable_to_string_id_ = NULL; jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL; jmethodID 
JniUtil::get_jvm_metrics_id_ = NULL; @@ -252,6 +254,37 @@ Status JniUtil::Init() { return Status::InternalError("Failed to delete local reference to JniUtil class."); } + // Find JNINativeMethod class and create a global ref. + jclass local_jni_native_exc_cl = env->FindClass("org/apache/doris/udf/JNINativeMethod"); + if (local_jni_native_exc_cl == nullptr) { + if (env->ExceptionOccurred()) { + env->ExceptionDescribe(); + } + return Status::InternalError("Failed to find JNINativeMethod class."); + } + jni_native_method_exc_cl_ = + reinterpret_cast<jclass>(env->NewGlobalRef(local_jni_native_exc_cl)); + if (jni_native_method_exc_cl_ == nullptr) { + if (env->ExceptionOccurred()) { + env->ExceptionDescribe(); + } + return Status::InternalError("Failed to create global reference to JNINativeMethod class."); + } + env->DeleteLocalRef(local_jni_native_exc_cl); + if (env->ExceptionOccurred()) { + return Status::InternalError("Failed to delete local reference to JNINativeMethod class."); + } + std::string function_name = "resizeColumn"; + std::string function_sign = "(JI)J"; + static JNINativeMethod java_native_methods[] = { + {const_cast<char*>(function_name.c_str()), const_cast<char*>(function_sign.c_str()), + (void*)&JavaNativeMethods::resizeColumn}, + }; + + int res = env->RegisterNatives(jni_native_method_exc_cl_, java_native_methods, + sizeof(java_native_methods) / sizeof(java_native_methods[0])); + DCHECK_EQ(res, 0); + // Throwable toString() throwable_to_string_id_ = env->GetStaticMethodID(jni_util_cl_, "throwableToString", "(Ljava/lang/Throwable;)Ljava/lang/String;"); diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index 5593d77dda..0e551f17cf 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -68,6 +68,7 @@ private: static bool jvm_inited_; static jclass internal_exc_cl_; + static jclass jni_native_method_exc_cl_; static jclass jni_util_cl_; static jmethodID throwable_to_string_id_; static jmethodID throwable_to_stack_trace_id_; diff 
--git a/be/src/util/jni_native_method.cpp b/be/src/util/jni_native_method.cpp new file mode 100644 index 0000000000..c3baedce6c --- /dev/null +++ b/be/src/util/jni_native_method.cpp @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "jni_native_method.h" + +#include "vec/columns/column_string.h" + +namespace doris { + +jlong JavaNativeMethods::resizeColumn(JNIEnv* env, jclass clazz, jlong columnAddr, jint length) { + auto column = reinterpret_cast<vectorized::ColumnString::Chars*>(columnAddr); + column->resize(length); + return reinterpret_cast<jlong>(column->data()); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/util/jni_native_method.h b/be/src/util/jni_native_method.h new file mode 100644 index 0000000000..aeb29fcac6 --- /dev/null +++ b/be/src/util/jni_native_method.h @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <jni.h> + +namespace doris { + +struct JavaNativeMethods { + static jlong resizeColumn(JNIEnv* env, jclass clazz, jlong columnAddr, jint length); +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 0eb6d6f2a3..3af00dbc62 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -17,6 +17,8 @@ #include "vec/exec/vjdbc_connector.h" +#include <cstring> + #include "common/status.h" #include "exec/table_connector.h" #include "gen_cpp/Types_types.h" @@ -28,6 +30,7 @@ #include "util/runtime_profile.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_string.h" #include "vec/exec/scan/new_jdbc_scanner.h" @@ -47,6 +50,7 @@ const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;Z)J"; const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;Z)J"; const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V"; +const char* JDBC_EXECUTOR_COPY_BATCH_SIGNATURE = "(Ljava/lang/Object;ZIJJ)V"; JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) : TableConnector(param.tuple_desc, param.query_string), @@ -246,8 
+250,7 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str, slot_desc->type().debug_string(), slot_desc->col_name()); switch (slot_desc->type().type) { case TYPE_BOOLEAN: { - if (type_str != "java.lang.Boolean" && type_str != "java.math.BigDecimal" && - type_str != "java.lang.Byte") { + if (type_str != "java.lang.Boolean" && type_str != "java.lang.Byte") { return Status::InternalError(error_msg); } break; @@ -367,18 +370,12 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns if (!slot_desc->is_materialized()) { continue; } - const std::string& column_name = slot_desc->col_name(); jobject column_data = env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index); jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_block_rows_id); - for (int row = 0; row < num_rows; ++row) { - jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row); - RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc, - columns[column_index].get(), column_index, - column_name)); - env->DeleteLocalRef(cur_data); - } + RETURN_IF_ERROR(_convert_batch_result_set( + env, column_data, slot_desc, columns[column_index].get(), num_rows, column_index)); env->DeleteLocalRef(column_data); //here need to cast string to array type if (_need_cast_array_type && slot_desc->type().is_array_type()) { @@ -391,6 +388,168 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns return JniUtil::GetJniExceptionMsg(env); } +Status JdbcConnector::_convert_batch_result_set(JNIEnv* env, jobject jcolumn_data, + const SlotDescriptor* slot_desc, + vectorized::IColumn* column_ptr, int num_rows, + int column_index) { + vectorized::IColumn* col_ptr = column_ptr; + col_ptr->resize(num_rows); + int64_t address[2] = {0, 0}; + bool column_is_nullable = slot_desc->is_nullable(); + if (column_is_nullable) { + auto* nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr); + auto& null_map = nullable_column->get_null_map_data(); + memset(null_map.data(), 0, num_rows); + address[0] = reinterpret_cast<int64_t>(null_map.data()); + col_ptr = &nullable_column->get_nested_column(); + } + switch (slot_desc->type().type) { + case TYPE_BOOLEAN: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_boolean_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_TINYINT: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_tinyint_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_SMALLINT: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_smallint_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_INT: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_int_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_BIGINT: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_bigint_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_LARGEINT: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_largeint_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_FLOAT: { + address[1] = 
reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_float_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_DOUBLE: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_double_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_CHAR: + case TYPE_STRING: + case TYPE_VARCHAR: { + auto column_string = reinterpret_cast<vectorized::ColumnString*>(col_ptr); + address[1] = reinterpret_cast<int64_t>(column_string->get_offsets().data()); + auto chars_addres = reinterpret_cast<int64_t>(&column_string->get_chars()); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_string_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1], chars_addres); + break; + } + case TYPE_DATE: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_date_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_DATEV2: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_datev2_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_DATETIME: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_datetime_result, + jcolumn_data, column_is_nullable, num_rows, address[0], + address[1]); + break; + } + case TYPE_DATETIMEV2: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, + 
_executor_get_datetimev2_result, jcolumn_data, + column_is_nullable, num_rows, address[0], address[1]); + break; + } + case TYPE_DECIMALV2: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, + _executor_get_decimalv2_result, jcolumn_data, + column_is_nullable, num_rows, address[0], address[1]); + break; + } + case TYPE_DECIMAL32: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod( + _executor_obj, _executor_clazz, _executor_get_decimal32_result, jcolumn_data, + column_is_nullable, num_rows, address[0], address[1], slot_desc->type().scale); + break; + } + case TYPE_DECIMAL64: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod( + _executor_obj, _executor_clazz, _executor_get_decimal64_result, jcolumn_data, + column_is_nullable, num_rows, address[0], address[1], slot_desc->type().scale); + break; + } + case TYPE_DECIMAL128I: { + address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data); + env->CallNonvirtualVoidMethod( + _executor_obj, _executor_clazz, _executor_get_decimal128_result, jcolumn_data, + column_is_nullable, num_rows, address[0], address[1], slot_desc->type().scale); + break; + } + //todo: now array type maybe should same as before, not need to change deal with batch + //if need copy by batch, should use string cast to array on all database + case TYPE_ARRAY: { + const std::string& column_name = slot_desc->col_name(); + for (int row = 0; row < num_rows; ++row) { + jobject cur_data = env->CallNonvirtualObjectMethod( + _executor_obj, _executor_clazz, _executor_convert_array_id, jcolumn_data, row); + RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc, column_ptr, column_index, + column_name)); + env->DeleteLocalRef(cur_data); + } + break; + } + default: { + const std::string& error_msg = + fmt::format("Fail to convert jdbc value to {} on 
column: {}", + slot_desc->type().debug_string(), slot_desc->col_name()); + return Status::InternalError(std::string(error_msg)); + } + } + return JniUtil::GetJniExceptionMsg(env); +} + Status JdbcConnector::_register_func_id(JNIEnv* env) { auto register_id = [&](jclass clazz, const char* func_name, const char* func_sign, jmethodID& func_id) { @@ -415,6 +574,45 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { _executor_has_next_id)); RETURN_IF_ERROR( register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBooleanResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_boolean_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchTinyIntResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_tinyint_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchSmallIntResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_smallint_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchIntResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_int_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBigIntResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_bigint_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchLargeIntResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_largeint_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchFloatResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_float_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDoubleResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_double_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchStringResult", + "(Ljava/lang/Object;ZIJJJ)V", _executor_get_string_result)); + + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_date_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, 
"copyBatchDateV2Result", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_datev2_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeResult", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, _executor_get_datetime_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeV2Result", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, + _executor_get_datetimev2_result)); + + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimalV2Result", + JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, + _executor_get_decimalv2_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal32Result", + "(Ljava/lang/Object;ZIJJI)V", _executor_get_decimal32_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal64Result", + "(Ljava/lang/Object;ZIJJI)V", _executor_get_decimal64_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal128Result", + "(Ljava/lang/Object;ZIJJI)V", _executor_get_decimal128_result)); + RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", JDBC_EXECUTOR_GET_BLOCK_SIGNATURE, _executor_get_blocks_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong", @@ -422,6 +620,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateTimeToLong", JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE, _executor_convert_datetime_id)); + RETURN_IF_ERROR(register_id(_executor_clazz, "convertArrayToObject", + "(Ljava/lang/Object;I)Ljava/lang/Object;", + _executor_convert_array_id)); RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;", _executor_get_list_id)); RETURN_IF_ERROR(register_id(_executor_list_clazz, "size", "()I", _executor_get_list_size_id)); @@ -454,7 +655,8 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, if (true == slot_desc->is_nullable()) { auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr); if (jobj == nullptr) { - nullable_column->insert_data(nullptr, 
0); + //nullable column of null map have memset 0 before + nullable_column->get_nested_column_ptr()->insert_default(); if (_need_cast_array_type && slot_desc->type().type == TYPE_ARRAY) { reinterpret_cast<vectorized::ColumnNullable*>( str_array_cols[_map_column_idx_to_cast_idx[column_index]].get()) @@ -462,7 +664,6 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj, } return Status::OK(); } else { - nullable_column->get_null_map_data().push_back(0); col_ptr = &nullable_column->get_nested_column(); } } diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index 4f05253d64..b7c8b6166a 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -97,6 +97,9 @@ private: int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj, bool is_datetime_v2); Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); + Status _convert_batch_result_set(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc, + vectorized::IColumn* column_ptr, int num_rows, + int column_index); const JdbcConnectorParam& _conn_param; //java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER @@ -104,7 +107,7 @@ private: {-7, TYPE_BOOLEAN}, {-6, TYPE_TINYINT}, {5, TYPE_SMALLINT}, {4, TYPE_INT}, {-5, TYPE_BIGINT}, {12, TYPE_STRING}, {7, TYPE_FLOAT}, {8, TYPE_DOUBLE}, {91, TYPE_DATE}, {93, TYPE_DATETIME}, {2, TYPE_DECIMALV2}}; - bool _closed; + bool _closed = false; jclass _executor_clazz; jclass _executor_list_clazz; jclass _executor_object_clazz; @@ -116,6 +119,23 @@ private: jmethodID _executor_has_next_id; jmethodID _executor_block_rows_id; jmethodID _executor_get_blocks_id; + jmethodID _executor_get_boolean_result; + jmethodID _executor_get_tinyint_result; + jmethodID _executor_get_smallint_result; + jmethodID _executor_get_int_result; + jmethodID _executor_get_bigint_result; + jmethodID _executor_get_largeint_result; + jmethodID 
_executor_get_float_result; + jmethodID _executor_get_double_result; + jmethodID _executor_get_string_result; + jmethodID _executor_get_date_result; + jmethodID _executor_get_datev2_result; + jmethodID _executor_get_datetime_result; + jmethodID _executor_get_datetimev2_result; + jmethodID _executor_get_decimalv2_result; + jmethodID _executor_get_decimal32_result; + jmethodID _executor_get_decimal64_result; + jmethodID _executor_get_decimal128_result; jmethodID _executor_get_types_id; jmethodID _executor_get_arr_list_id; jmethodID _executor_get_arr_type_id; @@ -124,12 +144,13 @@ private: jmethodID _executor_get_list_size_id; jmethodID _executor_convert_date_id; jmethodID _executor_convert_datetime_id; + jmethodID _executor_convert_array_id; jmethodID _get_bytes_id; jmethodID _to_string_id; jmethodID _executor_begin_trans_id; jmethodID _executor_finish_trans_id; jmethodID _executor_abort_trans_id; - bool _need_cast_array_type; + bool _need_cast_array_type = false; std::map<int, int> _map_column_idx_to_cast_idx; std::vector<DataTypePtr> _input_array_string_types; std::vector<MutableColumnPtr> diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index b37e9c4696..992b6a5ae7 100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -90,6 +90,12 @@ under the License. 
<artifactId>ojdbc6</artifactId> <version>11.2.0.4</version> </dependency> + <!-- https://mvnrepository.com/artifact/com.alibaba/druid --> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>druid</artifactId> + <version>1.2.8</version> + </dependency> <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine --> <dependency> <groupId>org.junit.jupiter</groupId> diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java deleted file mode 100644 index 94fbde6217..0000000000 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.udf; - -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverPropertyInfo; -import java.sql.SQLException; -import java.sql.SQLFeatureNotSupportedException; -import java.util.Properties; -import java.util.logging.Logger; - - -public class FakeDriver implements Driver { - private Driver driver; - - FakeDriver(Driver driver) { - this.driver = driver; - } - - @Override - public Connection connect(String url, Properties info) throws SQLException { - return this.driver.connect(url, info); - } - - @Override - public boolean acceptsURL(String url) throws SQLException { - return this.driver.acceptsURL(url); - } - - @Override - public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { - return this.driver.getPropertyInfo(url, info); - } - - @Override - public int getMajorVersion() { - return this.driver.getMajorVersion(); - } - - @Override - public int getMinorVersion() { - return this.driver.getMinorVersion(); - } - - @Override - public boolean jdbcCompliant() { - return this.driver.jdbcCompliant(); - } - - @Override - public Logger getParentLogger() throws SQLFeatureNotSupportedException { - return null; - } -} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java new file mode 100644 index 0000000000..9d441b1e54 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +public class JNINativeMethod { + public static native long resizeColumn(long columnAddr, int byteSize); +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java new file mode 100644 index 0000000000..4d6f4f59e6 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.udf; + +import com.alibaba.druid.pool.DruidDataSource; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class JdbcDataSource { + private static final JdbcDataSource jdbcDataSource = new JdbcDataSource(); + private final Map<String, DruidDataSource> sourcesMap = new ConcurrentHashMap<>(); + + public static JdbcDataSource getDataSource() { + return jdbcDataSource; + } + + public DruidDataSource getSource(String jdbcUrl) { + return sourcesMap.get(jdbcUrl); + } + + public void putSource(String jdbcUrl, DruidDataSource ds) { + sourcesMap.put(jdbcUrl, ds); + } + + public Map<String, DruidDataSource> getSourcesMap() { + return sourcesMap; + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index e76e9e78e3..d2bc7d9622 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -20,21 +20,22 @@ package org.apache.doris.udf; import org.apache.doris.thrift.TJdbcExecutorCtorParams; import org.apache.doris.thrift.TJdbcOperation; +import com.alibaba.druid.pool.DruidDataSource; import com.google.common.base.Preconditions; import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; -import java.io.File; -import java.lang.reflect.InvocationTargetException; +import java.io.FileNotFoundException; +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.Date; -import java.sql.Driver; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import 
java.sql.ResultSetMetaData; @@ -55,10 +56,11 @@ public class JdbcExecutor { private ResultSetMetaData resultSetMetaData = null; private List<String> resultColumnTypeNames = null; private int baseTypeInt = 0; - private URLClassLoader classLoader = null; - private List<List<Object>> block = null; + private List<Object[]> block = null; private int bacthSizeNum = 0; private int curBlockRows = 0; + private static final byte[] emptyBytes = new byte[0]; + private DruidDataSource druidDataSource = null; public JdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); @@ -80,17 +82,11 @@ public class JdbcExecutor { stmt.close(); } if (conn != null) { - conn.clearWarnings(); conn.close(); } - if (classLoader != null) { - classLoader.clearAssertionStatus(); - classLoader.close(); - } resultSet = null; stmt = null; conn = null; - classLoader = null; } public int read() throws UdfRuntimeException { @@ -102,24 +98,20 @@ public class JdbcExecutor { block = new ArrayList<>(columnCount); for (int i = 0; i < columnCount; ++i) { resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); - block.add(Arrays.asList(new Object[bacthSizeNum])); + Class<?> clazz = Class.forName(resultSetMetaData.getColumnClassName(i + 1)); + block.add((Object[]) Array.newInstance(clazz, bacthSizeNum)); } return columnCount; } catch (SQLException e) { throw new UdfRuntimeException("JDBC executor sql has error: ", e); + } catch (ClassNotFoundException e) { + throw new UdfRuntimeException("JDBC executor sql ClassNotFoundException: ", e); } } public int write(String sql) throws UdfRuntimeException { try { - boolean res = stmt.execute(sql); - if (res) { // sql query - resultSet = stmt.getResultSet(); - resultSetMetaData = resultSet.getMetaData(); - return resultSetMetaData.getColumnCount(); - } else { - return stmt.getUpdateCount(); - } + return stmt.executeUpdate(sql); } catch (SQLException e) { throw new UdfRuntimeException("JDBC 
executor sql has error: ", e); } @@ -173,13 +165,13 @@ public class JdbcExecutor { } } - public List<List<Object>> getBlock(int batchSize) throws UdfRuntimeException { + public List<Object[]> getBlock(int batchSize) throws UdfRuntimeException { try { int columnCount = resultSetMetaData.getColumnCount(); curBlockRows = 0; do { for (int i = 0; i < columnCount; ++i) { - block.get(i).set(curBlockRows, resultSet.getObject(i + 1)); + block.get(i)[curBlockRows] = resultSet.getObject(i + 1); } curBlockRows++; } while (curBlockRows < batchSize && resultSet.next()); @@ -243,20 +235,55 @@ public class JdbcExecutor { date.getHour(), date.getMinute(), date.getSecond(), false); } + public byte convertTinyIntToByte(Object obj) { + byte res = 0; + if (obj instanceof Integer) { + res = ((Integer) obj).byteValue(); + } else if (obj instanceof Short) { + res = ((Short) obj).byteValue(); + } + return res; + } + + public short convertSmallIntToShort(Object obj) { + short res = 0; + if (obj instanceof Integer) { + res = ((Integer) obj).shortValue(); + } else if (obj instanceof Short) { + res = (short) obj; + } + return res; + } + + public Object convertArrayToObject(Object obj, int idx) { + Object[] columnData = (Object[]) obj; + if (columnData[idx] instanceof String) { + return (String) columnData[idx]; + } else { + return (java.sql.Array) columnData[idx]; + } + } + private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException { try { - File file = new File(driverUrl); - URL url = file.toURI().toURL(); - classLoader = new URLClassLoader(new URL[] {url}); - Driver driver = (Driver) Class.forName(driverClass, true, classLoader).getDeclaredConstructor() - .newInstance(); - // in jdk11 cann't call addURL function by reflect to load class. 
so use this way - // But DriverManager can't find the driverClass correctly, so add a faker driver - // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html - DriverManager.registerDriver(new FakeDriver(driver)); - conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); - + ClassLoader parent = getClass().getClassLoader(); + ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent); + druidDataSource = JdbcDataSource.getDataSource().getSource(jdbcUrl); + if (druidDataSource == null) { + DruidDataSource ds = new DruidDataSource(); + ds.setDriverClassLoader(classLoader); + ds.setDriverClassName(driverClass); + ds.setUrl(jdbcUrl); + ds.setUsername(jdbcUser); + ds.setPassword(jdbcPassword); + ds.setMinIdle(1); + ds.setInitialSize(2); + ds.setMaxActive(5); + druidDataSource = ds; + JdbcDataSource.getDataSource().putSource(jdbcUrl, ds); + } + conn = druidDataSource.getConnection(); if (op == TJdbcOperation.READ) { conn.setAutoCommit(false); Preconditions.checkArgument(sql != null); @@ -267,20 +294,372 @@ public class JdbcExecutor { } else { stmt = conn.createStatement(); } - } catch (ClassNotFoundException e) { - throw new UdfRuntimeException("ClassNotFoundException: " + driverClass, e); } catch (MalformedURLException e) { throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e); } catch (SQLException e) { throw new UdfRuntimeException("Initialize datasource failed: ", e); - } catch (InstantiationException e) { - throw new UdfRuntimeException("InstantiationException failed: ", e); - } catch (IllegalAccessException e) { - throw new UdfRuntimeException("IllegalAccessException failed: ", e); - } catch (InvocationTargetException e) { - throw new UdfRuntimeException("InvocationTargetException new instance failed: ", e); - } catch (NoSuchMethodException e) { - throw new UdfRuntimeException("NoSuchMethodException Load class failed: ", e); + } catch (FileNotFoundException e) { + throw new 
UdfRuntimeException("FileNotFoundException failed: ", e); + } + } + + public void copyBatchBooleanResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Boolean[] column = (Boolean[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putByte(columnAddr + i, column[i] ? (byte) 1 : 0); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putByte(columnAddr + i, column[i] ? (byte) 1 : 0); + } + } + } + + public void copyBatchTinyIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Object[] column = (Object[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putByte(columnAddr + i, convertTinyIntToByte(column[i])); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putByte(columnAddr + i, convertTinyIntToByte(column[i])); + } + } + } + + public void copyBatchSmallIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Object[] column = (Object[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putShort(columnAddr + (i * 2), convertSmallIntToShort(column[i])); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putShort(columnAddr + (i * 2), convertSmallIntToShort(column[i])); + } + } + } + + public void copyBatchIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Integer[] column = (Integer[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, 
(byte) 1); + } else { + UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) column[i]); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) column[i]); + } + } + } + + public void copyBatchBigIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Long[] column = (Long[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), (long) column[i]); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), (long) column[i]); + } + } + } + + public void copyBatchLargeIntResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + BigInteger[] column = (BigInteger[]) columnObj; + if (isNullable == true) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + byte[] bytes = UdfUtils.convertByteOrder(column[i].toByteArray()); + byte[] value = new byte[16]; + if (column[i].signum() == -1) { + Arrays.fill(value, (byte) -1); + } + for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { + value[index] = bytes[index]; + } + UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * 16), 16); + } + } + } else { + for (int i = 0; i < numRows; i++) { + byte[] bytes = UdfUtils.convertByteOrder(column[i].toByteArray()); + byte[] value = new byte[16]; + if (column[i].signum() == -1) { + Arrays.fill(value, (byte) -1); + } + for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { + value[index] = bytes[index]; + } + UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * 16), 16); + } + } + } + + public void copyBatchFloatResult(Object columnObj, boolean isNullable, int numRows, long 
nullMapAddr, + long columnAddr) { + Float[] column = (Float[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putFloat(columnAddr + (i * 4), (float) column[i]); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putFloat(columnAddr + (i * 4), (float) column[i]); + } + } + } + + public void copyBatchDoubleResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Double[] column = (Double[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8), (double) column[i]); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8), (double) column[i]); + } + } + } + + public void copyBatchDateResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Object[] column = (Object[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), convertDateToLong(column[i], false)); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), convertDateToLong(column[i], false)); + } + } + } + + public void copyBatchDateV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + Object[] column = (Object[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) convertDateToLong(column[i], true)); + } + } + } else { + for (int i = 0; i < numRows; i++) { + 
UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) convertDateToLong(column[i], true)); + } + } + } + + public void copyBatchDateTimeResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) throws UdfRuntimeException { + Object[] column = (Object[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), convertDateTimeToLong(column[i], false)); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), convertDateTimeToLong(column[i], false)); + } + } + } + + public void copyBatchDateTimeV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) throws UdfRuntimeException { + Object[] column = (Object[]) columnObj; + if (isNullable) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), convertDateTimeToLong(column[i], true)); + } + } + } else { + for (int i = 0; i < numRows; i++) { + UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), convertDateTimeToLong(column[i], true)); + } + } + } + + public void copyBatchStringResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long offsetsAddr, long charsAddr) { + String[] column = (String[]) columnObj; + int[] offsets = new int[numRows]; + byte[][] byteRes = new byte[numRows][]; + int offset = 0; + if (isNullable == true) { + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + byteRes[i] = emptyBytes; + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + byteRes[i] = column[i].getBytes(StandardCharsets.UTF_8); + } + offset += byteRes[i].length; + offsets[i] = offset; + } + } else { + for (int i = 0; i < numRows; i++) { + byteRes[i] = column[i].getBytes(StandardCharsets.UTF_8); + 
offset += byteRes[i].length; + offsets[i] = offset; + } + } + byte[] bytes = new byte[offsets[numRows - 1]]; + long bytesAddr = JNINativeMethod.resizeColumn(charsAddr, offsets[numRows - 1]); + int dst = 0; + for (int i = 0; i < numRows; i++) { + for (int j = 0; j < byteRes[i].length; j++) { + bytes[dst++] = byteRes[i][j]; + } + } + UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, offsetsAddr, numRows * 4L); + UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, bytesAddr, offsets[numRows - 1]); + } + + public void copyBatchDecimalV2Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr) { + BigDecimal[] column = (BigDecimal[]) columnObj; + BigInteger[] data = new BigInteger[numRows]; + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + data[i] = null; + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + data[i] = column[i].setScale(9, RoundingMode.HALF_EVEN).unscaledValue(); + } + } + copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16); + } + + public void copyBatchDecimal32Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int scale) { + BigDecimal[] column = (BigDecimal[]) columnObj; + BigInteger[] data = new BigInteger[numRows]; + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + data[i] = null; + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + data[i] = column[i].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue(); + } + } + copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 4); + } + + public void copyBatchDecimal64Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int scale) { + BigDecimal[] column = (BigDecimal[]) columnObj; + BigInteger[] data = new BigInteger[numRows]; + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + data[i] = null; + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + data[i] 
= column[i].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue(); + } + } + copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 8); + } + + public void copyBatchDecimal128Result(Object columnObj, boolean isNullable, int numRows, long nullMapAddr, + long columnAddr, int scale) { + BigDecimal[] column = (BigDecimal[]) columnObj; + BigInteger[] data = new BigInteger[numRows]; + for (int i = 0; i < numRows; i++) { + if (column[i] == null) { + data[i] = null; + UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1); + } else { + data[i] = column[i].setScale(scale, RoundingMode.HALF_EVEN).unscaledValue(); + } + } + copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16); + } + + public void copyBatchDecimalResult(Object columnObj, boolean isNullable, int numRows, + long columnAddr, int typeLen) { + BigInteger[] column = (BigInteger[]) columnObj; + if (isNullable == true) { + for (int i = 0; i < numRows; i++) { + if (column[i] != null) { + byte[] bytes = UdfUtils.convertByteOrder(column[i].toByteArray()); + byte[] value = new byte[typeLen]; + if (column[i].signum() == -1) { + Arrays.fill(value, (byte) -1); + } + for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { + value[index] = bytes[index]; + } + UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * typeLen), typeLen); + } + } + } else { + for (int i = 0; i < numRows; i++) { + byte[] bytes = UdfUtils.convertByteOrder(column[i].toByteArray()); + byte[] value = new byte[typeLen]; + if (column[i].signum() == -1) { + Arrays.fill(value, (byte) -1); + } + for (int index = 0; index < Math.min(bytes.length, value.length); ++index) { + value[index] = bytes[index]; + } + UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, columnAddr + (i * typeLen), typeLen); + } } } } diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java index d9fa55bb2f..4746350cff 100644 --- 
a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java @@ -51,6 +51,7 @@ public class UdfUtils { public static final Unsafe UNSAFE; private static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L; public static final long BYTE_ARRAY_OFFSET; + public static final long INT_ARRAY_OFFSET; static { UNSAFE = (Unsafe) AccessController.doPrivileged( @@ -64,6 +65,7 @@ public class UdfUtils { } }); BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class); } // Data types that are supported as return or argument types in Java UDFs. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org