This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 82b3f0598930d4d0dfb8eb448360d7f5c38dfe66
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Sat Feb 4 22:51:55 2023 +0800

    [improvement](jdbc) refator jdbc of copy result set by batch (#16337)
    
    have test jdbc external table with read,  10%+ performance improvement 
after optimization
---
 be/src/util/CMakeLists.txt                         |   1 +
 be/src/util/jni-util.cpp                           |  33 ++
 be/src/util/jni-util.h                             |   1 +
 be/src/util/jni_native_method.cpp                  |  30 ++
 be/src/util/jni_native_method.h                    |  28 ++
 be/src/vec/exec/vjdbc_connector.cpp                | 225 +++++++++-
 be/src/vec/exec/vjdbc_connector.h                  |  25 +-
 fe/java-udf/pom.xml                                |   6 +
 .../main/java/org/apache/doris/udf/FakeDriver.java |  70 ---
 .../java/org/apache/doris/udf/JNINativeMethod.java |  22 +
 .../java/org/apache/doris/udf/JdbcDataSource.java  |  44 ++
 .../java/org/apache/doris/udf/JdbcExecutor.java    | 471 +++++++++++++++++++--
 .../main/java/org/apache/doris/udf/UdfUtils.java   |   2 +
 13 files changed, 828 insertions(+), 130 deletions(-)

diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 6fe40c4bcd..bcd0a0089b 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -115,6 +115,7 @@ set(UTIL_FILES
   jni-util.cpp
   exception.cpp
   libjvm_loader.cpp
+  jni_native_method.cpp
 )
 
 if (WITH_MYSQL)
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 7327664b89..b2a65eb69b 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -28,6 +28,7 @@
 #include "common/config.h"
 #include "gutil/once.h"
 #include "gutil/strings/substitute.h"
+#include "jni_native_method.h"
 #include "libjvm_loader.h"
 
 using std::string;
@@ -108,6 +109,7 @@ bool JniUtil::jvm_inited_ = false;
 __thread JNIEnv* JniUtil::tls_env_ = nullptr;
 jclass JniUtil::internal_exc_cl_ = NULL;
 jclass JniUtil::jni_util_cl_ = NULL;
+jclass JniUtil::jni_native_method_exc_cl_ = nullptr;
 jmethodID JniUtil::throwable_to_string_id_ = NULL;
 jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL;
 jmethodID JniUtil::get_jvm_metrics_id_ = NULL;
@@ -252,6 +254,37 @@ Status JniUtil::Init() {
         return Status::InternalError("Failed to delete local reference to 
JniUtil class.");
     }
 
+    // Find JNINativeMethod class and create a global ref.
+    jclass local_jni_native_exc_cl = 
env->FindClass("org/apache/doris/udf/JNINativeMethod");
+    if (local_jni_native_exc_cl == nullptr) {
+        if (env->ExceptionOccurred()) {
+            env->ExceptionDescribe();
+        }
+        return Status::InternalError("Failed to find JNINativeMethod class.");
+    }
+    jni_native_method_exc_cl_ =
+            
reinterpret_cast<jclass>(env->NewGlobalRef(local_jni_native_exc_cl));
+    if (jni_native_method_exc_cl_ == nullptr) {
+        if (env->ExceptionOccurred()) {
+            env->ExceptionDescribe();
+        }
+        return Status::InternalError("Failed to create global reference to 
JNINativeMethod class.");
+    }
+    env->DeleteLocalRef(local_jni_native_exc_cl);
+    if (env->ExceptionOccurred()) {
+        return Status::InternalError("Failed to delete local reference to 
JNINativeMethod class.");
+    }
+    std::string function_name = "resizeColumn";
+    std::string function_sign = "(JI)J";
+    static JNINativeMethod java_native_methods[] = {
+            {const_cast<char*>(function_name.c_str()), 
const_cast<char*>(function_sign.c_str()),
+             (void*)&JavaNativeMethods::resizeColumn},
+    };
+
+    int res = env->RegisterNatives(jni_native_method_exc_cl_, 
java_native_methods,
+                                   sizeof(java_native_methods) / 
sizeof(java_native_methods[0]));
+    DCHECK_EQ(res, 0);
+
     // Throwable toString()
     throwable_to_string_id_ = env->GetStaticMethodID(jni_util_cl_, 
"throwableToString",
                                                      
"(Ljava/lang/Throwable;)Ljava/lang/String;");
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 5593d77dda..0e551f17cf 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -68,6 +68,7 @@ private:
 
     static bool jvm_inited_;
     static jclass internal_exc_cl_;
+    static jclass jni_native_method_exc_cl_;
     static jclass jni_util_cl_;
     static jmethodID throwable_to_string_id_;
     static jmethodID throwable_to_stack_trace_id_;
diff --git a/be/src/util/jni_native_method.cpp 
b/be/src/util/jni_native_method.cpp
new file mode 100644
index 0000000000..c3baedce6c
--- /dev/null
+++ b/be/src/util/jni_native_method.cpp
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jni_native_method.h"
+
+#include "vec/columns/column_string.h"
+
+namespace doris {
+
+jlong JavaNativeMethods::resizeColumn(JNIEnv* env, jclass clazz, jlong 
columnAddr, jint length) {
+    auto column = 
reinterpret_cast<vectorized::ColumnString::Chars*>(columnAddr);
+    column->resize(length);
+    return reinterpret_cast<jlong>(column->data());
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/util/jni_native_method.h b/be/src/util/jni_native_method.h
new file mode 100644
index 0000000000..aeb29fcac6
--- /dev/null
+++ b/be/src/util/jni_native_method.h
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <jni.h>
+
+namespace doris {
+
+struct JavaNativeMethods {
+    static jlong resizeColumn(JNIEnv* env, jclass clazz, jlong columnAddr, 
jint length);
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vjdbc_connector.cpp 
b/be/src/vec/exec/vjdbc_connector.cpp
index 0eb6d6f2a3..3af00dbc62 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -17,6 +17,8 @@
 
 #include "vec/exec/vjdbc_connector.h"
 
+#include <cstring>
+
 #include "common/status.h"
 #include "exec/table_connector.h"
 #include "gen_cpp/Types_types.h"
@@ -28,6 +30,7 @@
 #include "util/runtime_profile.h"
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_string.h"
 #include "vec/exec/scan/new_jdbc_scanner.h"
@@ -47,6 +50,7 @@ const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
 const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;Z)J";
 const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = 
"(Ljava/lang/Object;Z)J";
 const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V";
+const char* JDBC_EXECUTOR_COPY_BATCH_SIGNATURE = "(Ljava/lang/Object;ZIJJ)V";
 
 JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
         : TableConnector(param.tuple_desc, param.query_string),
@@ -246,8 +250,7 @@ Status JdbcConnector::_check_type(SlotDescriptor* 
slot_desc, const std::string&
             type_str, slot_desc->type().debug_string(), slot_desc->col_name());
     switch (slot_desc->type().type) {
     case TYPE_BOOLEAN: {
-        if (type_str != "java.lang.Boolean" && type_str != 
"java.math.BigDecimal" &&
-            type_str != "java.lang.Byte") {
+        if (type_str != "java.lang.Boolean" && type_str != "java.lang.Byte") {
             return Status::InternalError(error_msg);
         }
         break;
@@ -367,18 +370,12 @@ Status JdbcConnector::get_next(bool* eos, 
std::vector<MutableColumnPtr>& columns
         if (!slot_desc->is_materialized()) {
             continue;
         }
-        const std::string& column_name = slot_desc->col_name();
         jobject column_data =
                 env->CallObjectMethod(block_obj, _executor_get_list_id, 
materialized_column_index);
         jint num_rows = env->CallNonvirtualIntMethod(_executor_obj, 
_executor_clazz,
                                                      _executor_block_rows_id);
-        for (int row = 0; row < num_rows; ++row) {
-            jobject cur_data = env->CallObjectMethod(column_data, 
_executor_get_list_id, row);
-            RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
-                                                 columns[column_index].get(), 
column_index,
-                                                 column_name));
-            env->DeleteLocalRef(cur_data);
-        }
+        RETURN_IF_ERROR(_convert_batch_result_set(
+                env, column_data, slot_desc, columns[column_index].get(), 
num_rows, column_index));
         env->DeleteLocalRef(column_data);
         //here need to cast string to array type
         if (_need_cast_array_type && slot_desc->type().is_array_type()) {
@@ -391,6 +388,168 @@ Status JdbcConnector::get_next(bool* eos, 
std::vector<MutableColumnPtr>& columns
     return JniUtil::GetJniExceptionMsg(env);
 }
 
+Status JdbcConnector::_convert_batch_result_set(JNIEnv* env, jobject 
jcolumn_data,
+                                                const SlotDescriptor* 
slot_desc,
+                                                vectorized::IColumn* 
column_ptr, int num_rows,
+                                                int column_index) {
+    vectorized::IColumn* col_ptr = column_ptr;
+    col_ptr->resize(num_rows);
+    int64_t address[2] = {0, 0};
+    bool column_is_nullable = slot_desc->is_nullable();
+    if (column_is_nullable) {
+        auto* nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+        auto& null_map = nullable_column->get_null_map_data();
+        memset(null_map.data(), 0, num_rows);
+        address[0] = reinterpret_cast<int64_t>(null_map.data());
+        col_ptr = &nullable_column->get_nested_column();
+    }
+    switch (slot_desc->type().type) {
+    case TYPE_BOOLEAN: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_boolean_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_TINYINT: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_tinyint_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_SMALLINT: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_smallint_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_INT: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_int_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_BIGINT: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_bigint_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_LARGEINT: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_largeint_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_FLOAT: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_float_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_DOUBLE: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_double_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_CHAR:
+    case TYPE_STRING:
+    case TYPE_VARCHAR: {
+        auto column_string = 
reinterpret_cast<vectorized::ColumnString*>(col_ptr);
+        address[1] = 
reinterpret_cast<int64_t>(column_string->get_offsets().data());
+        auto chars_addres = 
reinterpret_cast<int64_t>(&column_string->get_chars());
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_string_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1], chars_addres);
+        break;
+    }
+    case TYPE_DATE: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_date_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_DATEV2: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_datev2_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_DATETIME: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, 
_executor_get_datetime_result,
+                                      jcolumn_data, column_is_nullable, 
num_rows, address[0],
+                                      address[1]);
+        break;
+    }
+    case TYPE_DATETIMEV2: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz,
+                                      _executor_get_datetimev2_result, 
jcolumn_data,
+                                      column_is_nullable, num_rows, 
address[0], address[1]);
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz,
+                                      _executor_get_decimalv2_result, 
jcolumn_data,
+                                      column_is_nullable, num_rows, 
address[0], address[1]);
+        break;
+    }
+    case TYPE_DECIMAL32: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(
+                _executor_obj, _executor_clazz, 
_executor_get_decimal32_result, jcolumn_data,
+                column_is_nullable, num_rows, address[0], address[1], 
slot_desc->type().scale);
+        break;
+    }
+    case TYPE_DECIMAL64: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(
+                _executor_obj, _executor_clazz, 
_executor_get_decimal64_result, jcolumn_data,
+                column_is_nullable, num_rows, address[0], address[1], 
slot_desc->type().scale);
+        break;
+    }
+    case TYPE_DECIMAL128I: {
+        address[1] = reinterpret_cast<int64_t>(col_ptr->get_raw_data().data);
+        env->CallNonvirtualVoidMethod(
+                _executor_obj, _executor_clazz, 
_executor_get_decimal128_result, jcolumn_data,
+                column_is_nullable, num_rows, address[0], address[1], 
slot_desc->type().scale);
+        break;
+    }
+    //todo: now array type maybe should same as before, not need to change 
deal with batch
+    //if need copy by batch, should use string cast to array on all database
+    case TYPE_ARRAY: {
+        const std::string& column_name = slot_desc->col_name();
+        for (int row = 0; row < num_rows; ++row) {
+            jobject cur_data = env->CallNonvirtualObjectMethod(
+                    _executor_obj, _executor_clazz, 
_executor_convert_array_id, jcolumn_data, row);
+            RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc, 
column_ptr, column_index,
+                                                 column_name));
+            env->DeleteLocalRef(cur_data);
+        }
+        break;
+    }
+    default: {
+        const std::string& error_msg =
+                fmt::format("Fail to convert jdbc value to {} on column: {}",
+                            slot_desc->type().debug_string(), 
slot_desc->col_name());
+        return Status::InternalError(std::string(error_msg));
+    }
+    }
+    return JniUtil::GetJniExceptionMsg(env);
+}
+
 Status JdbcConnector::_register_func_id(JNIEnv* env) {
     auto register_id = [&](jclass clazz, const char* func_name, const char* 
func_sign,
                            jmethodID& func_id) {
@@ -415,6 +574,45 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
                                 _executor_has_next_id));
     RETURN_IF_ERROR(
             register_id(_executor_clazz, "getCurBlockRows", "()I", 
_executor_block_rows_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBooleanResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_boolean_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchTinyIntResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_tinyint_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchSmallIntResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_smallint_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchIntResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_int_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchBigIntResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_bigint_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchLargeIntResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_largeint_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchFloatResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_float_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDoubleResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_double_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchStringResult",
+                                "(Ljava/lang/Object;ZIJJJ)V", 
_executor_get_string_result));
+
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_date_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateV2Result",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_datev2_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeResult",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE, 
_executor_get_datetime_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDateTimeV2Result",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
+                                _executor_get_datetimev2_result));
+
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimalV2Result",
+                                JDBC_EXECUTOR_COPY_BATCH_SIGNATURE,
+                                _executor_get_decimalv2_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal32Result",
+                                "(Ljava/lang/Object;ZIJJI)V", 
_executor_get_decimal32_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal64Result",
+                                "(Ljava/lang/Object;ZIJJI)V", 
_executor_get_decimal64_result));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchDecimal128Result",
+                                "(Ljava/lang/Object;ZIJJI)V", 
_executor_get_decimal128_result));
+
     RETURN_IF_ERROR(register_id(_executor_clazz, "getBlock", 
JDBC_EXECUTOR_GET_BLOCK_SIGNATURE,
                                 _executor_get_blocks_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateToLong",
@@ -422,6 +620,9 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
     RETURN_IF_ERROR(register_id(_executor_clazz, "convertDateTimeToLong",
                                 JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE,
                                 _executor_convert_datetime_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "convertArrayToObject",
+                                "(Ljava/lang/Object;I)Ljava/lang/Object;",
+                                _executor_convert_array_id));
     RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", 
"(I)Ljava/lang/Object;",
                                 _executor_get_list_id));
     RETURN_IF_ERROR(register_id(_executor_list_clazz, "size", "()I", 
_executor_get_list_size_id));
@@ -454,7 +655,8 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, 
jobject jobj,
     if (true == slot_desc->is_nullable()) {
         auto* nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
         if (jobj == nullptr) {
-            nullable_column->insert_data(nullptr, 0);
+            //nullable column of null map have memset 0 before
+            nullable_column->get_nested_column_ptr()->insert_default();
             if (_need_cast_array_type && slot_desc->type().type == TYPE_ARRAY) 
{
                 reinterpret_cast<vectorized::ColumnNullable*>(
                         
str_array_cols[_map_column_idx_to_cast_idx[column_index]].get())
@@ -462,7 +664,6 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, 
jobject jobj,
             }
             return Status::OK();
         } else {
-            nullable_column->get_null_map_data().push_back(0);
             col_ptr = &nullable_column->get_nested_column();
         }
     }
