This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f7bc15f9811 [fix](arrow-flight-sql) Fix FE not found arrow flight 
schema (#43960)
f7bc15f9811 is described below

commit f7bc15f9811668cfb8085aa97632337c4ab7a286
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Mon Nov 18 11:08:36 2024 +0800

    [fix](arrow-flight-sql) Fix FE not found arrow flight schema (#43960)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    After the first phase of a query (`exec_plan_fragment`), the FE fetches
    the arrow schema from the BE, but the BE only generates the arrow schema
    in the second phase of the query (`ResultSinkLocalState::open`).
    
    Therefore, this PR generates the arrow schema in the first phase instead,
    in `ResultSinkLocalState::init`.
    
    Fix:
    
    ```
    errmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE 
not found arrow flight schema, maybe query has been canceled], error code: 
null, error msg:
    java.lang.RuntimeException: fetch arrow flight schema failed, finstId: 
3573efbeb10c44a7-956531d8e15d1630, errmsg: Status [errorCode=NOT_FOUND, 
errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe 
query has been canceled]
            at 
org.apache.doris.service.arrowflight.FlightSqlConnectProcessor.fetchArrowFlightSchema(FlightSqlConnectProcessor.java:126)
 ~[doris-fe.jar:1.2-SNAPSHOT]
            at 
org.apache.doris.service.arrowflight.DorisFlightSqlProducer.executeQueryStatement(DorisFlightSqlProducer.java:229)
 ~[doris-fe.jar:1.2-SNAPSHOT]
            at 
org.apache.doris.service.arrowflight.DorisFlightSqlProducer.getFlightInfoStatement(DorisFlightSqlProducer.java:260)
 ~[doris-fe.jar:1.2-SNAPSHOT]
    ```
---
 be/src/pipeline/exec/result_sink_operator.cpp | 21 ++++++++++++---------
 be/src/service/internal_service.cpp           |  1 +
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index a3f1133f00e..f8196910021 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -46,14 +46,25 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
     auto fragment_instance_id = state->fragment_instance_id();
 
+    auto& p = _parent->cast<ResultSinkOperatorX>();
     if (state->query_options().enable_parallel_result_sink) {
         _sender = _parent->cast<ResultSinkOperatorX>()._sender;
     } else {
-        auto& p = _parent->cast<ResultSinkOperatorX>();
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
                 fragment_instance_id, p._result_sink_buffer_size_rows, 
&_sender, state));
     }
     _sender->set_dependency(fragment_instance_id, 
_dependency->shared_from_this());
+
+    _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
+    }
+    if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+        std::shared_ptr<arrow::Schema> arrow_schema;
+        RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, 
&arrow_schema,
+                                                        state->timezone()));
+        _sender->register_arrow_schema(arrow_schema);
+    }
     return Status::OK();
 }
 
@@ -62,10 +73,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<ResultSinkOperatorX>();
-    _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
-    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
-    }
     // create writer based on sink type
     switch (p._sink_type) {
     case TResultSinkType::MYSQL_PROTOCAL: {
@@ -79,10 +86,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
         break;
     }
     case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
-        std::shared_ptr<arrow::Schema> arrow_schema;
-        RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs, 
&arrow_schema,
-                                                        state->timezone()));
-        _sender->register_arrow_schema(arrow_schema);
         _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
                 _sender.get(), _output_vexpr_ctxs, _profile));
         break;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 29eb01bad2a..be99278ab54 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -903,6 +903,7 @@ void 
PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
         auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
                 UniqueId(request->finst_id()).to_thrift(), &schema);
         if (!st.ok()) {
+            LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
             st.to_protobuf(result->mutable_status());
             return;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to