imay commented on a change in pull request #450: Add EsScanNode
URL: https://github.com/apache/incubator-doris/pull/450#discussion_r246681927
 
 

 ##########
 File path: be/src/exec/es_scan_node.cpp
 ##########
 @@ -0,0 +1,668 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "es_scan_node.h"
+
+#include <string>
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/runtime_state.h"
+#include "runtime/row_batch.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "runtime/client_cache.h"
+#include "util/runtime_profile.h"
+#include "util/debug_util.h"
+#include "service/backend_options.h"
+#include "olap/olap_common.h"
+#include "olap/utils.h"
+#include "exprs/expr_context.h"
+#include "exprs/expr.h"
+#include "exprs/slot_ref.h"
+
+namespace doris {
+
+// $0 = column type (e.g. INT)
+const string ERROR_INVALID_COL_DATA = "Data source returned inconsistent 
column data. "
+    "Expected value of type $0 based on column metadata. This likely indicates 
a "
+    "problem with the data source library.";
+const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to 
allocate "
+    "$1 bytes for $2.";
+
+// Builds the node from its thrift plan node: stores the id of the tuple to
+// materialize, starts at scan range 0, and copies the properties map when it
+// is set on the plan node.
+EsScanNode::EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+        : ScanNode(pool, tnode, descs),
+          _tuple_id(tnode.es_scan_node.tuple_id),
+          _scan_range_idx(0) {
+    const auto& es_node = tnode.es_scan_node;
+    if (!es_node.__isset.properties) {
+        return;
+    }
+    _properties = es_node.properties;
+}
+
+// Destructor; the body is empty.
+EsScanNode::~EsScanNode() {}
+
+// Runs the base-class prepare, then looks up the tuple descriptor for
+// _tuple_id in the runtime state's descriptor table and caches the exec env.
+// Returns an error status (and logs a warning) if the descriptor is missing.
+Status EsScanNode::prepare(RuntimeState* state) {
+    VLOG(1) << "EsScanNode::Prepare";
+
+    RETURN_IF_ERROR(ScanNode::prepare(state));
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc != nullptr) {
+        _env = state->exec_env();
+        return Status::OK;
+    }
+
+    // Descriptor lookup failed: report it both in the log and to the caller.
+    std::stringstream err_msg;
+    err_msg << "es tuple descriptor is null, _tuple_id=" << _tuple_id;
+    LOG(WARNING) << err_msg.str();
+    return Status(err_msg.str());
+}
+
+// Opens a scan on every ES scan range assigned to this node.
+//
+//   1. Builds the row schema (name + type for each slot of the output tuple).
+//   2. Converts each conjunct into a list of external predicates wherever
+//      get_disjuncts() succeeds; predicate_to_conjunct maps a predicate's
+//      position back to the index of the conjunct it was built from.
+//   3. For each scan range, opens it on one of its hosts (the local host is
+//      tried first) and records the address and scan handle that were used.
+//   4. Removes the conjuncts that every scan range reported as accepted.
+//
+// Returns an error if a range has no hosts, if an open fails with anything
+// other than ES_SHARD_NOT_FOUND, or if no host of a range has the shard.
+Status EsScanNode::open(RuntimeState* state) {
+    VLOG(1) << "EsScanNode::Open";
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::open(state));
+
+    // TExtOpenParams.row_schema
+    vector<TExtColumnDesc> cols;
+    for (const SlotDescriptor* slot : _tuple_desc->slots()) {
+        TExtColumnDesc col;
+        col.__set_name(slot->col_name());
+        col.__set_type(slot->type().to_thrift());
+        cols.emplace_back(std::move(col));
+    }
+    TExtTableSchema row_schema;
+    row_schema.cols = std::move(cols);
+    row_schema.__isset.cols = true;
+
+    // TExtOpenParams.predicates
+    vector<vector<TExtPredicate> > predicates;
+    vector<int> predicate_to_conjunct;
+    for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
+        VLOG(1) << "conjunct: " << _conjunct_ctxs[i]->root()->debug_string();
+        vector<TExtPredicate> disjuncts;
+        if (get_disjuncts(_conjunct_ctxs[i], _conjunct_ctxs[i]->root(), disjuncts)) {
+            predicates.emplace_back(std::move(disjuncts));
+            predicate_to_conjunct.push_back(i);
+        }
+    }
+
+    // open every scan range
+    // conjunct_accepted_times[c] counts the scan ranges that accepted conjunct c.
+    vector<int> conjunct_accepted_times(_conjunct_ctxs.size(), 0);
+    for (int i = 0; i < _scan_ranges.size(); ++i) {
+        TEsScanRange& es_scan_range = _scan_ranges[i];
+
+        if (es_scan_range.es_hosts.empty()) {
+            std::stringstream ss;
+            ss << "es fail to open: hosts empty";
+            LOG(WARNING) << ss.str();
+            return Status(ss.str());
+        }
+
+        // TExtOpenParams
+        TExtOpenParams params;
+        params.__set_query_id(state->query_id());
+        // NOTE(review): _properties is a member that is updated in place for
+        // every range, so a "type" written for an earlier range is still present
+        // for a later range that does not set one -- confirm this is intended.
+        _properties["index"] = es_scan_range.index;
+        if (es_scan_range.__isset.type) {
+            _properties["type"] = es_scan_range.type;
+        }
+        _properties["shard_id"] = std::to_string(es_scan_range.shard_id);
+        params.__set_properties(_properties);
+        params.__set_row_schema(row_schema);
+        params.__set_batch_size(state->batch_size());
+        params.__set_predicates(predicates);
+        TExtOpenResult result;
+
+        // choose an es node, local is the first choice
+        // pass j == 0 only tries hosts equal to localhost, pass j == 1 the others
+        std::string localhost = BackendOptions::get_localhost();
+        bool is_success = false;
+        for (int j = 0; j < 2; ++j) {
+            for (auto& es_host : es_scan_range.es_hosts) {
+                if ((j == 0 && es_host.hostname != localhost)
+                    || (j == 1 && es_host.hostname == localhost)) {
+                    continue;
+                }
+                Status status = open_es(es_host, result, params);
+                if (status.ok()) {
+                    is_success = true;
+                    _addresses.push_back(es_host);
+                    _scan_handles.push_back(result.scan_handle);
+                    if (result.__isset.accepted_conjuncts) {
+                        for (int index : result.accepted_conjuncts) {
+                            // The index is supplied by the remote service; never
+                            // use it as a subscript without checking it. An
+                            // ignored index only means the conjunct keeps being
+                            // evaluated locally.
+                            if (index < 0
+                                    || static_cast<size_t>(index) >= predicate_to_conjunct.size()) {
+                                LOG(WARNING) << "es open returned invalid accepted conjunct: "
+                                             << "index=" << index
+                                             << ", scan_range_idx=" << i
+                                             << ", address=" << es_host;
+                                continue;
+                            }
+                            conjunct_accepted_times[predicate_to_conjunct[index]]++;
+                        }
+                    }
+                    VLOG(1) << "es open success: scan_range_idx=" << i
+                            << ", params=" << apache::thrift::ThriftDebugString(params)
+                            << ", result=" << apache::thrift::ThriftDebugString(result);
+                    break;
+                } else if (status.code() == TStatusCode::ES_SHARD_NOT_FOUND) {
+                    // if shard not found, try other nodes
+                    LOG(WARNING) << "shard not found on es node: "
+                                 << ", address=" << es_host
+                                 << ", scan_range_idx=" << i << ", try other nodes";
+                } else {
+                    LOG(WARNING) << "es open error: scan_range_idx=" << i
+                                 << ", address=" << es_host
+                                 << ", msg=" << status.get_error_msg();
+                    return status;
+                }
+            }
+            if (is_success) {
+                break;
+            }
+        }
+
+        if (!is_success) {
+            std::stringstream ss;
+            ss << "es open error: scan_range_idx=" << i
+               << ", can't find shard on any node";
+            return Status(ss.str());
+        }
+    }
+
+    // remove those conjuncts that accepted by all scan ranges
+    // predicate_to_conjunct holds increasing conjunct indexes, so walking it
+    // backwards erases from the back and keeps the remaining indexes valid.
+    // NOTE(review): the erased ExprContexts are no longer reachable through
+    // _conjunct_ctxs -- confirm they are still closed somewhere.
+    for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) {
+        int conjunct_index = predicate_to_conjunct[i];
+        if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) {
+            _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index);
+        }
+    }
+
+    return Status::OK;
+}
+
+// Fetches one batch of rows for the current scan range via get_next_from_es()
+// and materializes it into row_batch.
+//
+// Each returned row is materialized into the batch's tuple buffer and kept only
+// if it passes the conjuncts still held in _conjunct_ctxs. When the result
+// reports eos, the node advances to the next scan range. *eos is set to true
+// when the limit has been reached or all scan ranges have been consumed; it is
+// not written otherwise.
+Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+    VLOG(1) << "EsScanNode::GetNext";
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_TIMER(materialize_tuple_timer());
+
+    // Nothing (left) to scan. Without this guard the code below would index
+    // _offsets with an out-of-range _scan_range_idx, e.g. when the node has no
+    // scan ranges at all or is called again after the last range finished.
+    if (_scan_range_idx >= _scan_ranges.size()) {
+        *eos = true;
+        return Status::OK;
+    }
+
+    // create tuple
+    MemPool* tuple_pool = row_batch->tuple_data_pool();
+    int64_t tuple_buffer_size;
+    uint8_t* tuple_buffer = nullptr;
+    RETURN_IF_ERROR(row_batch->resize_and_allocate_tuple_buffer(
+            state, &tuple_buffer_size, &tuple_buffer));
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
+
+    // get batch
+    TExtGetNextResult result;
+    RETURN_IF_ERROR(get_next_from_es(result));
+    VLOG(1) << "es get next success: result=" << apache::thrift::ThriftDebugString(result);
+    _offsets[_scan_range_idx] += result.rows.num_rows;
+
+    // convert
+    VLOG(1) << "begin to convert: scan_range_idx=" << _scan_range_idx
+            << ", num_rows=" << result.rows.num_rows;
+    vector<TExtColumnData>& cols = result.rows.cols;
+    // indexes of the next non-null value in the row batch, per column.
+    vector<int> cols_next_val_idx(_tuple_desc->slots().size(), 0);
+    for (int row_idx = 0; row_idx < result.rows.num_rows; row_idx++) {
+        if (reached_limit()) {
+            *eos = true;
+            break;
+        }
+        RETURN_IF_ERROR(materialize_row(tuple_pool, tuple, cols, row_idx, cols_next_val_idx));
+        TupleRow* tuple_row = row_batch->get_row(row_batch->add_row());
+        tuple_row->set_tuple(0, tuple);
+        if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), tuple_row)) {
+            // Keep the row and move on to the next tuple slot in the buffer; a
+            // rejected row leaves `tuple` in place so the slot is reused.
+            row_batch->commit_last_row();
+            tuple = reinterpret_cast<Tuple*>(
+                reinterpret_cast<uint8_t*>(tuple) + _tuple_desc->byte_size());
+            ++_num_rows_returned;
+        }
+    }
+
+    VLOG(1) << "finish one batch: num_rows=" << row_batch->num_rows();
+    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+    if (result.__isset.eos && result.eos) {
+        VLOG(1) << "es finish one scan_range: scan_range_idx=" << _scan_range_idx;
+        ++_scan_range_idx;
+    }
+    if (_scan_range_idx == _scan_ranges.size()) {
+        *eos = true;
+    }
+
+    return Status::OK;
+}
+
+// Closes every scan handle that open() recorded, then closes the base node.
+//
+// A failure to close one handle no longer aborts the whole close: the
+// remaining handles are still closed and ExecNode::close() is always run, so
+// one bad ES node cannot leave the other scans or the node itself open. The
+// first error encountered is returned; if all handles close cleanly the result
+// of ExecNode::close() is returned.
+Status EsScanNode::close(RuntimeState* state) {
+    if (is_closed()) return Status::OK;
+    VLOG(1) << "EsScanNode::Close";
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+    // Closes the scan handle recorded at position i of _scan_handles, talking
+    // to the address stored at the same position of _addresses.
+    auto close_scan_handle = [this](int i) -> Status {
+        TExtCloseParams params;
+        params.__set_scan_handle(_scan_handles[i]);
+        TExtCloseResult result;
+
+#ifndef BE_TEST
+        const TNetworkAddress& address = _addresses[i];
+        try {
+            Status status;
+            ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
+            ExtDataSourceServiceConnection client(client_cache, address, 10000, &status);
+            if (!status.ok()) {
+                LOG(WARNING) << "es create client error: scan_range_idx=" << i
+                             << ", address=" << address
+                             << ", msg=" << status.get_error_msg();
+                return status;
+            }
+
+            try {
+                client->close(result, params);
+            } catch (apache::thrift::transport::TTransportException& e) {
+                // transport failure: reopen the connection and retry once
+                RETURN_IF_ERROR(client.reopen());
+                client->close(result, params);
+            }
+        } catch (apache::thrift::TException &e) {
+            std::stringstream ss;
+            ss << "es close error: scan_range_idx=" << i
+               << ", msg=" << e.what();
+            LOG(WARNING) << ss.str();
+            return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false);
+        }
+
+        Status status(result.status);
+        if (!status.ok()) {
+            LOG(WARNING) << "es close error: : scan_range_idx=" << i
+                         << ", msg=" << status.get_error_msg();
+            return status;
+        }
+#else
+        TStatus status;
+        result.__set_status(status);
+#endif
+        return Status::OK;
+    };
+
+    // Try every handle; remember only the first failure.
+    Status first_error;
+    for (int i = 0; i < _addresses.size(); ++i) {
+        Status status = close_scan_handle(i);
+        if (!status.ok() && first_error.ok()) {
+            first_error = status;
+        }
+    }
+
+    // Always run the base-class close, even if an ES handle failed to close.
+    Status base_status = ExecNode::close(state);
+    if (!first_error.ok()) {
+        return first_error;
+    }
+    return base_status;
+}
+
+// Writes a one-line description of this node to *out, indented by two spaces
+// per indentation level, then lets each child append its own description one
+// level deeper.
+void EsScanNode::debug_string(int indentation_level, stringstream* out) const {
+    const string indent(indentation_level * 2, ' ');
+    *out << indent << "EsScanNode(tupleid=" << _tuple_id << ")" << std::endl;
+
+    const int child_level = indentation_level + 1;
+    for (const auto& child : _children) {
+        child->debug_string(child_level, out);
+    }
+}
+
+// Appends the ES scan range of every entry in scan_ranges to _scan_ranges and
+// makes sure _offsets has one entry (initially 0) per stored range.
+// Each entry is expected to carry an es_scan_range (DCHECKed).
+// Always returns Status::OK.
+Status EsScanNode::set_scan_ranges(const vector<TScanRangeParams>& scan_ranges) {
+    // Iterate by const reference: each range is copied once, into _scan_ranges.
+    for (const TScanRangeParams& params : scan_ranges) {
+        DCHECK(params.scan_range.__isset.es_scan_range);
+        _scan_ranges.push_back(params.scan_range.es_scan_range);
+    }
+
+    // Size from _scan_ranges (not from the argument) so the two vectors stay in
+    // step even if this function is called more than once. resize() only
+    // value-initializes the newly added entries.
+    _offsets.resize(_scan_ranges.size(), 0);
+    return Status::OK;
+}
+
+// Sends the open request in `params` to the ES data-source service at
+// `address` and stores the reply in `result`.
+//
+// The connection comes from the exec env's client cache (10000 is passed to
+// the connection constructor -- presumably a timeout, confirm the unit). If
+// the first call throws a transport exception, the connection is reopened and
+// the call is retried once. Returns the status carried in the reply; any other
+// thrift exception, or a failure to create the client, becomes an error Status.
+//
+// In BE_TEST builds no RPC is made: the result gets a default status and the
+// scan handle "0".
+Status EsScanNode::open_es(TNetworkAddress& address, TExtOpenResult& result,
+                           TExtOpenParams& params) {
+#ifdef BE_TEST
+    TStatus status;
+    result.__set_status(status);
+    result.__set_scan_handle("0");
+    return Status(status);
+#else
+    try {
+        ExtDataSourceServiceClientCache* cache = _env->extdatasource_client_cache();
+        Status conn_status;
+        ExtDataSourceServiceConnection client(cache, address, 10000, &conn_status);
+        if (!conn_status.ok()) {
+            std::stringstream msg;
+            msg << "es create client error: address=" << address
+                << ", msg=" << conn_status.get_error_msg();
+            return Status(msg.str());
+        }
+
+        try {
+            client->open(result, params);
+        } catch (apache::thrift::transport::TTransportException& transport_error) {
+            // transport failure: reopen the connection and retry once
+            RETURN_IF_ERROR(client.reopen());
+            client->open(result, params);
+        }
+        return Status(result.status);
+    } catch (apache::thrift::TException& rpc_error) {
+        std::stringstream msg;
+        msg << "es open error: address=" << address << ", msg=" << rpc_error.what();
+        return Status(msg.str());
+    }
+#endif
+}
+
+bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct,
+                               vector<TExtPredicate>& disjuncts) {
 
 Review comment:
   Use a pointer for output parameters and a const reference for input parameters.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to