diff --git a/be/src/vec/exec/vjdbc_connector.h 
b/be/src/vec/exec/vjdbc_connector.h
index 4f05253d64..b7c8b6166a 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -97,6 +97,9 @@ private:
     int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj, bool 
is_datetime_v2);
     Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* 
block, int column_index,
                                  int rows);
+    Status _convert_batch_result_set(JNIEnv* env, jobject jobj, const 
SlotDescriptor* slot_desc,
+                                     vectorized::IColumn* column_ptr, int 
num_rows,
+                                     int column_index);
 
     const JdbcConnectorParam& _conn_param;
     //java.sql.Types: 
https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER
@@ -104,7 +107,7 @@ private:
             {-7, TYPE_BOOLEAN}, {-6, TYPE_TINYINT},  {5, TYPE_SMALLINT}, {4, 
TYPE_INT},
             {-5, TYPE_BIGINT},  {12, TYPE_STRING},   {7, TYPE_FLOAT},    {8, 
TYPE_DOUBLE},
             {91, TYPE_DATE},    {93, TYPE_DATETIME}, {2, TYPE_DECIMALV2}};
-    bool _closed;
+    bool _closed = false;
     jclass _executor_clazz;
     jclass _executor_list_clazz;
     jclass _executor_object_clazz;
@@ -116,6 +119,23 @@ private:
     jmethodID _executor_has_next_id;
     jmethodID _executor_block_rows_id;
     jmethodID _executor_get_blocks_id;
