This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f7bc15f9811 [fix](arrow-flight-sql) Fix FE not found arrow flight schema (#43960) f7bc15f9811 is described below commit f7bc15f9811668cfb8085aa97632337c4ab7a286 Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Mon Nov 18 11:08:36 2024 +0800 [fix](arrow-flight-sql) Fix FE not found arrow flight schema (#43960) ### What problem does this PR solve? Problem Summary: After the query's first phase `exec_plan_fragment`, FE fetches the arrow schema from BE, but BE only generated the arrow schema in the query's second phase `ResultSinkLocalState::open`, so the schema could be missing when FE asked for it. Therefore, this PR generates the arrow schema in the first phase, `ResultSinkLocalState::init`. Fix: ``` rrmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe query has been canceled], error code: null, error msg: java.lang.RuntimeException: fetch arrow flight schema failed, finstId: 3573efbeb10c44a7-956531d8e15d1630, errmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe query has been canceled] at org.apache.doris.service.arrowflight.FlightSqlConnectProcessor.fetchArrowFlightSchema(FlightSqlConnectProcessor.java:126) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.service.arrowflight.DorisFlightSqlProducer.executeQueryStatement(DorisFlightSqlProducer.java:229) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.service.arrowflight.DorisFlightSqlProducer.getFlightInfoStatement(DorisFlightSqlProducer.java:260) ~[doris-fe.jar:1.2-SNAPSHOT] ``` --- be/src/pipeline/exec/result_sink_operator.cpp | 21 ++++++++++++--------- be/src/service/internal_service.cpp | 1 + 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index a3f1133f00e..f8196910021 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ 
b/be/src/pipeline/exec/result_sink_operator.cpp @@ -46,14 +46,25 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1); auto fragment_instance_id = state->fragment_instance_id(); + auto& p = _parent->cast<ResultSinkOperatorX>(); if (state->query_options().enable_parallel_result_sink) { _sender = _parent->cast<ResultSinkOperatorX>()._sender; } else { - auto& p = _parent->cast<ResultSinkOperatorX>(); RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( fragment_instance_id, p._result_sink_buffer_size_rows, &_sender, state)); } _sender->set_dependency(fragment_instance_id, _dependency->shared_from_this()); + + _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); + } + if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) { + std::shared_ptr<arrow::Schema> arrow_schema; + RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); + _sender->register_arrow_schema(arrow_schema); + } return Status::OK(); } @@ -62,10 +73,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast<ResultSinkOperatorX>(); - _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); - } // create writer based on sink type switch (p._sink_type) { case TResultSinkType::MYSQL_PROTOCAL: { @@ -79,10 +86,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) { break; } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { - std::shared_ptr<arrow::Schema> arrow_schema; - RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, &arrow_schema, - 
state->timezone())); - _sender->register_arrow_schema(arrow_schema); _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter( _sender.get(), _output_vexpr_ctxs, _profile)); break; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 29eb01bad2a..be99278ab54 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -903,6 +903,7 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema( UniqueId(request->finst_id()).to_thrift(), &schema); if (!st.ok()) { + LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st; st.to_protobuf(result->mutable_status()); return; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org