This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b6babf3af4 [pipelineX](sink) support jdbc table sink (#24970)
b6babf3af4 is described below

commit b6babf3af43a2bc8c0f2202aede6df64d0883376
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Thu Sep 28 14:39:32 2023 +0800

    [pipelineX](sink) support jdbc table sink (#24970)
    
    * [pipelineX](sink) support jdbc table sink
---
 be/src/pipeline/exec/jdbc_table_sink_operator.cpp  | 76 ++++++++++++++++++++++
 be/src/pipeline/exec/jdbc_table_sink_operator.h    | 71 ++++++++++++++++++++
 be/src/pipeline/pipeline_x/operator.cpp            |  6 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 14 ++++
 4 files changed, 166 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
new file mode 100644
index 0000000000..a551762027
--- /dev/null
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jdbc_table_sink_operator.h"
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/operator.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+class DataSink;
+} // namespace doris
+
+namespace doris::pipeline {
+
+// Creates the pipelineX JDBC table sink operator.
+// NOTE(review): `0` is forwarded to the DataSinkOperatorX base, presumably as the
+// operator id - confirm against DataSinkOperatorX's constructor.
+// Both `row_desc` and `t_output_expr` are kept as reference members
+// (`_row_desc`, `_t_output_expr`), so the caller must keep them alive for as long
+// as this operator is used.
+JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc,
+                                               const std::vector<TExpr>& t_output_expr)
+        : DataSinkOperatorX(0), _row_desc(row_desc), _t_output_expr(t_output_expr) {}
+
+// Initializes the sink from its thrift description: runs the base-class init,
+// then builds the expression contexts in `_output_vexpr_ctxs` from the thrift
+// output expressions held in `_t_output_expr`. The first failing step's status
+// is propagated through RETURN_IF_ERROR.
+Status JdbcTableSinkOperatorX::init(const TDataSink& thrift_sink) {
+    RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::init(thrift_sink));
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
+    return Status::OK();
+}
+
+// Prepares the operator: runs the base-class prepare, then prepares the output
+// expression contexts built in init() against `_row_desc`.
+Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::prepare(state));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
+    return Status::OK();
+}
+
+// Opens the operator: runs the base-class open, then opens the output expression
+// contexts that prepare() prepared.
+Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::open(state));
+    // Open the (already prepared) exprs so they can be evaluated.
+    RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
+    return Status::OK();
+}
+
+// Sinks one input block by delegating to the local state's sink().
+// The CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR macro declares `local_state`; judging
+// by its name, it returns early with an error if the local state cannot be obtained.
+Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
+                                    SourceState source_state) {
+    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    // Must stay after the macro above, which introduces `local_state`; presumably
+    // charges the rest of this call to the local profile's total-time counter.
+    SCOPED_TIMER(local_state.profile()->total_time_counter());
+    RETURN_IF_ERROR(local_state.sink(state, block, source_state));
+    return Status::OK();
+}
+
+// Returns whatever the local state's write_blocked_by() reports as the dependency
+// this sink is waiting on.
+// NOTE(review): presumably nullptr means "not blocked" - confirm in
+// AsyncWriterSink. If the local state cannot be obtained, the
+// CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR macro returns early (by its name,
+// with nullptr).
+WriteDependency* JdbcTableSinkOperatorX::wait_for_dependency(RuntimeState* state) {
+    CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+    return local_state.write_blocked_by();
+}
+
+// Reports whether this sink still has pending work. It looks up the sink local
+// state registered under this operator's id(), views it as a
+// JdbcTableSinkLocalState and forwards the question to it.
+bool JdbcTableSinkOperatorX::is_pending_finish(RuntimeState* state) const {
+    return state->get_sink_local_state(id())
+            ->cast<JdbcTableSinkLocalState>()
+            .is_pending_finish();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h
new file mode 100644
index 0000000000..6db9c38065
--- /dev/null
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
+#include "vec/sink/vresult_sink.h"
+#include "vec/sink/writer/vjdbc_table_writer.h"
+
+namespace doris {
+class DataSink;
+
+namespace pipeline {
+
+class JdbcTableSinkOperatorX;
+
+// Sink local state for JdbcTableSinkOperatorX. It adds no members of its own: all
+// behaviour comes from AsyncWriterSink instantiated with vectorized::VJdbcTableWriter
+// as the writer and JdbcTableSinkOperatorX as the parent operator.
+class JdbcTableSinkLocalState final
+        : public AsyncWriterSink<vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX> {
+    ENABLE_FACTORY_CREATOR(JdbcTableSinkLocalState);
+
+public:
+    using Base = AsyncWriterSink<vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
+    // Initialize through the `Base` alias instead of repeating the base template-id,
+    // so the base class is spelled in exactly one place.
+    JdbcTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+private:
+    friend class JdbcTableSinkOperatorX;
+};
+
+// PipelineX operator for the JDBC table sink (TDataSinkType::JDBC_TABLE_SINK).
+// Operator-level setup (output expressions) lives here; per-instance writing is
+// delegated to JdbcTableSinkLocalState, an AsyncWriterSink over
+// vectorized::VJdbcTableWriter.
+class JdbcTableSinkOperatorX final : public DataSinkOperatorX<JdbcTableSinkLocalState> {
+public:
+    // `row_desc` and `t_output_expr` are stored by reference and must outlive this
+    // operator. The parameter name matches the out-of-line definition.
+    JdbcTableSinkOperatorX(const RowDescriptor& row_desc,
+                           const std::vector<TExpr>& t_output_expr);
+    // Runs the base init, then builds `_output_vexpr_ctxs` from `_t_output_expr`.
+    Status init(const TDataSink& thrift_sink) override;
+    // Runs the base prepare, then prepares `_output_vexpr_ctxs` against `_row_desc`.
+    Status prepare(RuntimeState* state) override;
+    // Runs the base open, then opens `_output_vexpr_ctxs`.
+    Status open(RuntimeState* state) override;
+
+    // Forwards `in_block` to this instance's local state.
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    // Returns the dependency reported by the local state's write_blocked_by().
+    WriteDependency* wait_for_dependency(RuntimeState* state) override;
+    // Delegates to the local state's is_pending_finish().
+    bool is_pending_finish(RuntimeState* state) const override;
+
+private:
+    friend class JdbcTableSinkLocalState;
+    // NOTE(review): presumably befriended so AsyncWriterSink can read
+    // `_output_vexpr_ctxs` when creating its writer - confirm.
+    template <typename Writer, typename Parent>
+    friend class AsyncWriterSink;
+
+    // Non-owning: both refer to objects owned by whoever constructed this operator.
+    const RowDescriptor& _row_desc;
+    const std::vector<TExpr>& _t_output_expr;
+    // Built from `_t_output_expr` in init(); prepared and opened in prepare()/open().
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 47d7731344..c6331b04fb 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -35,6 +35,7 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
+#include "pipeline/exec/jdbc_table_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
@@ -510,7 +511,8 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
         return Status::OK();
     }
     COUNTER_SET(_wait_for_dependency_timer, 
_async_writer_dependency->write_watcher_elapse_time());
-    if (_writer->need_normal_close()) {
+    // if the init failed, the _writer may be nullptr. so here need check
+    if (_writer && _writer->need_normal_close()) {
         if (exec_status.ok() && !state->is_cancelled()) {
             RETURN_IF_ERROR(_writer->commit_trans());
         }
@@ -535,6 +537,7 @@ bool AsyncWriterSink<Writer, Parent>::is_pending_finish() {
 #define DECLARE_OPERATOR_X(LOCAL_STATE) template class 
DataSinkOperatorX<LOCAL_STATE>;
 DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
+DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
 DECLARE_OPERATOR_X(ResultFileSinkLocalState)
 DECLARE_OPERATOR_X(AnalyticSinkLocalState)
 DECLARE_OPERATOR_X(SortSinkLocalState)
@@ -602,5 +605,6 @@ template class PipelineXSinkLocalState<MultiCastDependency>;
 template class PipelineXLocalState<PartitionSortDependency>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, 
JdbcTableSinkOperatorX>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7c67b1a07b..64ea086e9a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -58,6 +58,7 @@
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
+#include "pipeline/exec/jdbc_table_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
 #include "pipeline/exec/nested_loop_join_build_operator.h"
@@ -247,6 +248,19 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, 
thrift_sink.result_sink));
         break;
     }
+    case TDataSinkType::JDBC_TABLE_SINK: {
+        if (!thrift_sink.__isset.jdbc_table_sink) {
+            return Status::InternalError("Missing data jdbc sink.");
+        }
+        if (config::enable_java_support) {
+            _sink.reset(new JdbcTableSinkOperatorX(row_desc, output_exprs));
+        } else {
+            return Status::InternalError(
+                    "Jdbc table sink is not enabled, you can change be config "
+                    "enable_java_support to true and restart be.");
+        }
+        break;
+    }
     case TDataSinkType::RESULT_FILE_SINK: {
         if (!thrift_sink.__isset.result_file_sink) {
             return Status::InternalError("Missing result file sink.");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to