+    jmethodID _executor_get_boolean_result;
+    jmethodID _executor_get_tinyint_result;
+    jmethodID _executor_get_smallint_result;
+    jmethodID _executor_get_int_result;
+    jmethodID _executor_get_bigint_result;
+    jmethodID _executor_get_largeint_result;
+    jmethodID _executor_get_float_result;
+    jmethodID _executor_get_double_result;
+    jmethodID _executor_get_string_result;
+    jmethodID _executor_get_date_result;
+    jmethodID _executor_get_datev2_result;
+    jmethodID _executor_get_datetime_result;
+    jmethodID _executor_get_datetimev2_result;
+    jmethodID _executor_get_decimalv2_result;
+    jmethodID _executor_get_decimal32_result;
+    jmethodID _executor_get_decimal64_result;
+    jmethodID _executor_get_decimal128_result;
     jmethodID _executor_get_types_id;
     jmethodID _executor_get_arr_list_id;
     jmethodID _executor_get_arr_type_id;
@@ -124,12 +144,13 @@ private:
     jmethodID _executor_get_list_size_id;
     jmethodID _executor_convert_date_id;
     jmethodID _executor_convert_datetime_id;
+    jmethodID _executor_convert_array_id;
     jmethodID _get_bytes_id;
     jmethodID _to_string_id;
     jmethodID _executor_begin_trans_id;
     jmethodID _executor_finish_trans_id;
     jmethodID _executor_abort_trans_id;
