This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b6babf3af4 [pipelineX](sink) support jdbc table sink (#24970) b6babf3af4 is described below commit b6babf3af43a2bc8c0f2202aede6df64d0883376 Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Thu Sep 28 14:39:32 2023 +0800 [pipelineX](sink) support jdbc table sink (#24970) * [pipelineX](sink) support jdbc table sink --- be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 76 ++++++++++++++++++++++ be/src/pipeline/exec/jdbc_table_sink_operator.h | 71 ++++++++++++++++++++ be/src/pipeline/pipeline_x/operator.cpp | 6 +- .../pipeline_x/pipeline_x_fragment_context.cpp | 14 ++++ 4 files changed, 166 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp new file mode 100644 index 0000000000..a551762027 --- /dev/null +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "jdbc_table_sink_operator.h" + +#include <memory> + +#include "common/object_pool.h" +#include "pipeline/exec/operator.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { +class DataSink; +} // namespace doris + +namespace doris::pipeline { + +JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc, + const std::vector<TExpr>& t_output_expr) + : DataSinkOperatorX(0), _row_desc(row_desc), _t_output_expr(t_output_expr) {} + +Status JdbcTableSinkOperatorX::init(const TDataSink& thrift_sink) { + RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::init(thrift_sink)); + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); +} + +Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::prepare(state)); + // Prepare the exprs to run. + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); + return Status::OK(); +} + +Status JdbcTableSinkOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::open(state)); + // Open the exprs to run. 
+ RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); + return Status::OK(); +} + +Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, + SourceState source_state) { + CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + RETURN_IF_ERROR(local_state.sink(state, block, source_state)); + return Status::OK(); +} + +WriteDependency* JdbcTableSinkOperatorX::wait_for_dependency(RuntimeState* state) { + CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); + return local_state.write_blocked_by(); +} + +bool JdbcTableSinkOperatorX::is_pending_finish(RuntimeState* state) const { + auto& local_state = state->get_sink_local_state(id())->cast<JdbcTableSinkLocalState>(); + return local_state.is_pending_finish(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h new file mode 100644 index 0000000000..6db9c38065 --- /dev/null +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stdint.h> + +#include "operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "vec/sink/vresult_sink.h" +#include "vec/sink/writer/vjdbc_table_writer.h" + +namespace doris { +class DataSink; + +namespace pipeline { + +class JdbcTableSinkOperatorX; +class JdbcTableSinkLocalState final + : public AsyncWriterSink<vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX> { + ENABLE_FACTORY_CREATOR(JdbcTableSinkLocalState); + +public: + using Base = AsyncWriterSink<vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; + JdbcTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : AsyncWriterSink<vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>(parent, state) { + } + +private: + friend class JdbcTableSinkOperatorX; +}; + +class JdbcTableSinkOperatorX final : public DataSinkOperatorX<JdbcTableSinkLocalState> { +public: + JdbcTableSinkOperatorX(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs); + Status init(const TDataSink& thrift_sink) override; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + + WriteDependency* wait_for_dependency(RuntimeState* state) override; + bool is_pending_finish(RuntimeState* state) const override; + +private: + friend class JdbcTableSinkLocalState; + template <typename Writer, typename Parent> + friend class AsyncWriterSink; + + const RowDescriptor& _row_desc; + const std::vector<TExpr>& _t_output_expr; + vectorized::VExprContextSPtrs _output_vexpr_ctxs; +}; + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 47d7731344..c6331b04fb 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -35,6 +35,7 @@ #include "pipeline/exec/hashjoin_build_sink.h" #include 
"pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/jdbc_scan_operator.h" +#include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" @@ -510,7 +511,8 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s return Status::OK(); } COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->write_watcher_elapse_time()); - if (_writer->need_normal_close()) { + // if the init failed, the _writer may be nullptr. so here need check + if (_writer && _writer->need_normal_close()) { if (exec_status.ok() && !state->is_cancelled()) { RETURN_IF_ERROR(_writer->commit_trans()); } @@ -535,6 +537,7 @@ bool AsyncWriterSink<Writer, Parent>::is_pending_finish() { #define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>; DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState) DECLARE_OPERATOR_X(ResultSinkLocalState) +DECLARE_OPERATOR_X(JdbcTableSinkLocalState) DECLARE_OPERATOR_X(ResultFileSinkLocalState) DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) @@ -602,5 +605,6 @@ template class PipelineXSinkLocalState<MultiCastDependency>; template class PipelineXLocalState<PartitionSortDependency>; template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>; +template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7c67b1a07b..64ea086e9a 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -58,6 +58,7 @@ #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include 
"pipeline/exec/jdbc_scan_operator.h" +#include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" @@ -247,6 +248,19 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink)); break; } + case TDataSinkType::JDBC_TABLE_SINK: { + if (!thrift_sink.__isset.jdbc_table_sink) { + return Status::InternalError("Missing data jdbc sink."); + } + if (config::enable_java_support) { + _sink.reset(new JdbcTableSinkOperatorX(row_desc, output_exprs)); + } else { + return Status::InternalError( + "Jdbc table sink is not enabled, you can change be config " + "enable_java_support to true and restart be."); + } + break; + } case TDataSinkType::RESULT_FILE_SINK: { if (!thrift_sink.__isset.result_file_sink) { return Status::InternalError("Missing result file sink."); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org