imay commented on a change in pull request #450: Add EsScanNode
URL: https://github.com/apache/incubator-doris/pull/450#discussion_r243204079
 
 

 ##########
 File path: be/src/exec/es_scan_node.cpp
 ##########
 @@ -0,0 +1,660 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "es_scan_node.h"
+
+#include <string>
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/runtime_state.h"
+#include "runtime/row_batch.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "runtime/client_cache.h"
+#include "util/runtime_profile.h"
+#include "util/debug_util.h"
+#include "service/backend_options.h"
+#include "olap/olap_common.h"
+#include "olap/utils.h"
+#include "exprs/expr_context.h"
+#include "exprs/expr.h"
+#include "exprs/slot_ref.h"
+
+namespace doris {
+
+// $0 = column type (e.g. INT)
+const string ERROR_INVALID_COL_DATA = "Data source returned inconsistent 
column data. "
+    "Expected value of type $0 based on column metadata. This likely indicates 
a "
+    "problem with the data source library.";
+const string ERROR_MEM_LIMIT_EXCEEDED = "DataSourceScanNode::$0() failed to 
allocate "
+    "$1 bytes for $2.";
+
+// Builds the node from its thrift plan node: stores the tuple id, starts at scan range 0
+// and copies the ES properties when the plan node carries them.
+EsScanNode::EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+        : ScanNode(pool, tnode, descs),
+          _tuple_id(tnode.es_scan_node.tuple_id),
+          _scan_range_idx(0) {
+    const auto& es_node = tnode.es_scan_node;
+    if (!es_node.__isset.properties) {
+        return;
+    }
+    _properties = es_node.properties;
+}
+
+// Intentionally empty: the destructor itself releases nothing.
+EsScanNode::~EsScanNode() {}
+
+// Runs the base-class prepare, then resolves the tuple descriptor for _tuple_id and
+// caches the execution environment. Fails when the descriptor table has no entry for
+// _tuple_id.
+Status EsScanNode::prepare(RuntimeState* state) {
+    LOG(INFO) << "EsScanNode::Prepare";
+
+    RETURN_IF_ERROR(ScanNode::prepare(state));
+
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+    if (_tuple_desc != nullptr) {
+        _env = state->exec_env();
+        return Status::OK;
+    }
+
+    std::stringstream error_msg;
+    error_msg << "es tuple descriptor is null, _tuple_id=" << _tuple_id;
+    LOG(WARNING) << error_msg.str();
+    return Status(error_msg.str());
+}
+
+// Opens one external scan per ES scan range.
+//
+// Builds the row schema from the tuple's slots, translates every conjunct that
+// get_disjuncts() can express into a list of TExtPredicate, and sends both to one ES host
+// of each scan range. A host whose name equals BackendOptions::get_localhost() is
+// preferred; when the chosen host answers ES_SHARD_NOT_FOUND the remaining hosts are
+// tried in order. The address and scan handle of every opened range are appended to
+// _addresses / _scan_handles.
+//
+// A conjunct that every scan range reports as accepted is removed from _conjunct_ctxs so
+// that it is not evaluated again in get_next().
+//
+// Returns the first non-OK status encountered; ranges opened before the failure stay
+// recorded in _addresses / _scan_handles.
+Status EsScanNode::open(RuntimeState* state) {
+    LOG(INFO) << "EsScanNode::Open";
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::open(state));
+
+    // TExtOpenParams.row_schema
+    vector<TExtColumnDesc> cols;
+    for (const SlotDescriptor* slot : _tuple_desc->slots()) {
+        TExtColumnDesc col;
+        col.__set_name(slot->col_name());
+        col.__set_type(slot->type().to_thrift());
+        cols.push_back(std::move(col));
+    }
+    TExtTableSchema row_schema;
+    row_schema.cols = std::move(cols);
+    row_schema.__isset.cols = true;
+
+    // TExtOpenParams.predicates
+    // predicate_to_conjunct[k] is the index in _conjunct_ctxs of the conjunct that
+    // produced predicates[k]; conjuncts that cannot be translated are not sent at all.
+    vector<vector<TExtPredicate> > predicates;
+    vector<size_t> predicate_to_conjunct;
+    for (size_t i = 0; i < _conjunct_ctxs.size(); ++i) {
+        VLOG(1) << "conjunct: " << _conjunct_ctxs[i]->root()->debug_string();
+        vector<TExtPredicate> disjuncts;
+        if (get_disjuncts(_conjunct_ctxs[i], _conjunct_ctxs[i]->root(), disjuncts)) {
+            predicates.push_back(std::move(disjuncts));
+            predicate_to_conjunct.push_back(i);
+        }
+    }
+
+    // accepted_times[k] counts how many scan ranges accepted predicates[k].
+    // Zero-initialized on purpose: the counters are incremented and compared below.
+    vector<size_t> accepted_times(predicates.size(), 0);
+
+    // open every scan range
+    for (size_t i = 0; i < _scan_ranges.size(); ++i) {
+        const TEsScanRange& es_scan_range = _scan_ranges[i];
+
+        // check es host
+        if (es_scan_range.es_hosts.empty()) {
+            std::stringstream ss;
+            ss << "es fail to open: hosts empty";
+            LOG(ERROR) << ss.str();
+            return Status(ss.str());
+        }
+
+        // TExtOpenParams
+        // Work on a per-range copy of the properties so that a "type" set for one range
+        // cannot leak into a later range that has none.
+        auto properties = _properties;
+        properties["index"] = es_scan_range.index;
+        if (es_scan_range.__isset.type) {
+            properties["type"] = es_scan_range.type;
+        }
+        properties["shard_id"] = std::to_string(es_scan_range.shard_id);
+
+        TExtOpenParams params;
+        params.__set_query_id(state->query_id());
+        params.__set_properties(properties);
+        params.__set_row_schema(row_schema);
+        params.__set_batch_size(state->batch_size());
+        params.__set_predicates(predicates);
+        TExtOpenResult result;
+
+        // choose an es node, local is better
+        size_t selected_idx = 0;
+        for (size_t j = 0; j < es_scan_range.es_hosts.size(); ++j) {
+            if (es_scan_range.es_hosts[j].hostname == BackendOptions::get_localhost()) {
+                selected_idx = j;
+                break;
+            }
+        }
+        TNetworkAddress es_host_selected = es_scan_range.es_hosts[selected_idx];
+
+        // if shard not found, try other nodes
+        Status status = open_es(es_host_selected, result, params);
+        for (size_t j = 0;
+             j < es_scan_range.es_hosts.size()
+                     && status.code() == TStatusCode::ES_SHARD_NOT_FOUND;
+             ++j) {
+            if (j == selected_idx) continue;
+            es_host_selected = es_scan_range.es_hosts[j];
+            status = open_es(es_host_selected, result, params);
+        }
+
+        if (!status.ok()) {
+            LOG(WARNING) << "es open error: scan_range_idx=" << i
+                         << ", address=" << es_host_selected
+                         << ", msg=" << status.get_error_msg();
+            return status;
+        }
+
+        // get accepted_conjuncts: the indexes refer to positions in `predicates`
+        if (result.__isset.accepted_conjuncts) {
+            for (int predicate_idx : result.accepted_conjuncts) {
+                if (predicate_idx < 0
+                        || static_cast<size_t>(predicate_idx) >= accepted_times.size()) {
+                    LOG(WARNING) << "es open: ignore invalid accepted conjunct index "
+                                 << predicate_idx << ", scan_range_idx=" << i;
+                    continue;
+                }
+                ++accepted_times[predicate_idx];
+            }
+        }
+
+        _addresses.push_back(es_host_selected);
+        _scan_handles.push_back(result.scan_handle);
+        VLOG(1) << "es open success: scan_range_idx=" << i
+                << ", params=" << apache::thrift::ThriftDebugString(params)
+                << ", result=" << apache::thrift::ThriftDebugString(result);
+    }
+
+    // Remove the conjuncts that were accepted by every scan range. predicate_to_conjunct
+    // is ascending, so walking it backwards keeps the remaining indexes valid while
+    // erasing.
+    // NOTE(review): an erased ExprContext is no longer reachable through _conjunct_ctxs;
+    // confirm that something else still closes it.
+    for (size_t k = predicate_to_conjunct.size(); k-- > 0;) {
+        if (accepted_times[k] == _scan_ranges.size()) {
+            _conjunct_ctxs.erase(_conjunct_ctxs.begin() + predicate_to_conjunct[k]);
+        }
+    }
+
+    return Status::OK;
+}
+
+// Fetches the next batch of the current scan range from ES and materializes it into
+// row_batch.
+//
+// Each returned row is materialized into the batch's tuple buffer and committed only when
+// it passes the conjuncts still held in _conjunct_ctxs; a rejected row leaves the tuple
+// pointer where it is, so the next row overwrites it. When the remote side reports eos
+// for the current range the node moves on to the next range.
+//
+// *eos is set to true once the limit is reached or all scan ranges are exhausted, and to
+// false otherwise.
+Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+    LOG(INFO) << "EsScanNode::GetNext";
+
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
+    RETURN_IF_CANCELLED(state);
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_TIMER(materialize_tuple_timer());
+
+    // Nothing left to fetch: do not index _scan_handles/_offsets/_addresses past their
+    // end (this also covers a node that was given no scan range at all), and do not issue
+    // another RPC once the limit has been reached.
+    *eos = false;
+    if (reached_limit() || static_cast<size_t>(_scan_range_idx) >= _scan_ranges.size()) {
+        *eos = true;
+        return Status::OK;
+    }
+
+    // create tuple
+    MemPool* tuple_pool = row_batch->tuple_data_pool();
+    int64_t tuple_buffer_size;
+    uint8_t* tuple_buffer = nullptr;
+    RETURN_IF_ERROR(row_batch->resize_and_allocate_tuple_buffer(
+            state, &tuple_buffer_size, &tuple_buffer));
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
+
+    // get batch
+    TExtGetNextResult result;
+    RETURN_IF_ERROR(get_next_from_es(result));
+    VLOG(1) << "es get next success: result=" << apache::thrift::ThriftDebugString(result);
+    // The offset sent with the next request skips everything received so far.
+    _offsets[_scan_range_idx] += result.rows.num_rows;
+
+    // convert
+    VLOG(1) << "begin to convert: scan_range_idx=" << _scan_range_idx
+            << ", num_rows=" << result.rows.num_rows;
+    vector<TExtColumnData>& cols = result.rows.cols;
+    // indexes of the next non-null value in the row batch, per column.
+    vector<int> cols_next_val_idx(_tuple_desc->slots().size(), 0);
+    for (int row_idx = 0; row_idx < result.rows.num_rows; row_idx++) {
+        if (reached_limit()) {
+            *eos = true;
+            break;
+        }
+        RETURN_IF_ERROR(materialize_row(tuple_pool, tuple, cols, row_idx, cols_next_val_idx));
+        TupleRow* tuple_row = row_batch->get_row(row_batch->add_row());
+        tuple_row->set_tuple(0, tuple);
+        if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), tuple_row)) {
+            row_batch->commit_last_row();
+            // Advance to the next tuple slot only for committed rows.
+            tuple = reinterpret_cast<Tuple*>(
+                reinterpret_cast<uint8_t*>(tuple) + _tuple_desc->byte_size());
+            ++_num_rows_returned;
+        }
+    }
+
+    VLOG(1) << "finish one batch: num_rows=" << row_batch->num_rows();
+    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+    if (result.__isset.eos && result.eos) {
+        VLOG(1) << "es finish one scan_range: scan_range_idx=" << _scan_range_idx;
+        ++_scan_range_idx;
+    }
+    if (static_cast<size_t>(_scan_range_idx) >= _scan_ranges.size()) {
+        *eos = true;
+    }
+
+    return Status::OK;
+}
+
+// Closes every remote scan that open() recorded, then closes the base ExecNode.
+//
+// A failure to close one scan handle does not stop the loop: the remaining handles are
+// still closed and ExecNode::close() is still called, so one unreachable ES host cannot
+// leak the other scans or the node's own resources. The first error encountered is
+// returned once everything has been attempted.
+Status EsScanNode::close(RuntimeState* state) {
+    if (is_closed()) return Status::OK;
+    LOG(INFO) << "EsScanNode::Close";
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+
+    Status first_error = Status::OK;
+    for (size_t i = 0; i < _addresses.size(); ++i) {
+        TExtCloseParams params;
+        params.__set_scan_handle(_scan_handles[i]);
+        TExtCloseResult result;
+        Status close_status = Status::OK;
+
+#ifndef BE_TEST
+        const TNetworkAddress& address = _addresses[i];
+        try {
+            Status client_status;
+            ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
+            ExtDataSourceServiceConnection client(client_cache, address, 10000, &client_status);
+            if (!client_status.ok()) {
+                LOG(WARNING) << "es create client error: scan_range_idx=" << i
+                             << ", address=" << address
+                             << ", msg=" << client_status.get_error_msg();
+                close_status = client_status;
+            } else {
+                try {
+                    client->close(result, params);
+                } catch (apache::thrift::transport::TTransportException& e) {
+                    // Retry once after re-establishing the connection.
+                    close_status = client.reopen();
+                    if (close_status.ok()) {
+                        client->close(result, params);
+                    }
+                }
+                if (close_status.ok()) {
+                    close_status = Status(result.status);
+                    if (!close_status.ok()) {
+                        LOG(WARNING) << "es close error: : scan_range_idx=" << i
+                                     << ", msg=" << close_status.get_error_msg();
+                    }
+                }
+            }
+        } catch (apache::thrift::TException &e) {
+            std::stringstream ss;
+            ss << "es close error: scan_range_idx=" << i
+               << ", msg=" << e.what();
+            LOG(WARNING) << ss.str();
+            close_status = Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false);
+        }
+#else
+        TStatus status;
+        result.__set_status(status);
+#endif
+
+        if (first_error.ok() && !close_status.ok()) {
+            first_error = close_status;
+        }
+    }
+
+    // Always release the base node, even when a remote close failed.
+    Status node_status = ExecNode::close(state);
+    if (!first_error.ok()) {
+        return first_error;
+    }
+    return node_status;
+}
+
+// Appends a one-line description of this node, indented by two spaces per level, and
+// then the descriptions of all children one level deeper.
+void EsScanNode::debug_string(int indentation_level, stringstream* out) const {
+    const string indent(indentation_level * 2, ' ');
+    *out << indent << "EsScanNode(tupleid=" << _tuple_id << ")" << std::endl;
+
+    for (const auto& child : _children) {
+        child->debug_string(indentation_level + 1, out);
+    }
+}
+
+// Appends the ES scan range of every entry to _scan_ranges and sizes _offsets to the
+// number of entries passed in (new elements start at 0). Always returns OK.
+Status EsScanNode::set_scan_ranges(const vector<TScanRangeParams>& scan_ranges) {
+    for (const TScanRangeParams& range_params : scan_ranges) {
+        DCHECK(range_params.scan_range.__isset.es_scan_range);
+        _scan_ranges.push_back(range_params.scan_range.es_scan_range);
+    }
+
+    _offsets.resize(scan_ranges.size(), 0);
+    return Status::OK;
+}
+
+// Sends one open request to the ES host at `address` and stores the reply in `result`.
+//
+// A transport exception on the first attempt triggers one reopen of the connection and
+// one retry. Returns the status carried by the reply, or a Status built from the error
+// text when the client cannot be created or thrift throws. Under BE_TEST no RPC is made:
+// the reply gets a default TStatus and scan handle "0".
+Status EsScanNode::open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params) {
+#ifndef BE_TEST
+    try {
+        ExtDataSourceServiceClientCache* cache = _env->extdatasource_client_cache();
+        Status conn_status;
+        ExtDataSourceServiceConnection client(cache, address, 10000, &conn_status);
+        if (!conn_status.ok()) {
+            std::stringstream msg;
+            msg << "es create client error: address=" << address
+                << ", msg=" << conn_status.get_error_msg();
+            return Status(msg.str());
+        }
+
+        try {
+            client->open(result, params);
+        } catch (apache::thrift::transport::TTransportException& transport_error) {
+            // The cached connection may be stale: reconnect and try once more.
+            RETURN_IF_ERROR(client.reopen());
+            client->open(result, params);
+        }
+        return Status(result.status);
+    } catch (apache::thrift::TException& thrift_error) {
+        std::stringstream msg;
+        msg << "es open error: address=" << address << ", msg=" << thrift_error.what();
+        return Status(msg.str());
+    }
+#else
+    TStatus stub_status;
+    result.__set_status(stub_status);
+    result.__set_scan_handle("0");
+    return Status(stub_status);
+#endif
+}
+
+// Translates `conjunct` into a flat list of binary predicates that are OR-ed together.
+//
+// Supported shapes:
+//   * BINARY_PRED with exactly two children, one of which is a SLOT_REF on a slot of
+//     this node's tuple; the other child is converted with get_literal().
+//   * COMPOUND_PRED with op COMPOUND_OR whose two children are themselves supported.
+//
+// Returns true and appends the predicates to `disjuncts` on success. Returns false for
+// any other expression; `disjuncts` may then already contain predicates from the part
+// that was translated, so callers must discard it.
+bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct,
+                               vector<TExtPredicate>& disjuncts) {
+    if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
+        if (conjunct->children().size() != 2) {
+            VLOG(1) << "get disjuncts fail: number of childs is not 2";
+            return false;
+        }
+        SlotRef* slotRef = nullptr;
+        Expr* expr = nullptr;
+        if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) {
+            slotRef = static_cast<SlotRef*>(conjunct->get_child(0));
+            expr = conjunct->get_child(1);
+        } else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) {
+            // NOTE(review): the operator is forwarded unchanged although the operands are
+            // swapped here, so "5 < col" would be sent as "col < 5". Confirm that the
+            // planner always puts the slot on the left, or mirror the operator.
+            slotRef = static_cast<SlotRef*>(conjunct->get_child(1));
+            expr = conjunct->get_child(0);
+        } else {
+            VLOG(1) << "get disjuncts fail: no SLOT_REF child";
+            return false;
+        }
+        const TExprOpcode::type op = conjunct->op();
+
+        std::vector<SlotId> slot_ids;
+        slotRef->get_slot_ids(&slot_ids);
+        if (slot_ids.empty()) {
+            VLOG(1) << "get disjuncts fail: slot ref has no slot id";
+            return false;
+        }
+        SlotDescriptor* slot_desc = nullptr;
+        for (SlotDescriptor* slot : _tuple_desc->slots()) {
+            if (slot->id() == slot_ids[0]) {
+                slot_desc = slot;
+                break;
+            }
+        }
+        if (nullptr == slot_desc) {
+            VLOG(1) << "get disjuncts fail: slot_desc is null";
+            return false;
+        }
+
+        TExtColumnDesc columnDesc;
+        columnDesc.__set_name(slot_desc->col_name());
+        columnDesc.__set_type(slot_desc->type().to_thrift());
+        TExtBinaryPredicate binaryPredicate;
+        binaryPredicate.__set_col(columnDesc);
+        binaryPredicate.__set_op(op);
+        binaryPredicate.__set_value(get_literal(context, expr));
+        TExtPredicate predicate;
+        predicate.__set_node_type(TExprNodeType::BINARY_PRED);
+        predicate.__set_binary_predicate(binaryPredicate);
+        disjuncts.push_back(std::move(predicate));
+        return true;
+    } else if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) {
+        if (TExprOpcode::COMPOUND_OR != conjunct->op()) {
+            VLOG(1) << "get disjuncts fail: op is not COMPOUND_OR";
+            return false;
+        }
+        // Both sides must be translatable; the right side is not visited when the left
+        // side already failed.
+        return get_disjuncts(context, conjunct->get_child(0), disjuncts)
+                && get_disjuncts(context, conjunct->get_child(1), disjuncts);
+    } else {
+        VLOG(1) << "get disjuncts fail: node type is " << conjunct->node_type()
+                << ", should be BINARY_PRED or COMPOUND_PRED";
+        return false;
+    }
+}
+
+// Evaluates `expr` (without an input row) and wraps the value in a TExtLiteral.
+//
+// The literal always carries expr's node type. The typed value is filled in only for
+// BOOL, DATE, FLOAT, INT, STRING, DECIMAL and LARGE_INT literals; for every other node
+// type, and when the expression evaluates to a null pointer, no value field is set.
+//
+// The pointer returned by get_value() is read according to the expression's actual
+// primitive type: an integer literal may be 8/16/32/64 bits wide, a float literal may be
+// a float or a double, and a string value is read as a StringValue (pointer + length).
+TExtLiteral EsScanNode::get_literal(ExprContext* context, Expr* expr) {
+    TExtLiteral literal;
+    literal.__set_node_type(expr->node_type());
+
+    void* value = context->get_value(expr, nullptr);
+    if (value == nullptr) {
+        // Nothing to read from; leave the literal without a value.
+        return literal;
+    }
+
+    switch (expr->node_type()) {
+    case TExprNodeType::BOOL_LITERAL: {
+        TBoolLiteral bool_literal;
+        bool_literal.__set_value(*reinterpret_cast<bool*>(value));
+        literal.__set_bool_literal(bool_literal);
+        break;
+    }
+    case TExprNodeType::DATE_LITERAL: {
+        DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
+        char str[MAX_DTVALUE_STR_LEN];
+        date_value.to_string(str);
+        TDateLiteral date_literal;
+        date_literal.__set_value(str);
+        literal.__set_date_literal(date_literal);
+        break;
+    }
+    case TExprNodeType::FLOAT_LITERAL: {
+        // Read with the width the expression was evaluated in; reading a double through
+        // a float pointer yields garbage.
+        double float_value = 0;
+        switch (expr->type().type) {
+        case TYPE_DOUBLE:
+            float_value = *reinterpret_cast<double*>(value);
+            break;
+        case TYPE_FLOAT:
+        default:
+            float_value = *reinterpret_cast<float*>(value);
+            break;
+        }
+        TFloatLiteral float_literal;
+        float_literal.__set_value(float_value);
+        literal.__set_float_literal(float_literal);
+        break;
+    }
+    case TExprNodeType::INT_LITERAL: {
+        // Same here: the stored integer is as wide as the expression's type says.
+        int64_t int_value = 0;
+        switch (expr->type().type) {
+        case TYPE_TINYINT:
+            int_value = *reinterpret_cast<int8_t*>(value);
+            break;
+        case TYPE_SMALLINT:
+            int_value = *reinterpret_cast<int16_t*>(value);
+            break;
+        case TYPE_BIGINT:
+            int_value = *reinterpret_cast<int64_t*>(value);
+            break;
+        case TYPE_INT:
+        default:
+            int_value = *reinterpret_cast<int32_t*>(value);
+            break;
+        }
+        TIntLiteral int_literal;
+        int_literal.__set_value(int_value);
+        literal.__set_int_literal(int_literal);
+        break;
+    }
+    case TExprNodeType::STRING_LITERAL: {
+        // String values are StringValue (ptr + len), not std::string.
+        const StringValue* string_value = reinterpret_cast<StringValue*>(value);
+        TStringLiteral string_literal;
+        string_literal.__set_value(std::string(string_value->ptr, string_value->len));
+        literal.__set_string_literal(string_literal);
+        break;
+    }
+    case TExprNodeType::DECIMAL_LITERAL: {
+        TDecimalLiteral decimal_literal;
+        decimal_literal.__set_value(reinterpret_cast<DecimalValue*>(value)->to_string());
+        literal.__set_decimal_literal(decimal_literal);
+        break;
+    }
+    case TExprNodeType::LARGE_INT_LITERAL: {
+        char buf[48];
+        int len = 48;
+        char* v = LargeIntValue::to_string(*reinterpret_cast<__int128*>(value), buf, &len);
+        TLargeIntLiteral large_int_literal;
+        large_int_literal.__set_value(v);
+        literal.__set_large_int_literal(large_int_literal);
+        break;
+    }
+    default:
+        break;
+    }
+    return literal;
+}
+
+// Requests the next batch of the current scan range (_scan_range_idx), starting at the
+// offset accumulated so far, and stores the reply in `result`.
+//
+// A transport exception on the first attempt triggers one reopen of the connection and
+// one retry. Returns an error when the client cannot be created, when thrift throws, when
+// the reply carries a non-OK status, or when the reply lacks rows / num_rows.
+//
+// Under BE_TEST no RPC is made; a canned reply is returned straight away, so the checks
+// after the #endif are not reached in that build.
+Status EsScanNode::get_next_from_es(TExtGetNextResult& result) {
+    TExtGetNextParams request;
+    request.__set_scan_handle(_scan_handles[_scan_range_idx]);
+    request.__set_offset(_offsets[_scan_range_idx]);
+
+    // getNext
+    const TNetworkAddress& address = _addresses[_scan_range_idx];
+#ifndef BE_TEST
+    try {
+        Status conn_status;
+        ExtDataSourceServiceClientCache* cache = _env->extdatasource_client_cache();
+        ExtDataSourceServiceConnection client(cache, address, 10000, &conn_status);
+        if (!conn_status.ok()) {
+            LOG(WARNING) << "es create client error: scan_range_idx=" << _scan_range_idx
+                         << ", address=" << address
+                         << ", msg=" << conn_status.get_error_msg();
+            return conn_status;
+        }
+
+        try {
+            client->getNext(result, request);
+        } catch (apache::thrift::transport::TTransportException& transport_error) {
+            // The cached connection may be stale: reconnect and try once more.
+            RETURN_IF_ERROR(client.reopen());
+            client->getNext(result, request);
+        }
+    } catch (apache::thrift::TException& thrift_error) {
+        std::stringstream msg;
+        msg << "es get_next error: scan_range_idx=" << _scan_range_idx
+            << ", msg=" << thrift_error.what();
+        LOG(WARNING) << msg.str();
+        return Status(TStatusCode::THRIFT_RPC_ERROR, msg.str(), false);
+    }
+#else
+    // Canned reply: one int column holding the values 1 and 2, two rows, eos set.
+    // NOTE(review): is_null has a single entry although num_rows is 2 -- confirm that the
+    // row materialization never reads is_null for the second row.
+    TStatus stub_status;
+    result.__set_status(stub_status);
+    result.__set_eos(true);
+
+    TExtColumnData stub_column;
+    stub_column.__set_is_null(std::vector<bool>(1, false));
+    std::vector<int32_t> stub_values;
+    stub_values.push_back(1);
+    stub_values.push_back(2);
+    stub_column.__set_int_vals(stub_values);
+
+    TExtRowBatch stub_rows;
+    stub_rows.__set_cols(std::vector<TExtColumnData>(1, stub_column));
+    stub_rows.__set_num_rows(2);
+    result.__set_rows(stub_rows);
+    return Status(stub_status);
+#endif
+
+    // check result
+    Status reply_status(result.status);
+    if (!reply_status.ok()) {
+        LOG(WARNING) << "es get_next error: scan_range_idx=" << _scan_range_idx
+                     << ", address=" << address
+                     << ", msg=" << reply_status.get_error_msg();
+        return reply_status;
+    }
+
+    const bool has_row_count = result.__isset.rows && result.rows.__isset.num_rows;
+    if (has_row_count) {
+        return Status::OK;
+    }
+
+    std::stringstream msg;
+    msg << "es get_next error: scan_range_idx=" << _scan_range_idx
+        << ", msg=rows or num_rows not in result";
+    LOG(WARNING) << msg.str();
+    return Status(msg.str());
+}
+
+Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
+                                   const vector<TExtColumnData>& cols, int 
row_idx,
+                                   vector<int>& cols_next_val_idx) {
+  tuple->init(_tuple_desc->byte_size());
+
+  for (int i = 0; i < _tuple_desc->slots().size(); ++i) {
+    const SlotDescriptor* slot_desc = _tuple_desc->slots()[i];
+    void* slot = tuple->get_slot(slot_desc->tuple_offset());
+    const TExtColumnData& col = cols[i];
 
 Review comment:
   If slot_desc->is_materialized() is false, you should skip this slot

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to