-    bool _need_cast_array_type;
+    bool _need_cast_array_type = false;
     std::map<int, int> _map_column_idx_to_cast_idx;
     std::vector<DataTypePtr> _input_array_string_types;
     std::vector<MutableColumnPtr>
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index b37e9c4696..992b6a5ae7 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -90,6 +90,12 @@ under the License.
             <artifactId>ojdbc6</artifactId>
             <version>11.2.0.4</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.2.8</version>
+        </dependency>
         <!-- 
https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine -->
         <dependency>
             <groupId>org.junit.jupiter</groupId>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
deleted file mode 100644
index 94fbde6217..0000000000
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/FakeDriver.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.udf;
-
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverPropertyInfo;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.Properties;
-import java.util.logging.Logger;
-
-
-public class FakeDriver implements Driver {
-    private Driver driver;
-
-    FakeDriver(Driver driver) {
-        this.driver = driver;
-    }
-
-    @Override
-    public Connection connect(String url, Properties info) throws SQLException 
{
-        return this.driver.connect(url, info);
-    }
-
-    @Override
-    public boolean acceptsURL(String url) throws SQLException {
-        return this.driver.acceptsURL(url);
-    }
-
-    @Override
-    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) 
throws SQLException {
-        return this.driver.getPropertyInfo(url, info);
-    }
-
-    @Override
-    public int getMajorVersion() {
-        return this.driver.getMajorVersion();
-    }
-
-    @Override
-    public int getMinorVersion() {
-        return this.driver.getMinorVersion();
-    }
-
-    @Override
-    public boolean jdbcCompliant() {
-        return this.driver.jdbcCompliant();
-    }
-
-    @Override
-    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
-        return null;
-    }
-}
diff --git 
a/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java
new file mode 100644
index 0000000000..9d441b1e54
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JNINativeMethod.java
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+public class JNINativeMethod {
+    public static native long resizeColumn(long columnAddr, int byteSize);
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java
new file mode 100644
index 0000000000..4d6f4f59e6
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcDataSource.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import com.alibaba.druid.pool.DruidDataSource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class JdbcDataSource {
+    private static final JdbcDataSource jdbcDataSource = new JdbcDataSource();
+    private final Map<String, DruidDataSource> sourcesMap = new 
ConcurrentHashMap<>();
+
+    public static JdbcDataSource getDataSource() {
+        return jdbcDataSource;
+    }
+
+    public DruidDataSource getSource(String jdbcUrl) {
+        return sourcesMap.get(jdbcUrl);
+    }
+
+    public void putSource(String jdbcUrl, DruidDataSource ds) {
+        sourcesMap.put(jdbcUrl, ds);
+    }
+
+    public Map<String, DruidDataSource> getSourcesMap() {
+        return sourcesMap;
+    }
+}
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index e76e9e78e3..d2bc7d9622 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -20,21 +20,22 @@ package org.apache.doris.udf;
 import org.apache.doris.thrift.TJdbcExecutorCtorParams;
 import org.apache.doris.thrift.TJdbcOperation;
 
+import com.alibaba.druid.pool.DruidDataSource;
 import com.google.common.base.Preconditions;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
 import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.Date;
-import java.sql.Driver;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -55,10 +56,11 @@ public class JdbcExecutor {
     private ResultSetMetaData resultSetMetaData = null;
     private List<String> resultColumnTypeNames = null;
     private int baseTypeInt = 0;
-    private URLClassLoader classLoader = null;
-    private List<List<Object>> block = null;
+    private List<Object[]> block = null;
     private int bacthSizeNum = 0;
     private int curBlockRows = 0;
+    private static final byte[] emptyBytes = new byte[0];
+    private DruidDataSource druidDataSource = null;
 
     public JdbcExecutor(byte[] thriftParams) throws Exception {
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -80,17 +82,11 @@ public class JdbcExecutor {
             stmt.close();
         }
         if (conn != null) {
-            conn.clearWarnings();
             conn.close();
         }
-        if (classLoader != null) {
-            classLoader.clearAssertionStatus();
-            classLoader.close();
-        }
         resultSet = null;
         stmt = null;
         conn = null;
-        classLoader = null;
     }
 
     public int read() throws UdfRuntimeException {
@@ -102,24 +98,20 @@ public class JdbcExecutor {
             block = new ArrayList<>(columnCount);
             for (int i = 0; i < columnCount; ++i) {
                 
resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
-                block.add(Arrays.asList(new Object[bacthSizeNum]));
+                Class<?> clazz = 
Class.forName(resultSetMetaData.getColumnClassName(i + 1));
+                block.add((Object[]) Array.newInstance(clazz, bacthSizeNum));
             }
             return columnCount;
         } catch (SQLException e) {
             throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+        } catch (ClassNotFoundException e) {
+            throw new UdfRuntimeException("JDBC executor sql 
ClassNotFoundException: ", e);
         }
     }
 
     public int write(String sql) throws UdfRuntimeException {
         try {
-            boolean res = stmt.execute(sql);
-            if (res) { // sql query
-                resultSet = stmt.getResultSet();
-                resultSetMetaData = resultSet.getMetaData();
-                return resultSetMetaData.getColumnCount();
-            } else {
-                return stmt.getUpdateCount();
-            }
+            return stmt.executeUpdate(sql);
         } catch (SQLException e) {
             throw new UdfRuntimeException("JDBC executor sql has error: ", e);
         }
@@ -173,13 +165,13 @@ public class JdbcExecutor {
         }
     }
 
-    public List<List<Object>> getBlock(int batchSize) throws 
UdfRuntimeException {
+    public List<Object[]> getBlock(int batchSize) throws UdfRuntimeException {
         try {
             int columnCount = resultSetMetaData.getColumnCount();
             curBlockRows = 0;
             do {
                 for (int i = 0; i < columnCount; ++i) {
-                    block.get(i).set(curBlockRows, resultSet.getObject(i + 1));
+                    block.get(i)[curBlockRows] = resultSet.getObject(i + 1);
                 }
                 curBlockRows++;
             } while (curBlockRows < batchSize && resultSet.next());
@@ -243,20 +235,55 @@ public class JdbcExecutor {
                 date.getHour(), date.getMinute(), date.getSecond(), false);
     }
 
+    public byte convertTinyIntToByte(Object obj) {
+        byte res = 0;
+        if (obj instanceof Integer) {
+            res = ((Integer) obj).byteValue();
+        } else if (obj instanceof Short) {
+            res = ((Short) obj).byteValue();
+        }
+        return res;
+    }
+
+    public short convertSmallIntToShort(Object obj) {
+        short res = 0;
+        if (obj instanceof Integer) {
+            res = ((Integer) obj).shortValue();
+        } else if (obj instanceof Short) {
+            res = (short) obj;
+        }
+        return res;
+    }
+
+    public Object convertArrayToObject(Object obj, int idx) {
+        Object[] columnData = (Object[]) obj;
+        if (columnData[idx] instanceof String) {
+            return (String) columnData[idx];
+        } else {
+            return (java.sql.Array) columnData[idx];
+        }
+    }
+
     private void init(String driverUrl, String sql, int batchSize, String 
driverClass, String jdbcUrl, String jdbcUser,
             String jdbcPassword, TJdbcOperation op) throws UdfRuntimeException 
{
         try {
-            File file = new File(driverUrl);
-            URL url = file.toURI().toURL();
-            classLoader = new URLClassLoader(new URL[] {url});
-            Driver driver = (Driver) Class.forName(driverClass, true, 
classLoader).getDeclaredConstructor()
-                    .newInstance();
-            // in jdk11 cann't call addURL function by reflect to load class. 
so use this way
-            // But DriverManager can't find the driverClass correctly, so add 
a faker driver
-            // https://www.kfu.com/~nsayer/Java/dyn-jdbc.html
-            DriverManager.registerDriver(new FakeDriver(driver));
-            conn = DriverManager.getConnection(jdbcUrl, jdbcUser, 
jdbcPassword);
-
+            ClassLoader parent = getClass().getClassLoader();
+            ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, 
parent);
+            druidDataSource = 
JdbcDataSource.getDataSource().getSource(jdbcUrl);
+            if (druidDataSource == null) {
+                DruidDataSource ds = new DruidDataSource();
+                ds.setDriverClassLoader(classLoader);
+                ds.setDriverClassName(driverClass);
+                ds.setUrl(jdbcUrl);
+                ds.setUsername(jdbcUser);
+                ds.setPassword(jdbcPassword);
+                ds.setMinIdle(1);
+                ds.setInitialSize(2);
+                ds.setMaxActive(5);
+                druidDataSource = ds;
+                JdbcDataSource.getDataSource().putSource(jdbcUrl, ds);
+            }
+            conn = druidDataSource.getConnection();
             if (op == TJdbcOperation.READ) {
                 conn.setAutoCommit(false);
                 Preconditions.checkArgument(sql != null);
@@ -267,20 +294,372 @@ public class JdbcExecutor {
             } else {
                 stmt = conn.createStatement();
             }
-        } catch (ClassNotFoundException e) {
-            throw new UdfRuntimeException("ClassNotFoundException:  " + 
driverClass, e);
         } catch (MalformedURLException e) {
             throw new UdfRuntimeException("MalformedURLException to load class 
about " + driverUrl, e);
         } catch (SQLException e) {
             throw new UdfRuntimeException("Initialize datasource failed: ", e);
-        } catch (InstantiationException e) {
-            throw new UdfRuntimeException("InstantiationException failed: ", 
e);
-        } catch (IllegalAccessException e) {
-            throw new UdfRuntimeException("IllegalAccessException failed: ", 
e);
-        } catch (InvocationTargetException e) {
-            throw new UdfRuntimeException("InvocationTargetException new 
instance failed: ", e);
-        } catch (NoSuchMethodException e) {
-            throw new UdfRuntimeException("NoSuchMethodException Load class 
failed: ", e);
+        } catch (FileNotFoundException e) {
+            throw new UdfRuntimeException("FileNotFoundException failed: ", e);
+        }
+    }
+
+    public void copyBatchBooleanResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        Boolean[] column = (Boolean[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putByte(columnAddr + i, column[i] ? (byte) 
1 : 0);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putByte(columnAddr + i, column[i] ? (byte) 1 : 
0);
+            }
+        }
+    }
+
+    public void copyBatchTinyIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] column = (Object[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putByte(columnAddr + i, 
convertTinyIntToByte(column[i]));
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putByte(columnAddr + i, 
convertTinyIntToByte(column[i]));
+            }
+        }
+    }
+
+    public void copyBatchSmallIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] column = (Object[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putShort(columnAddr + (i * 2), 
convertSmallIntToShort(column[i]));
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putShort(columnAddr + (i * 2), 
convertSmallIntToShort(column[i]));
+            }
+        }
+    }
+
+    public void copyBatchIntResult(Object columnObj, boolean isNullable, int 
numRows, long nullMapAddr,
+            long columnAddr) {
+        Integer[] column = (Integer[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) 
column[i]);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) column[i]);
+            }
+        }
+    }
+
+    public void copyBatchBigIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        Long[] column = (Long[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), (long) 
column[i]);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), (long) 
column[i]);
+            }
+        }
+    }
+
+    public void copyBatchLargeIntResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        BigInteger[] column = (BigInteger[]) columnObj;
+        if (isNullable == true) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    byte[] bytes = 
UdfUtils.convertByteOrder(column[i].toByteArray());
+                    byte[] value = new byte[16];
+                    if (column[i].signum() == -1) {
+                        Arrays.fill(value, (byte) -1);
+                    }
+                    for (int index = 0; index < Math.min(bytes.length, 
value.length); ++index) {
+                        value[index] = bytes[index];
+                    }
+                    UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, 
null, columnAddr + (i * 16), 16);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                byte[] bytes = 
UdfUtils.convertByteOrder(column[i].toByteArray());
+                byte[] value = new byte[16];
+                if (column[i].signum() == -1) {
+                    Arrays.fill(value, (byte) -1);
+                }
+                for (int index = 0; index < Math.min(bytes.length, 
value.length); ++index) {
+                    value[index] = bytes[index];
+                }
+                UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, 
columnAddr + (i * 16), 16);
+            }
+        }
+    }
+
+    public void copyBatchFloatResult(Object columnObj, boolean isNullable, int 
numRows, long nullMapAddr,
+            long columnAddr) {
+        Float[] column = (Float[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putFloat(columnAddr + (i * 4), (float) 
column[i]);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putFloat(columnAddr + (i * 4), (float) 
column[i]);
+            }
+        }
+    }
+
+    public void copyBatchDoubleResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        Double[] column = (Double[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8), (double) 
column[i]);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putDouble(columnAddr + (i * 8), (double) 
column[i]);
+            }
+        }
+    }
+
+    public void copyBatchDateResult(Object columnObj, boolean isNullable, int 
numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] column = (Object[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), 
convertDateToLong(column[i], false));
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), 
convertDateToLong(column[i], false));
+            }
+        }
+    }
+
+    public void copyBatchDateV2Result(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        Object[] column = (Object[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) 
convertDateToLong(column[i], true));
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putInt(columnAddr + (i * 4), (int) 
convertDateToLong(column[i], true));
+            }
+        }
+    }
+
+    public void copyBatchDateTimeResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) throws UdfRuntimeException {
+        Object[] column = (Object[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), 
convertDateTimeToLong(column[i], false));
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), 
convertDateTimeToLong(column[i], false));
+            }
+        }
+    }
+
+    public void copyBatchDateTimeV2Result(Object columnObj, boolean 
isNullable, int numRows, long nullMapAddr,
+            long columnAddr) throws UdfRuntimeException {
+        Object[] column = (Object[]) columnObj;
+        if (isNullable) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), 
convertDateTimeToLong(column[i], true));
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                UdfUtils.UNSAFE.putLong(columnAddr + (i * 8), 
convertDateTimeToLong(column[i], true));
+            }
+        }
+    }
+
+    public void copyBatchStringResult(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long offsetsAddr, long charsAddr) {
+        String[] column = (String[]) columnObj;
+        int[] offsets = new int[numRows];
+        byte[][] byteRes = new byte[numRows][];
+        int offset = 0;
+        if (isNullable == true) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] == null) {
+                    byteRes[i] = emptyBytes;
+                    UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+                } else {
+                    byteRes[i] = column[i].getBytes(StandardCharsets.UTF_8);
+                }
+                offset += byteRes[i].length;
+                offsets[i] = offset;
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                byteRes[i] = column[i].getBytes(StandardCharsets.UTF_8);
+                offset += byteRes[i].length;
+                offsets[i] = offset;
+            }
+        }
+        byte[] bytes = new byte[offsets[numRows - 1]];
+        long bytesAddr = JNINativeMethod.resizeColumn(charsAddr, 
offsets[numRows - 1]);
+        int dst = 0;
+        for (int i = 0; i < numRows; i++) {
+            for (int j = 0; j < byteRes[i].length; j++) {
+                bytes[dst++] = byteRes[i][j];
+            }
+        }
+        UdfUtils.copyMemory(offsets, UdfUtils.INT_ARRAY_OFFSET, null, 
offsetsAddr, numRows * 4L);
+        UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null, 
bytesAddr, offsets[numRows - 1]);
+    }
+
+    public void copyBatchDecimalV2Result(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr) {
+        BigDecimal[] column = (BigDecimal[]) columnObj;
+        BigInteger[] data = new BigInteger[numRows];
+        for (int i = 0; i < numRows; i++) {
+            if (column[i] == null) {
+                data[i] = null;
+                UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+            } else {
+                data[i] = column[i].setScale(9, 
RoundingMode.HALF_EVEN).unscaledValue();
+            }
+        }
+        copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16);
+    }
+
+    public void copyBatchDecimal32Result(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr, int scale) {
+        BigDecimal[] column = (BigDecimal[]) columnObj;
+        BigInteger[] data = new BigInteger[numRows];
+        for (int i = 0; i < numRows; i++) {
+            if (column[i] == null) {
+                data[i] = null;
+                UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+            } else {
+                data[i] = column[i].setScale(scale, 
RoundingMode.HALF_EVEN).unscaledValue();
+            }
+        }
+        copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 4);
+    }
+
+    public void copyBatchDecimal64Result(Object columnObj, boolean isNullable, 
int numRows, long nullMapAddr,
+            long columnAddr, int scale) {
+        BigDecimal[] column = (BigDecimal[]) columnObj;
+        BigInteger[] data = new BigInteger[numRows];
+        for (int i = 0; i < numRows; i++) {
+            if (column[i] == null) {
+                data[i] = null;
+                UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+            } else {
+                data[i] = column[i].setScale(scale, 
RoundingMode.HALF_EVEN).unscaledValue();
+            }
+        }
+        copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 8);
+    }
+
+    public void copyBatchDecimal128Result(Object columnObj, boolean 
isNullable, int numRows, long nullMapAddr,
+            long columnAddr, int scale) {
+        BigDecimal[] column = (BigDecimal[]) columnObj;
+        BigInteger[] data = new BigInteger[numRows];
+        for (int i = 0; i < numRows; i++) {
+            if (column[i] == null) {
+                data[i] = null;
+                UdfUtils.UNSAFE.putByte(nullMapAddr + i, (byte) 1);
+            } else {
+                data[i] = column[i].setScale(scale, 
RoundingMode.HALF_EVEN).unscaledValue();
+            }
+        }
+        copyBatchDecimalResult(data, isNullable, numRows, columnAddr, 16);
+    }
+
+    public void copyBatchDecimalResult(Object columnObj, boolean isNullable, 
int numRows,
+            long columnAddr, int typeLen) {
+        BigInteger[] column = (BigInteger[]) columnObj;
+        if (isNullable == true) {
+            for (int i = 0; i < numRows; i++) {
+                if (column[i] != null) {
+                    byte[] bytes = 
UdfUtils.convertByteOrder(column[i].toByteArray());
+                    byte[] value = new byte[typeLen];
+                    if (column[i].signum() == -1) {
+                        Arrays.fill(value, (byte) -1);
+                    }
+                    for (int index = 0; index < Math.min(bytes.length, 
value.length); ++index) {
+                        value[index] = bytes[index];
+                    }
+                    UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, 
null, columnAddr + (i * typeLen), typeLen);
+                }
+            }
+        } else {
+            for (int i = 0; i < numRows; i++) {
+                byte[] bytes = 
UdfUtils.convertByteOrder(column[i].toByteArray());
+                byte[] value = new byte[typeLen];
+                if (column[i].signum() == -1) {
+                    Arrays.fill(value, (byte) -1);
+                }
+                for (int index = 0; index < Math.min(bytes.length, 
value.length); ++index) {
+                    value[index] = bytes[index];
+                }
+                UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, 
columnAddr + (i * typeLen), typeLen);
+            }
         }
     }
 }
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java 
b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java
index d9fa55bb2f..4746350cff 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java
@@ -51,6 +51,7 @@ public class UdfUtils {
     public static final Unsafe UNSAFE;
     private static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L;
     public static final long BYTE_ARRAY_OFFSET;
+    public static final long INT_ARRAY_OFFSET;
 
     static {
         UNSAFE = (Unsafe) AccessController.doPrivileged(
@@ -64,6 +65,7 @@ public class UdfUtils {
                     }
                 });
         BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+        INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
     }
 
     // Data types that are supported as return or argument types in Java UDFs.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to