This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a807978882 [refactor](non-vec) Remove rowbatch code from delta writer
and some rowbatch related code (#15349)
a807978882 is described below
commit a807978882dfaea978201998d874012e9474bf70
Author: yiguolei <[email protected]>
AuthorDate: Mon Dec 26 08:54:51 2022 +0800
[refactor](non-vec) Remove rowbatch code from delta writer and some
rowbatch related code (#15349)
Co-authored-by: yiguolei <[email protected]>
---
be/src/exec/data_sink.h | 1 -
be/src/exec/exec_node.cpp | 45 --
be/src/exec/exec_node.h | 45 --
be/src/exec/row_batch_list.h | 130 ----
be/src/exec/table_connector.cpp | 109 ----
be/src/exec/table_connector.h | 4 -
be/src/exec/tablet_info.h | 1 -
be/src/olap/delta_writer.cpp | 26 -
be/src/olap/delta_writer.h | 2 -
be/src/runtime/fragment_mgr.cpp | 1 -
be/src/runtime/load_channel.h | 12 +-
be/src/runtime/plan_fragment_executor.h | 1 -
be/src/runtime/result_writer.h | 1 -
be/src/runtime/tablets_channel.cpp | 11 +-
be/src/service/internal_service.cpp | 60 --
be/src/service/internal_service.h | 15 -
be/src/vec/core/block.cpp | 40 --
be/src/vec/core/block.h | 7 -
be/src/vec/exec/vbroker_scan_node.cpp | 2 -
be/src/vec/exec/vbroker_scan_node.h | 1 -
be/src/vec/exec/vtable_function_node.h | 1 -
be/src/vec/runtime/vsorted_run_merger.h | 6 -
be/src/vec/sink/vdata_stream_sender.h | 1 -
be/src/vec/sink/vmysql_result_writer.h | 1 -
be/src/vec/sink/vresult_sink.h | 1 -
be/src/vec/sink/vtablet_sink.cpp | 1 -
be/src/vec/sink/vtablet_sink.h | 7 -
be/test/CMakeLists.txt | 2 -
be/test/runtime/data_stream_test.cpp | 667 ---------------------
be/test/runtime/load_channel_mgr_test.cpp | 474 ---------------
be/test/runtime/result_sink_test.cpp | 93 ---
be/test/vec/exec/vtablet_sink_test.cpp | 25 -
.../apache/doris/utframe/MockedBackendFactory.java | 6 -
gensrc/proto/internal_service.proto | 2 -
34 files changed, 4 insertions(+), 1797 deletions(-)
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 299e1c5376..f558678308 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -32,7 +32,6 @@
namespace doris {
class ObjectPool;
-class RowBatch;
class RuntimeProfile;
class RuntimeState;
class TPlanFragmentExecParams;
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 616bf9f5e0..ab9f52f4af 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -64,51 +64,6 @@ namespace doris {
const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) :
BlockingQueue<RowBatch*>(max_batches) {}
-
-ExecNode::RowBatchQueue::~RowBatchQueue() {
- DCHECK(cleanup_queue_.empty());
-}
-
-void ExecNode::RowBatchQueue::AddBatch(RowBatch* batch) {
- if (!blocking_put(batch)) {
- std::lock_guard<std::mutex> lock(lock_);
- cleanup_queue_.push_back(batch);
- }
-}
-
-bool ExecNode::RowBatchQueue::AddBatchWithTimeout(RowBatch* batch, int64_t
timeout_micros) {
- // return blocking_put_with_timeout(batch, timeout_micros);
- return blocking_put(batch);
-}
-
-RowBatch* ExecNode::RowBatchQueue::GetBatch() {
- RowBatch* result = nullptr;
- if (blocking_get(&result)) {
- return result;
- }
- return nullptr;
-}
-
-int ExecNode::RowBatchQueue::Cleanup() {
- int num_io_buffers = 0;
-
- // RowBatch* batch = nullptr;
- // while ((batch = GetBatch()) != nullptr) {
- // num_io_buffers += batch->num_io_buffers();
- // delete batch;
- // }
-
- std::lock_guard<std::mutex> l(lock_);
- for (std::list<RowBatch*>::iterator it = cleanup_queue_.begin(); it !=
cleanup_queue_.end();
- ++it) {
- // num_io_buffers += (*it)->num_io_buffers();
- delete *it;
- }
- cleanup_queue_.clear();
- return num_io_buffers;
-}
-
ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
: _id(tnode.node_id),
_type(tnode.node_type),
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index f5af72ac61..ff95d96934 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -40,7 +40,6 @@ class Expr;
class ExprContext;
class ObjectPool;
class Counters;
-class RowBatch;
class RuntimeState;
class TPlan;
class TupleRow;
@@ -266,50 +265,6 @@ protected:
/// Only use in vectorized exec engine try to do projections to trans
_row_desc -> _output_row_desc
Status do_projections(vectorized::Block* origin_block, vectorized::Block*
output_block);
- /// Extends blocking queue for row batches. Row batches have a property
that
- /// they must be processed in the order they were produced, even in
cancellation
- /// paths. Preceding row batches can contain ptrs to memory in subsequent
row batches
- /// and we need to make sure those ptrs stay valid.
- /// Row batches that are added after Shutdown() are queued in another
queue, which can
- /// be cleaned up during Close().
- /// All functions are thread safe.
- class RowBatchQueue : public BlockingQueue<RowBatch*> {
- public:
- /// max_batches is the maximum number of row batches that can be
queued.
- /// When the queue is full, producers will block.
- RowBatchQueue(int max_batches);
- ~RowBatchQueue();
-
- /// Adds a batch to the queue. This is blocking if the queue is full.
- void AddBatch(RowBatch* batch);
-
- /// Adds a batch to the queue. If the queue is full, this blocks until
space becomes
- /// available or 'timeout_micros' has elapsed.
- /// Returns true if the element was added to the queue, false if it
wasn't. If this
- /// method returns false, the queue didn't take ownership of the batch
and it must be
- /// managed externally.
- bool AddBatchWithTimeout(RowBatch* batch, int64_t timeout_micros);
-
- /// Gets a row batch from the queue. Returns nullptr if there are no
more.
- /// This function blocks.
- /// Returns nullptr after Shutdown().
- RowBatch* GetBatch();
-
- /// Deletes all row batches in cleanup_queue_. Not valid to call
AddBatch()
- /// after this is called.
- /// Returns the number of io buffers that were released (for debug
tracking)
- int Cleanup();
-
- private:
- /// Lock protecting cleanup_queue_
- // SpinLock lock_;
- // TODO(dhc): need to modify spinlock
- std::mutex lock_;
-
- /// Queue of orphaned row batches
- std::list<RowBatch*> cleanup_queue_;
- };
-
int _id; // unique w/in single plan tree
TPlanNodeType::type _type;
ObjectPool* _pool;
diff --git a/be/src/exec/row_batch_list.h b/be/src/exec/row_batch_list.h
deleted file mode 100644
index a81f0aae86..0000000000
--- a/be/src/exec/row_batch_list.h
+++ /dev/null
@@ -1,130 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-//
https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/row-batch-list.h
-// and modified by Doris
-
-#pragma once
-
-#include <string>
-#include <vector>
-
-#include "runtime/row_batch.h"
-#include "runtime/tuple_row.h"
-
-namespace doris {
-
-class TupleRow;
-class RowDescriptor;
-class MemPool;
-
-// A simple list structure for RowBatches that provides an interface for
-// iterating over the TupleRows.
-class RowBatchList {
-public:
- RowBatchList() : _total_num_rows(0) {}
- virtual ~RowBatchList() {}
-
- // A simple iterator used to scan over all the rows stored in the list.
- class TupleRowIterator {
- public:
- // Dummy constructor
- TupleRowIterator() : _list(nullptr), _row_idx(0) {}
- virtual ~TupleRowIterator() {}
-
- // Returns true if this iterator is at the end, i.e. get_row() cannot
be called.
- bool at_end() { return _batch_it == _list->_row_batches.end(); }
-
- // Returns the current row. Callers must check the iterator is not
at_end() before
- // calling get_row().
- TupleRow* get_row() {
- DCHECK(!at_end());
- return (*_batch_it)->get_row(_row_idx);
- }
-
- // Increments the iterator. No-op if the iterator is at the end.
- void next() {
- if (_batch_it == _list->_row_batches.end()) {
- return;
- }
-
- if (++_row_idx == (*_batch_it)->num_rows()) {
- ++_batch_it;
- _row_idx = 0;
- }
- }
-
- private:
- friend class RowBatchList;
-
- TupleRowIterator(RowBatchList* list)
- : _list(list), _batch_it(list->_row_batches.begin()),
_row_idx(0) {}
-
- RowBatchList* _list;
- std::vector<RowBatch*>::iterator _batch_it;
- int64_t _row_idx;
- };
-
- // Add the 'row_batch' to the list. The RowBatch* and all of its resources
are owned
- // by the caller.
- void add_row_batch(RowBatch* row_batch) {
- if (row_batch->num_rows() == 0) {
- return;
- }
-
- _row_batches.push_back(row_batch);
- _total_num_rows += row_batch->num_rows();
- }
-
- // Resets the list.
- void reset() {
- _row_batches.clear();
- _total_num_rows = 0;
- }
-
- // Outputs a debug string containing the contents of the list.
- std::string debug_string(const RowDescriptor& desc) {
- std::stringstream out;
- out << "RowBatchList(";
- out << "num_rows=" << _total_num_rows << "; ";
- RowBatchList::TupleRowIterator it = iterator();
-
- while (!it.at_end()) {
- out << " " << it.get_row()->to_string(desc);
- it.next();
- }
-
- out << " )";
- return out.str();
- }
-
- // Returns the total number of rows in all row batches.
- int64_t total_num_rows() { return _total_num_rows; }
-
- // Returns a new iterator over all the tuple rows.
- TupleRowIterator iterator() { return TupleRowIterator(this); }
-
-private:
- friend class TupleRowIterator;
-
- std::vector<RowBatch*> _row_batches;
-
- // Total number of rows
- int64_t _total_num_rows;
-};
-
-} // namespace doris
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index fa725832d3..e342f9abd6 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -45,115 +45,6 @@ std::u16string TableConnector::utf8_to_u16string(const
char* first, const char*
return utf8_utf16_cvt.from_bytes(first, last);
}
-Status TableConnector::append(const std::string& table_name, RowBatch* batch,
- const std::vector<ExprContext*>&
output_expr_ctxs,
- uint32_t start_send_row, uint32* num_rows_sent) {
- _insert_stmt_buffer.clear();
- std::u16string insert_stmt;
- {
- SCOPED_TIMER(_convert_tuple_timer);
- fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (",
table_name);
-
- int num_rows = batch->num_rows();
- for (int i = start_send_row; i < num_rows; ++i) {
- auto row = batch->get_row(i);
- (*num_rows_sent)++;
-
- // Construct insert statement of odbc table
- int num_columns = output_expr_ctxs.size();
- for (int j = 0; j < num_columns; ++j) {
- if (j != 0) {
- fmt::format_to(_insert_stmt_buffer, "{}", ", ");
- }
- void* item = output_expr_ctxs[j]->get_value(row);
- if (item == nullptr) {
- fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
- continue;
- }
- switch (output_expr_ctxs[j]->root()->type().type) {
- case TYPE_BOOLEAN:
- case TYPE_TINYINT:
- fmt::format_to(_insert_stmt_buffer, "{}",
*static_cast<int8_t*>(item));
- break;
- case TYPE_SMALLINT:
- fmt::format_to(_insert_stmt_buffer, "{}",
*static_cast<int16_t*>(item));
- break;
- case TYPE_INT:
- fmt::format_to(_insert_stmt_buffer, "{}",
*static_cast<int32_t*>(item));
- break;
- case TYPE_BIGINT:
- fmt::format_to(_insert_stmt_buffer, "{}",
*static_cast<int64_t*>(item));
- break;
- case TYPE_FLOAT:
- fmt::format_to(_insert_stmt_buffer, "{}",
*static_cast<float*>(item));
- break;
- case TYPE_DOUBLE:
- fmt::format_to(_insert_stmt_buffer, "{}",
*static_cast<double*>(item));
- break;
- case TYPE_DATE:
- case TYPE_DATETIME: {
- char buf[64];
- const auto* time_val = (const DateTimeValue*)(item);
- time_val->to_string(buf);
- fmt::format_to(_insert_stmt_buffer, "'{}'", buf);
- break;
- }
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- case TYPE_STRING: {
- const auto* string_val = (const StringValue*)(item);
-
- if (string_val->ptr == nullptr) {
- if (string_val->len == 0) {
- fmt::format_to(_insert_stmt_buffer, "{}", "''");
- } else {
- fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
- }
- } else {
- fmt::format_to(_insert_stmt_buffer, "'{}'",
- fmt::basic_string_view(string_val->ptr,
string_val->len));
- }
- break;
- }
- case TYPE_DECIMALV2: {
- const DecimalV2Value decimal_val(
- reinterpret_cast<const
PackedInt128*>(item)->value);
- char buffer[MAX_DECIMAL_WIDTH];
- int output_scale =
output_expr_ctxs[j]->root()->output_scale();
- int len = decimal_val.to_buffer(buffer, output_scale);
- _insert_stmt_buffer.append(buffer, buffer + len);
- break;
- }
- case TYPE_LARGEINT: {
- fmt::format_to(_insert_stmt_buffer, "{}",
- reinterpret_cast<const
PackedInt128*>(item)->value);
- break;
- }
- default: {
- return Status::InternalError("can't convert this type to
mysql type. type = {}",
-
output_expr_ctxs[j]->root()->type().type);
- }
- }
- }
-
- if (i < num_rows - 1 && _insert_stmt_buffer.size() <
INSERT_BUFFER_SIZE) {
- fmt::format_to(_insert_stmt_buffer, "{}", "),(");
- } else {
- // batch exhausted or _insert_stmt_buffer is full, need to do
real insert stmt
- fmt::format_to(_insert_stmt_buffer, "{}", ")");
- break;
- }
- }
- // Translate utf8 string to utf16 to use unicode encoding
- insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
- _insert_stmt_buffer.data() +
_insert_stmt_buffer.size());
- }
-
- RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer));
- COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
- return Status::OK();
-}
-
Status TableConnector::append(const std::string& table_name,
vectorized::Block* block,
const std::vector<vectorized::VExprContext*>&
output_vexpr_ctxs,
uint32_t start_send_row, uint32_t* num_rows_sent,
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 3fa9f5f5b1..a6077b3227 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -49,10 +49,6 @@ public:
virtual Status exec_write_sql(const std::u16string& insert_stmt,
const fmt::memory_buffer&
_insert_stmt_buffer) = 0;
- //write data into table row batch
- Status append(const std::string& table_name, RowBatch* batch,
- const std::vector<ExprContext*>& _output_expr_ctxs, uint32_t
start_send_row,
- uint32_t* num_rows_sent);
//write data into table vectorized
Status append(const std::string& table_name, vectorized::Block* block,
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 247cd1a4f1..0b6fd00dbb 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -36,7 +36,6 @@
namespace doris {
class MemPool;
-class RowBatch;
struct OlapTableIndexSchema {
int64_t index_id;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 2a0fded006..aa05bed875 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -183,32 +183,6 @@ Status DeltaWriter::write(Tuple* tuple) {
return Status::OK();
}
-Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>&
row_idxs) {
- std::lock_guard<std::mutex> l(_lock);
- if (!_is_init && !_is_cancelled) {
- RETURN_NOT_OK(init());
- }
-
- if (_is_cancelled) {
- return _cancel_status;
- }
-
- _total_received_rows += row_idxs.size();
- for (const auto& row_idx : row_idxs) {
- _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
- }
-
- if (_mem_table->memory_usage() >= config::write_buffer_size) {
- auto s = _flush_memtable_async();
- _reset_mem_table();
- if (OLAP_UNLIKELY(!s.ok())) {
- return s;
- }
- }
-
- return Status::OK();
-}
-
Status DeltaWriter::write(const vectorized::Block* block, const
std::vector<int>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index dafd77a8f8..eac3ea75cc 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -27,7 +27,6 @@ namespace doris {
class FlushToken;
class MemTable;
class MemTracker;
-class RowBatch;
class Schema;
class StorageEngine;
class Tuple;
@@ -64,7 +63,6 @@ public:
Status init();
Status write(Tuple* tuple);
- Status write(const RowBatch* row_batch, const std::vector<int>& row_idxs);
Status write(const vectorized::Block* block, const std::vector<int>&
row_idxs);
// flush the last memtable to flush queue, must call it before close_wait()
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0beccbdf9c..134df9f221 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1010,7 +1010,6 @@ Status FragmentMgr::exec_external_plan_fragment(const
TScanOpenParams& params,
per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0,
scan_ranges));
fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
exec_fragment_params.__set_params(fragment_exec_params);
- // batch_size for one RowBatch
TQueryOptions query_options;
query_options.batch_size = params.batch_size;
query_options.query_timeout = params.query_timeout;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 9d8e5f2f33..3581363517 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -153,15 +153,9 @@ Status LoadChannel::add_batch(const
TabletWriterAddRequest& request,
return st;
}
- // 2. add batch to tablets channel
- if constexpr (std::is_same_v<TabletWriterAddRequest,
PTabletWriterAddBatchRequest>) {
- if (request.has_row_batch()) {
- RETURN_IF_ERROR(channel->add_batch(request, response));
- }
- } else {
- if (request.has_block()) {
- RETURN_IF_ERROR(channel->add_batch(request, response));
- }
+ // 2. add block to tablets channel
+ if (request.has_block()) {
+ RETURN_IF_ERROR(channel->add_batch(request, response));
}
// 3. handle eos
diff --git a/be/src/runtime/plan_fragment_executor.h
b/be/src/runtime/plan_fragment_executor.h
index 013c56471f..22bbe785b2 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -36,7 +36,6 @@ namespace doris {
class QueryFragmentsCtx;
class ExecNode;
class RowDescriptor;
-class RowBatch;
class DataSink;
class DataStreamMgr;
class RuntimeProfile;
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index a77956c0c4..4a14a66637 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -23,7 +23,6 @@
namespace doris {
class Status;
-class RowBatch;
class RuntimeState;
struct TypeDescriptor;
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 6fdc42f2c4..7856864df4 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -466,13 +466,7 @@ Status TabletsChannel::add_batch(const
TabletWriterAddRequest& request,
}
}
- auto get_send_data = [&]() {
- if constexpr (std::is_same_v<TabletWriterAddRequest,
PTabletWriterAddBatchRequest>) {
- return RowBatch(*_row_desc, request.row_batch());
- } else {
- return vectorized::Block(request.block());
- }
- };
+ auto get_send_data = [&]() { return vectorized::Block(request.block()); };
auto send_data = get_send_data();
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
@@ -507,9 +501,6 @@ Status TabletsChannel::add_batch(const
TabletWriterAddRequest& request,
return Status::OK();
}
-template Status
-TabletsChannel::add_batch<PTabletWriterAddBatchRequest,
PTabletWriterAddBatchResult>(
- PTabletWriterAddBatchRequest const&, PTabletWriterAddBatchResult*);
template Status
TabletsChannel::add_batch<PTabletWriterAddBlockRequest,
PTabletWriterAddBlockResult>(
PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index e01b83209e..af956b58ca 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -231,66 +231,6 @@ void
PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
});
}
-void
PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController*
cntl_base,
- const
PTabletWriterAddBatchRequest* request,
-
PTabletWriterAddBatchResult* response,
- google::protobuf::Closure*
done) {
- google::protobuf::Closure* new_done = new
NewHttpClosure<PTransmitDataParams>(done);
- _tablet_writer_add_batch(cntl_base, request, response, new_done);
-}
-
-void PInternalServiceImpl::tablet_writer_add_batch_by_http(
- google::protobuf::RpcController* cntl_base, const
::doris::PEmptyRequest* request,
- PTabletWriterAddBatchResult* response, google::protobuf::Closure*
done) {
- PTabletWriterAddBatchRequest* new_request = new
PTabletWriterAddBatchRequest();
- google::protobuf::Closure* new_done =
- new NewHttpClosure<PTabletWriterAddBatchRequest>(new_request,
done);
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
- Status st =
attachment_extract_request_contain_tuple<PTabletWriterAddBatchRequest>(new_request,
-
cntl);
- if (st.ok()) {
- _tablet_writer_add_batch(cntl_base, new_request, response, new_done);
- } else {
- st.to_protobuf(response->mutable_status());
- }
-}
-
-void
PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcController*
cntl_base,
- const
PTabletWriterAddBatchRequest* request,
-
PTabletWriterAddBatchResult* response,
- google::protobuf::Closure*
done) {
- VLOG_RPC << "tablet writer add batch, id=" << request->id()
- << ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
- << ", current_queued_size=" <<
_tablet_worker_pool.get_queue_size();
- // add batch maybe cost a lot of time, and this callback thread will be
held.
- // this will influence query execution, because the pthreads under bthread
may be
- // exhausted, so we put this to a local thread pool to process
- int64_t submit_task_time_ns = MonotonicNanos();
- _tablet_worker_pool.offer([cntl_base, request, response, done,
submit_task_time_ns, this]() {
- int64_t wait_execution_time_ns = MonotonicNanos() -
submit_task_time_ns;
- brpc::ClosureGuard closure_guard(done);
- int64_t execution_time_ns = 0;
- {
- SCOPED_RAW_TIMER(&execution_time_ns);
-
- // TODO(zxy) delete in 1.2 version
- brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request,
cntl);
-
- auto st = _exec_env->load_channel_mgr()->add_batch(*request,
response);
- if (!st.ok()) {
- LOG(WARNING) << "tablet writer add batch failed, message=" <<
st
- << ", id=" << request->id() << ", index_id=" <<
request->index_id()
- << ", sender_id=" << request->sender_id()
- << ", backend id=" << request->backend_id();
- }
- st.to_protobuf(response->mutable_status());
- }
- response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
- response->set_wait_execution_time_us(wait_execution_time_ns /
NANOS_PER_MICRO);
- });
-}
-
void
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
controller,
const
PTabletWriterCancelRequest* request,
PTabletWriterCancelResult*
response,
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 3ea3655974..9b2a4db254 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -78,16 +78,6 @@ public:
PTabletWriterOpenResult* response,
google::protobuf::Closure* done) override;
- void tablet_writer_add_batch(google::protobuf::RpcController* controller,
- const PTabletWriterAddBatchRequest* request,
- PTabletWriterAddBatchResult* response,
- google::protobuf::Closure* done) override;
-
- void tablet_writer_add_batch_by_http(google::protobuf::RpcController*
controller,
- const ::doris::PEmptyRequest* request,
- PTabletWriterAddBatchResult* response,
- google::protobuf::Closure* done)
override;
-
void tablet_writer_add_block(google::protobuf::RpcController* controller,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
@@ -178,11 +168,6 @@ private:
::doris::PTransmitDataResult* response,
::google::protobuf::Closure* done,
const Status& extract_st);
- void _tablet_writer_add_batch(google::protobuf::RpcController* controller,
- const PTabletWriterAddBatchRequest* request,
- PTabletWriterAddBatchResult* response,
- google::protobuf::Closure* done);
-
void _tablet_writer_add_block(google::protobuf::RpcController* controller,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 0f4fe065f2..571a391eb0 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -793,46 +793,6 @@ Status Block::serialize(int be_exec_version, PBlock*
pblock,
return Status::OK();
}
-void Block::serialize(RowBatch* output_batch, const RowDescriptor& row_desc) {
- auto num_rows = rows();
- auto mem_pool = output_batch->tuple_data_pool();
-
- for (int i = 0; i < num_rows; ++i) {
- auto tuple_row = output_batch->get_row(i);
- const auto& tuple_descs = row_desc.tuple_descriptors();
- auto column_offset = 0;
-
- for (int j = 0; j < tuple_descs.size(); ++j) {
- auto tuple_desc = tuple_descs[j];
- tuple_row->set_tuple(j, deep_copy_tuple(*tuple_desc, mem_pool, i,
column_offset));
- column_offset += tuple_desc->slots().size();
- }
- output_batch->commit_last_row();
- }
-}
-
-doris::Tuple* Block::deep_copy_tuple(const doris::TupleDescriptor& desc,
MemPool* pool, int row,
- int column_offset, bool padding_char) {
- auto dst =
reinterpret_cast<doris::Tuple*>(pool->allocate(desc.byte_size()));
-
- for (int i = 0; i < desc.slots().size(); ++i) {
- auto slot_desc = desc.slots()[i];
- auto& type_desc = slot_desc->type();
- const auto& column = get_by_position(column_offset + i).column;
- const auto& data_ref =
- type_desc.type != TYPE_ARRAY ? column->get_data_at(row) :
StringRef();
- bool is_null = is_column_data_null(slot_desc->type(), data_ref,
column, row);
- if (is_null) {
- dst->set_null(slot_desc->null_indicator_offset());
- } else {
- dst->set_not_null(slot_desc->null_indicator_offset());
- deep_copy_slot(dst->get_slot(slot_desc->tuple_offset()), pool,
type_desc, data_ref,
- column.get(), row, padding_char);
- }
- }
- return dst;
-}
-
inline bool Block::is_column_data_null(const doris::TypeDescriptor& type_desc,
const StringRef& data_ref, const
IColumn* column, int row) {
if (type_desc.type != TYPE_ARRAY) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 6b7cc9d5a1..803fb82f97 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -40,7 +40,6 @@
namespace doris {
class MemPool;
-class RowBatch;
class RowDescriptor;
class Status;
class Tuple;
@@ -280,9 +279,6 @@ public:
size_t* compressed_bytes, segment_v2::CompressionTypePB
compression_type,
bool allow_transfer_large_data = false) const;
- // serialize block to PRowbatch
- void serialize(RowBatch*, const RowDescriptor&);
-
std::unique_ptr<Block> create_same_struct_block(size_t size) const;
/** Compares (*this) n-th row and rhs m-th row.
@@ -346,9 +342,6 @@ public:
return res;
}
- doris::Tuple* deep_copy_tuple(const TupleDescriptor&, MemPool*, int, int,
- bool padding_char = false);
-
// for String type or Array<String> type
void shrink_char_type_column_suffix_zero(const std::vector<size_t>&
char_type_idx);
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp
b/be/src/vec/exec/vbroker_scan_node.cpp
index e79d3ce104..ac96f97965 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -211,8 +211,6 @@ Status VBrokerScanNode::close(RuntimeState* state) {
_scanner_threads[i].join();
}
- // Close
- _batch_queue.clear();
return ExecNode::close(state);
}
diff --git a/be/src/vec/exec/vbroker_scan_node.h
b/be/src/vec/exec/vbroker_scan_node.h
index 9c5e436b19..de46104088 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -88,7 +88,6 @@ private:
std::mutex _batch_queue_lock;
std::condition_variable _queue_reader_cond;
std::condition_variable _queue_writer_cond;
- std::deque<std::shared_ptr<RowBatch>> _batch_queue;
int _num_running_scanners;
diff --git a/be/src/vec/exec/vtable_function_node.h
b/be/src/vec/exec/vtable_function_node.h
index 85eccc047a..e28a700f46 100644
--- a/be/src/vec/exec/vtable_function_node.h
+++ b/be/src/vec/exec/vtable_function_node.h
@@ -110,7 +110,6 @@ private:
std::vector<SlotDescriptor*> _child_slots;
std::vector<SlotDescriptor*> _output_slots;
int64_t _cur_child_offset = 0;
- std::shared_ptr<RowBatch> _cur_child_batch;
std::vector<ExprContext*> _fn_ctxs;
std::vector<vectorized::VExprContext*> _vfn_ctxs;
diff --git a/be/src/vec/runtime/vsorted_run_merger.h
b/be/src/vec/runtime/vsorted_run_merger.h
index 974b2f6096..24a9ce7602 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -23,7 +23,6 @@
namespace doris {
-class RowBatch;
class RuntimeProfile;
namespace vectorized {
@@ -54,11 +53,6 @@ public:
// Return the next block of sorted rows from this merger.
Status get_next(Block* output_block, bool* eos);
- // Do not support now
- virtual Status get_batch(RowBatch** output_batch) {
- return Status::InternalError("no support method get_batch(RowBatch**
output_batch)");
- }
-
protected:
const std::vector<VExprContext*>& _ordering_expr;
const std::vector<bool>& _is_asc_order;
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 69cd1ecc9b..2163d26c4b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -35,7 +35,6 @@
namespace doris {
class ObjectPool;
-class RowBatch;
class RuntimeState;
class RuntimeProfile;
class BufferControlBlock;
diff --git a/be/src/vec/sink/vmysql_result_writer.h
b/be/src/vec/sink/vmysql_result_writer.h
index e566a30213..3f79f0e2d6 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -24,7 +24,6 @@
namespace doris {
class BufferControlBlock;
-class RowBatch;
class MysqlRowBuffer;
class TFetchDataResult;
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index 63441e3179..66fb675def 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -21,7 +21,6 @@
namespace doris {
class ObjectPool;
-class RowBatch;
class RuntimeState;
class RuntimeProfile;
class BufferControlBlock;
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 13aa7661ea..f042ce50b4 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1247,7 +1247,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status
exec_status) {
}
Expr::close(_output_expr_ctxs, state);
- _output_batch.reset();
_close_status = status;
DataSink::close(state, exec_status);
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 097bb1e6b7..1c0a1be8d1 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -303,12 +303,6 @@ protected:
// rows number received per tablet, tablet_id -> rows_num
std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
- std::unique_ptr<RowBatch> _cur_batch;
- PTabletWriterAddBatchRequest _cur_add_batch_request;
- using AddBatchReq = std::pair<std::unique_ptr<RowBatch>,
PTabletWriterAddBatchRequest>;
- std::queue<AddBatchReq> _pending_batches;
- ReusableClosure<PTabletWriterAddBatchResult>* _add_batch_closure = nullptr;
-
std::unique_ptr<vectorized::MutableBlock> _cur_mutable_block;
PTabletWriterAddBlockRequest _cur_add_block_request;
@@ -543,7 +537,6 @@ private:
OlapTablePartitionParam* _partition = nullptr;
std::vector<ExprContext*> _output_expr_ctxs;
- std::unique_ptr<RowBatch> _output_batch;
VOlapTablePartitionParam* _vpartition = nullptr;
std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index f4a218b020..ce33cd1fda 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -151,8 +151,6 @@ set(OLAP_TEST_FILES
set(RUNTIME_TEST_FILES
# runtime/buffer_control_block_test.cpp
# runtime/result_buffer_mgr_test.cpp
- # runtime/result_sink_test.cpp
- # runtime/data_stream_test.cpp
# runtime/parallel_executor_test.cpp
# runtime/datetime_value_test.cpp
# runtime/dpp_sink_internal_test.cpp
diff --git a/be/test/runtime/data_stream_test.cpp
b/be/test/runtime/data_stream_test.cpp
deleted file mode 100644
index e3b6695dfa..0000000000
--- a/be/test/runtime/data_stream_test.cpp
+++ /dev/null
@@ -1,667 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <iostream>
-#include <thread>
-
-#include "common/status.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/BackendService.h"
-#include "gen_cpp/Descriptors_types.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/client_cache.h"
-#include "runtime/data_stream_mgr.h"
-#include "runtime/data_stream_recvr.h"
-#include "runtime/data_stream_sender.h"
-#include "runtime/descriptors.h"
-#include "runtime/raw_value.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "util/cpu_info.h"
-#include "util/debug_util.h"
-#include "util/disk_info.h"
-#include "util/mem_info.h"
-#include "util/thrift_server.h"
-
-using std::string;
-using std::vector;
-using std::multiset;
-
-using std::unique_ptr;
-using std::thread;
-
-namespace doris {
-
-class DorisTestBackend : public BackendServiceIf {
-public:
- DorisTestBackend(DataStreamMgr* stream_mgr) : _mgr(stream_mgr) {}
- virtual ~DorisTestBackend() {}
-
- virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
- const TExecPlanFragmentParams& params) {}
-
- virtual void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
- const TCancelPlanFragmentParams& params)
{}
-
- virtual void transmit_data(TTransmitDataResult& return_val, const
TTransmitDataParams& params) {
- /*
- LOG(ERROR) << "transmit_data(): instance_id=" <<
params.dest_fragment_instance_id
- << " node_id=" << params.dest_node_id
- << " #rows=" << params.row_batch.num_rows
- << " eos=" << (params.eos ? "true" : "false");
- if (!params.eos) {
- _mgr->add_data(
- params.dest_fragment_instance_id,
- params.dest_node_id,
- params.row_batch,
- params.sender_id).set_t_status(&return_val);
- } else {
- Status status = _mgr->close_sender(
- params.dest_fragment_instance_id, params.dest_node_id,
params.sender_id, params.be_number);
- status.set_t_status(&return_val);
- LOG(ERROR) << "close_sender status: " << status;
- }
- */
- }
-
- virtual void fetch_data(TFetchDataResult& return_val, const
TFetchDataParams& params) {}
-
- virtual void submit_tasks(TAgentResult& return_val,
- const std::vector<TAgentTaskRequest>& tasks) {}
-
- virtual void make_snapshot(TAgentResult& return_val, const
TSnapshotRequest& snapshot_request) {
- }
-
- virtual void release_snapshot(TAgentResult& return_val, const std::string&
snapshot_path) {}
-
- virtual void publish_cluster_state(TAgentResult& return_val,
- const TAgentPublishRequest& request) {}
-
- virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id,
- const int32_t num_senders) {}
-
- virtual void deregister_pull_load_task(TStatus& _return, const TUniqueId&
id) {}
-
- virtual void report_pull_load_sub_task_info(TStatus& _return,
- const TPullLoadSubTaskInfo&
task_info) {}
-
- virtual void fetch_pull_load_task_info(TFetchPullLoadTaskInfoResult&
_return,
- const TUniqueId& id) {}
-
- virtual void
fetch_all_pull_load_task_infos(TFetchAllPullLoadTaskInfosResult& _return) {}
-
-private:
- DataStreamMgr* _mgr;
-};
-
-class DataStreamTest : public testing::Test {
-protected:
- DataStreamTest() : _runtime_state(TUniqueId(), TQueryOptions(), "",
&_exec_env), _next_val(0) {
- _exec_env.init_for_tests();
- _runtime_state.init_mem_trackers(TUniqueId());
- }
- // null dtor to pass codestyle check
- ~DataStreamTest() {}
-
- virtual void SetUp() {
- create_row_desc();
- create_tuple_comparator();
- create_row_batch();
-
- _next_instance_id.lo = 0;
- _next_instance_id.hi = 0;
- _stream_mgr = new DataStreamMgr();
-
- _broadcast_sink.dest_node_id = DEST_NODE_ID;
- _broadcast_sink.output_partition.type = TPartitionType::UNPARTITIONED;
-
- _random_sink.dest_node_id = DEST_NODE_ID;
- _random_sink.output_partition.type = TPartitionType::RANDOM;
-
- _hash_sink.dest_node_id = DEST_NODE_ID;
- _hash_sink.output_partition.type = TPartitionType::HASH_PARTITIONED;
- // there's only one column to partition on
- TExprNode expr_node;
- expr_node.node_type = TExprNodeType::SLOT_REF;
- expr_node.type.types.push_back(TTypeNode());
- expr_node.type.types.back().__isset.scalar_type = true;
- expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT;
- expr_node.num_children = 0;
- TSlotRef slot_ref;
- slot_ref.slot_id = 0;
- expr_node.__set_slot_ref(slot_ref);
- TExpr expr;
- expr.nodes.push_back(expr_node);
- _hash_sink.output_partition.__isset.partition_exprs = true;
- _hash_sink.output_partition.partition_exprs.push_back(expr);
-
- // Ensure that individual sender info addresses don't change
- _sender_info.reserve(MAX_SENDERS);
- _receiver_info.reserve(MAX_RECEIVERS);
- start_backend();
- }
-
- const TDataStreamSink& get_sink(TPartitionType::type partition_type) {
- switch (partition_type) {
- case TPartitionType::UNPARTITIONED:
- return _broadcast_sink;
- case TPartitionType::RANDOM:
- return _random_sink;
- case TPartitionType::HASH_PARTITIONED:
- return _hash_sink;
- default:
- DCHECK(false) << "Unhandled sink type: " << partition_type;
- }
- // Should never reach this.
- return _broadcast_sink;
- }
-
- virtual void TearDown() {
- _lhs_slot_ctx->close(nullptr);
- _rhs_slot_ctx->close(nullptr);
- _exec_env.client_cache()->test_shutdown();
- stop_backend();
- }
-
- void reset() {
- _sender_info.clear();
- _receiver_info.clear();
- _dest.clear();
- }
-
- // We reserve contiguous memory for senders in SetUp. If a test uses more
- // senders, a DCHECK will fail and you should increase this value.
- static const int MAX_SENDERS = 16;
- static const int MAX_RECEIVERS = 16;
- static const PlanNodeId DEST_NODE_ID = 1;
- static const int BATCH_CAPACITY = 100; // rows
- static const int PER_ROW_DATA = 8;
- static const int TOTAL_DATA_SIZE = 8 * 1024;
- static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY /
PER_ROW_DATA;
-
- ObjectPool _obj_pool;
- DescriptorTbl* _desc_tbl;
- const RowDescriptor* _row_desc;
- TupleRowComparator* _less_than;
- ExecEnv _exec_env;
- RuntimeState _runtime_state;
- TUniqueId _next_instance_id;
- string _stmt;
-
- // RowBatch generation
- std::unique_ptr<RowBatch> _batch;
- int _next_val;
- int64_t* _tuple_mem;
-
- // receiving node
- DataStreamMgr* _stream_mgr;
- ThriftServer* _server;
-
- // sending node(s)
- TDataStreamSink _broadcast_sink;
- TDataStreamSink _random_sink;
- TDataStreamSink _hash_sink;
- std::vector<TPlanFragmentDestination> _dest;
-
- struct SenderInfo {
- thread* thread_handle;
- Status status;
- int num_bytes_sent;
-
- SenderInfo() : thread_handle(nullptr), num_bytes_sent(0) {}
- };
- std::vector<SenderInfo> _sender_info;
-
- struct ReceiverInfo {
- TPartitionType::type stream_type;
- int num_senders;
- int receiver_num;
-
- thread* thread_handle;
- std::shared_ptr<DataStreamRecvr> stream_recvr;
- Status status;
- int num_rows_received;
- multiset<int64_t> data_values;
-
- ReceiverInfo(TPartitionType::type stream_type, int num_senders, int
receiver_num)
- : stream_type(stream_type),
- num_senders(num_senders),
- receiver_num(receiver_num),
- thread_handle(nullptr),
- stream_recvr(nullptr),
- num_rows_received(0) {}
-
- ~ReceiverInfo() {
- delete thread_handle;
- stream_recvr.reset();
- }
- };
- std::vector<ReceiverInfo> _receiver_info;
-
- // Create an instance id and add it to _dest
- void get_next_instance_id(TUniqueId* instance_id) {
- _dest.push_back(TPlanFragmentDestination());
- TPlanFragmentDestination& dest = _dest.back();
- dest.fragment_instance_id = _next_instance_id;
- dest.server.hostname = "127.0.0.1";
- dest.server.port = config::port;
- *instance_id = _next_instance_id;
- ++_next_instance_id.lo;
- }
-
- // RowDescriptor to mimic "select bigint_col from alltypesagg", except the
slot
- // isn't nullable
- void create_row_desc() {
- // create DescriptorTbl
- TTupleDescriptor tuple_desc;
- tuple_desc.__set_id(0);
- tuple_desc.__set_byteSize(8);
- tuple_desc.__set_numNullBytes(0);
- TDescriptorTable thrift_desc_tbl;
- thrift_desc_tbl.tupleDescriptors.push_back(tuple_desc);
- TSlotDescriptor slot_desc;
- slot_desc.__set_id(0);
- slot_desc.__set_parent(0);
-
- slot_desc.slotType.types.push_back(TTypeNode());
- slot_desc.slotType.types.back().__isset.scalar_type = true;
- slot_desc.slotType.types.back().scalar_type.type =
TPrimitiveType::BIGINT;
-
- slot_desc.__set_columnPos(0);
- slot_desc.__set_byteOffset(0);
- slot_desc.__set_nullIndicatorByte(0);
- slot_desc.__set_nullIndicatorBit(-1);
- slot_desc.__set_slotIdx(0);
- slot_desc.__set_isMaterialized(true);
- thrift_desc_tbl.slotDescriptors.push_back(slot_desc);
- EXPECT_TRUE(DescriptorTbl::create(&_obj_pool, thrift_desc_tbl,
&_desc_tbl).ok());
- _runtime_state.set_desc_tbl(_desc_tbl);
-
- std::vector<TTupleId> row_tids;
- row_tids.push_back(0);
-
- std::vector<bool> nullable_tuples;
- nullable_tuples.push_back(false);
- _row_desc = _obj_pool.add(new RowDescriptor(*_desc_tbl, row_tids,
nullable_tuples));
- }
-
- // Create a tuple comparator to sort in ascending order on the single
bigint column.
- void create_tuple_comparator() {
- TExprNode expr_node;
- expr_node.node_type = TExprNodeType::SLOT_REF;
- expr_node.type.types.push_back(TTypeNode());
- expr_node.type.types.back().__isset.scalar_type = true;
- expr_node.type.types.back().scalar_type.type = TPrimitiveType::BIGINT;
- expr_node.num_children = 0;
- TSlotRef slot_ref;
- slot_ref.slot_id = 0;
- expr_node.__set_slot_ref(slot_ref);
-
- SlotRef* lhs_slot = _obj_pool.add(new SlotRef(expr_node));
- _lhs_slot_ctx = _obj_pool.add(new ExprContext(lhs_slot));
- SlotRef* rhs_slot = _obj_pool.add(new SlotRef(expr_node));
- _rhs_slot_ctx = _obj_pool.add(new ExprContext(rhs_slot));
-
- _lhs_slot_ctx->prepare(&_runtime_state, *_row_desc);
- _rhs_slot_ctx->prepare(&_runtime_state, *_row_desc);
- _lhs_slot_ctx->open(nullptr);
- _rhs_slot_ctx->open(nullptr);
- SortExecExprs* sort_exprs = _obj_pool.add(new SortExecExprs());
- sort_exprs->init(vector<ExprContext*>(1, _lhs_slot_ctx),
- std::vector<ExprContext*>(1, _rhs_slot_ctx));
- _less_than = _obj_pool.add(new TupleRowComparator(*sort_exprs,
std::vector<bool>(1, true),
- std::vector<bool>(1,
false)));
- }
-
- // Create _batch, but don't fill it with data yet. Assumes we created
_row_desc.
- RowBatch* create_row_batch() {
- RowBatch* batch = new RowBatch(*_row_desc, BATCH_CAPACITY);
- int64_t* tuple_mem =
-
reinterpret_cast<int64_t*>(batch->tuple_data_pool()->allocate(BATCH_CAPACITY *
8));
- bzero(tuple_mem, BATCH_CAPACITY * 8);
-
- for (int i = 0; i < BATCH_CAPACITY; ++i) {
- int idx = batch->add_row();
- TupleRow* row = batch->get_row(idx);
- row->set_tuple(0, reinterpret_cast<Tuple*>(&tuple_mem[i]));
- batch->commit_last_row();
- }
-
- return batch;
- }
-
- void get_next_batch(RowBatch* batch, int* next_val) {
- LOG(INFO) << "batch_capacity=" << BATCH_CAPACITY << " next_val=" <<
*next_val;
- for (int i = 0; i < BATCH_CAPACITY; ++i) {
- TupleRow* row = batch->get_row(i);
- int64_t* val =
reinterpret_cast<int64_t*>(row->get_tuple(0)->get_slot(0));
- *val = (*next_val)++;
- }
- }
-
- // Start receiver (expecting given number of senders) in separate thread.
- void start_receiver(TPartitionType::type stream_type, int num_senders, int
receiver_num,
- int buffer_size, bool is_merging, TUniqueId* out_id =
nullptr) {
- VLOG_QUERY << "start receiver";
- RuntimeProfile* profile = _obj_pool.add(new
RuntimeProfile("TestReceiver"));
- TUniqueId instance_id;
- get_next_instance_id(&instance_id);
- _receiver_info.push_back(ReceiverInfo(stream_type, num_senders,
receiver_num));
- ReceiverInfo& info = _receiver_info.back();
- info.stream_recvr =
- _stream_mgr->create_recvr(&_runtime_state, *_row_desc,
instance_id, DEST_NODE_ID,
- num_senders, buffer_size, profile,
is_merging);
- if (!is_merging) {
- info.thread_handle = new thread(&DataStreamTest::read_stream,
this, &info);
- } else {
- info.thread_handle =
- new thread(&DataStreamTest::read_stream_merging, this,
&info, profile);
- }
-
- if (out_id != nullptr) {
- *out_id = instance_id;
- }
- }
-
- void join_receivers() {
- VLOG_QUERY << "join receiver\n";
-
- for (int i = 0; i < _receiver_info.size(); ++i) {
- _receiver_info[i].thread_handle->join();
- _receiver_info[i].stream_recvr->close();
- }
- }
-
- // Deplete stream and print batches
- void read_stream(ReceiverInfo* info) {
- RowBatch* batch = nullptr;
- VLOG_QUERY << "start reading";
-
- while (!(info->status =
info->stream_recvr->get_batch(&batch)).is_cancelled() &&
- (batch != nullptr)) {
- VLOG_QUERY << "read batch #rows=" << (batch != nullptr ?
batch->num_rows() : 0);
-
- for (int i = 0; i < batch->num_rows(); ++i) {
- TupleRow* row = batch->get_row(i);
-
info->data_values.insert(*static_cast<int64_t*>(row->get_tuple(0)->get_slot(0)));
- }
-
- SleepFor(MonoDelta::FromMilliseconds(
- 10)); // slow down receiver to exercise buffering logic
- }
-
- if (info->status.is_cancelled()) {
- VLOG_QUERY << "reader is cancelled";
- }
-
- VLOG_QUERY << "done reading";
- }
-
- void read_stream_merging(ReceiverInfo* info, RuntimeProfile* profile) {
- info->status = info->stream_recvr->create_merger(*_less_than);
- if (info->status.is_cancelled()) {
- return;
- }
- RowBatch batch(*_row_desc, 1024);
- VLOG_QUERY << "start reading merging";
- bool eos = false;
- while (!(info->status = info->stream_recvr->get_next(&batch,
&eos)).is_cancelled()) {
- VLOG_QUERY << "read batch #rows=" << batch.num_rows();
- for (int i = 0; i < batch.num_rows(); ++i) {
- TupleRow* row = batch.get_row(i);
-
info->data_values.insert(*static_cast<int64_t*>(row->get_tuple(0)->get_slot(0)));
- }
- SleepFor(MonoDelta::FromMilliseconds(
- 10)); // slow down receiver to exercise buffering logic
- batch.reset();
- if (eos) {
- break;
- }
- }
- if (info->status.is_cancelled()) {
- VLOG_QUERY << "reader is cancelled";
- }
- VLOG_QUERY << "done reading";
- }
-
- // Verify correctness of receivers' data values.
- void check_receivers(TPartitionType::type stream_type, int num_senders) {
- int64_t total = 0;
- multiset<int64_t> all_data_values;
-
- for (int i = 0; i < _receiver_info.size(); ++i) {
- ReceiverInfo& info = _receiver_info[i];
- EXPECT_TRUE(info.status.ok());
- total += info.data_values.size();
- DCHECK_EQ(info.stream_type, stream_type);
- DCHECK_EQ(info.num_senders, num_senders);
-
- if (stream_type == TPartitionType::UNPARTITIONED) {
- EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders,
info.data_values.size());
- }
-
- all_data_values.insert(info.data_values.begin(),
info.data_values.end());
-
- int k = 0;
- for (multiset<int64_t>::iterator j = info.data_values.begin();
- j != info.data_values.end(); ++j, ++k) {
- if (stream_type == TPartitionType::UNPARTITIONED) {
- // unpartitioned streams contain all values as many times
as there are
- // senders
- EXPECT_EQ(k / num_senders, *j);
- } else if (stream_type == TPartitionType::HASH_PARTITIONED) {
- // hash-partitioned streams send values to the right
partition
- int64_t value = *j;
- uint32_t hash_val = RawValue::get_hash_value_fvn(&value,
TYPE_BIGINT, 0U);
- EXPECT_EQ(hash_val % _receiver_info.size(),
info.receiver_num);
- }
- }
- }
-
- if (stream_type == TPartitionType::HASH_PARTITIONED) {
- EXPECT_EQ(NUM_BATCHES * BATCH_CAPACITY * num_senders, total);
-
- int k = 0;
- for (multiset<int64_t>::iterator j = all_data_values.begin();
- j != all_data_values.end(); ++j, ++k) {
- // each sender sent all values
- EXPECT_EQ(k / num_senders, *j);
-
- if (k / num_senders != *j) {
- break;
- }
- }
- }
- }
-
- void check_senders() {
- for (int i = 0; i < _sender_info.size(); ++i) {
- EXPECT_TRUE(_sender_info[i].status.ok());
- EXPECT_GT(_sender_info[i].num_bytes_sent, 0) << "info i=" << i;
- }
- }
-
- // Start backend in separate thread.
- void start_backend() {
- std::shared_ptr<DorisTestBackend> handler(new
DorisTestBackend(_stream_mgr));
- std::shared_ptr<apache::thrift::TProcessor> processor(new
BackendServiceProcessor(handler));
- _server = new ThriftServer("DataStreamTest backend", processor,
config::port, nullptr);
- _server->start();
- }
-
- void stop_backend() {
- VLOG_QUERY << "stop backend\n";
- _server->stop_for_testing();
- delete _server;
- }
-
- void start_sender(TPartitionType::type partition_type =
TPartitionType::UNPARTITIONED,
- int channel_buffer_size = 1024) {
- VLOG_QUERY << "start sender";
- int sender_id = _sender_info.size();
- DCHECK_LT(sender_id, MAX_SENDERS);
- _sender_info.push_back(SenderInfo());
- SenderInfo& info = _sender_info.back();
- info.thread_handle = new thread(&DataStreamTest::sender, this,
sender_id,
- channel_buffer_size, partition_type);
- }
-
- void join_senders() {
- VLOG_QUERY << "join senders\n";
- for (int i = 0; i < _sender_info.size(); ++i) {
- _sender_info[i].thread_handle->join();
- }
- }
-
- void sender(int sender_num, int channel_buffer_size, TPartitionType::type
partition_type) {
- RuntimeState state(TExecPlanFragmentParams(), TQueryOptions(), "",
&_exec_env);
- state.set_desc_tbl(_desc_tbl);
- state.init_mem_trackers(TUniqueId());
- VLOG_QUERY << "create sender " << sender_num;
- const TDataStreamSink& stream_sink =
- (partition_type == TPartitionType::UNPARTITIONED ?
_broadcast_sink : _hash_sink);
- DataStreamSender sender(&_obj_pool, sender_num, *_row_desc,
stream_sink, _dest,
- channel_buffer_size);
-
- TDataSink data_sink;
- data_sink.__set_type(TDataSinkType::DATA_STREAM_SINK);
- data_sink.__set_stream_sink(stream_sink);
- EXPECT_TRUE(sender.init(data_sink).ok());
-
- EXPECT_TRUE(sender.prepare(&state).ok());
- EXPECT_TRUE(sender.open(&state).ok());
- std::unique_ptr<RowBatch> batch(create_row_batch());
- SenderInfo& info = _sender_info[sender_num];
- int next_val = 0;
-
- for (int i = 0; i < NUM_BATCHES; ++i) {
- get_next_batch(batch.get(), &next_val);
- VLOG_QUERY << "sender " << sender_num << ": #rows=" <<
batch->num_rows();
- info.status = sender.send(&state, batch.get());
-
- if (!info.status.ok()) {
- LOG(WARNING) << "something is wrong when sending: " <<
info.status;
- break;
- }
- }
-
- VLOG_QUERY << "closing sender" << sender_num;
- info.status = sender.close(&state, Status::OK());
- info.num_bytes_sent = sender.get_num_data_bytes_sent();
-
- batch->reset();
- }
-
- void test_stream(TPartitionType::type stream_type, int num_senders, int
num_receivers,
- int buffer_size, bool is_merging) {
- LOG(INFO) << "Testing stream=" << stream_type << " #senders=" <<
num_senders
- << " #receivers=" << num_receivers << " buffer_size=" <<
buffer_size;
- reset();
-
- for (int i = 0; i < num_receivers; ++i) {
- start_receiver(stream_type, num_senders, i, buffer_size,
is_merging);
- }
-
- for (int i = 0; i < num_senders; ++i) {
- start_sender(stream_type, buffer_size);
- }
-
- join_senders();
- check_senders();
- join_receivers();
- check_receivers(stream_type, num_senders);
- }
-
-private:
- ExprContext* _lhs_slot_ctx;
- ExprContext* _rhs_slot_ctx;
-};
-
-TEST_F(DataStreamTest, UnknownSenderSmallResult) {
- // starting a sender w/o a corresponding receiver does not result in an
error because
- // we cannot distinguish whether a receiver was never created or the
receiver
- // willingly tore down the stream
- // case 1: entire query result fits in single buffer, close() returns ok
- TUniqueId dummy_id;
- get_next_instance_id(&dummy_id);
- start_sender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
- join_senders();
- EXPECT_TRUE(_sender_info[0].status.ok());
- EXPECT_GT(_sender_info[0].num_bytes_sent, 0);
-}
-
-TEST_F(DataStreamTest, UnknownSenderLargeResult) {
- // case 2: query result requires multiple buffers, send() returns ok
- TUniqueId dummy_id;
- get_next_instance_id(&dummy_id);
- start_sender();
- join_senders();
- EXPECT_TRUE(_sender_info[0].status.ok());
- EXPECT_GT(_sender_info[0].num_bytes_sent, 0);
-}
-
-TEST_F(DataStreamTest, Cancel) {
- TUniqueId instance_id;
- start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false,
&instance_id);
- _stream_mgr->cancel(instance_id);
- start_receiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true,
&instance_id);
- _stream_mgr->cancel(instance_id);
- join_receivers();
- EXPECT_TRUE(_receiver_info[0].status.is_cancelled());
-}
-
-TEST_F(DataStreamTest, BasicTest) {
- // TODO: also test that all client connections have been returned
- TPartitionType::type stream_types[] = {TPartitionType::UNPARTITIONED,
- TPartitionType::HASH_PARTITIONED};
- int sender_nums[] = {1, 3};
- int receiver_nums[] = {1, 3};
- int buffer_sizes[] = {1024, 1024 * 1024};
- bool merging[] = {false, true};
-
- // test_stream(TPartitionType::HASH_PARTITIONED, 1, 3, 1024, true);
- for (int i = 0; i < sizeof(stream_types) / sizeof(*stream_types); ++i) {
- for (int j = 0; j < sizeof(sender_nums) / sizeof(int); ++j) {
- for (int k = 0; k < sizeof(receiver_nums) / sizeof(int); ++k) {
- for (int l = 0; l < sizeof(buffer_sizes) / sizeof(int); ++l) {
- for (int m = 0; m < sizeof(merging) / sizeof(bool); ++m) {
- LOG(ERROR) << "before test: stream_type=" <<
stream_types[i]
- << " sender num=" << sender_nums[j]
- << " receiver_num=" << receiver_nums[k]
- << " buffer_size=" << buffer_sizes[l]
- << " merging=" << (merging[m] ? "true" :
"false");
- test_stream(stream_types[i], sender_nums[j],
receiver_nums[k],
- buffer_sizes[l], merging[m]);
- LOG(ERROR) << "after test: stream_type=" <<
stream_types[i]
- << " sender num=" << sender_nums[j]
- << " receiver_num=" << receiver_nums[k]
- << " buffer_size=" << buffer_sizes[l]
- << " merging=" << (merging[m] ? "true" :
"false");
- }
- }
- }
- }
- }
-}
-
-// TODO: more tests:
-// - test case for transmission error in last batch
-// - receivers getting created concurrently
-
-} // namespace doris
diff --git a/be/test/runtime/load_channel_mgr_test.cpp
b/be/test/runtime/load_channel_mgr_test.cpp
index 3569b30757..d99951d957 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -60,23 +60,6 @@ Status DeltaWriter::open(WriteRequest* req, DeltaWriter**
writer) {
return open_status;
}
-Status DeltaWriter::write(Tuple* tuple) {
- if (_k_tablet_recorder.find(_req.tablet_id) ==
std::end(_k_tablet_recorder)) {
- _k_tablet_recorder[_req.tablet_id] = 1;
- } else {
- _k_tablet_recorder[_req.tablet_id]++;
- }
- return add_status;
-}
-
-Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>&
row_idxs) {
- if (_k_tablet_recorder.find(_req.tablet_id) ==
std::end(_k_tablet_recorder)) {
- _k_tablet_recorder[_req.tablet_id] = 0;
- }
- _k_tablet_recorder[_req.tablet_id] += row_idxs.size();
- return add_status;
-}
-
Status DeltaWriter::close() {
return Status::OK();
}
@@ -177,94 +160,6 @@ void create_schema(DescriptorTbl* desc_tbl,
POlapTableSchemaParam* pschema) {
indexes->set_schema_hash(123);
}
-TEST_F(LoadChannelMgrTest, normal) {
- ExecEnv env;
- LoadChannelMgr mgr;
- mgr.init(-1);
-
- auto tdesc_tbl = create_descriptor_table();
- ObjectPool obj_pool;
- DescriptorTbl* desc_tbl = nullptr;
- DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
- auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
- RowDescriptor row_desc(*desc_tbl, {0}, {false});
- PUniqueId load_id;
- load_id.set_hi(2);
- load_id.set_lo(3);
- {
- PTabletWriterOpenRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_txn_id(1);
- create_schema(desc_tbl, request.mutable_schema());
- for (int i = 0; i < 2; ++i) {
- auto tablet = request.add_tablets();
- tablet->set_partition_id(10 + i);
- tablet->set_tablet_id(20 + i);
- }
- request.set_num_senders(1);
- request.set_need_gen_rollup(false);
- auto st = mgr.open(request);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
-
- // add a batch
- {
- PTabletWriterAddBatchRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_sender_id(0);
- request.set_eos(true);
- request.set_packet_seq(0);
-
- request.add_tablet_ids(20);
- request.add_tablet_ids(21);
- request.add_tablet_ids(20);
-
- RowBatch row_batch(row_desc, 1024);
-
- // row1
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
987654;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 1234567899876;
- row_batch.commit_last_row();
- }
- // row2
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
12345678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 9876567899876;
- row_batch.commit_last_row();
- }
- // row3
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
876545678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
- row_batch.commit_last_row();
- }
- row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
- PTabletWriterAddBatchResult response;
- auto st = mgr.add_batch(request, &response);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
- // check content
- EXPECT_EQ(_k_tablet_recorder[20], 2);
- EXPECT_EQ(_k_tablet_recorder[21], 1);
-}
-
TEST_F(LoadChannelMgrTest, cancel) {
ExecEnv env;
LoadChannelMgr mgr;
@@ -342,373 +237,4 @@ TEST_F(LoadChannelMgrTest, open_failed) {
}
}
-TEST_F(LoadChannelMgrTest, add_failed) {
- ExecEnv env;
- LoadChannelMgr mgr;
- mgr.init(-1);
-
- auto tdesc_tbl = create_descriptor_table();
- ObjectPool obj_pool;
- DescriptorTbl* desc_tbl = nullptr;
- DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
- auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
- RowDescriptor row_desc(*desc_tbl, {0}, {false});
- PUniqueId load_id;
- load_id.set_hi(2);
- load_id.set_lo(3);
- {
- PTabletWriterOpenRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_txn_id(1);
- create_schema(desc_tbl, request.mutable_schema());
- for (int i = 0; i < 2; ++i) {
- auto tablet = request.add_tablets();
- tablet->set_partition_id(10 + i);
- tablet->set_tablet_id(20 + i);
- }
- request.set_num_senders(1);
- request.set_need_gen_rollup(false);
- auto st = mgr.open(request);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
-
- // add a batch
- {
- PTabletWriterAddBatchRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_sender_id(0);
- request.set_eos(true);
- request.set_packet_seq(0);
-
- request.add_tablet_ids(20);
- request.add_tablet_ids(21);
- request.add_tablet_ids(20);
-
- RowBatch row_batch(row_desc, 1024);
-
- // row1
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
987654;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 1234567899876;
- row_batch.commit_last_row();
- }
- // row2
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
12345678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 9876567899876;
- row_batch.commit_last_row();
- }
- // row3
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
876545678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
- row_batch.commit_last_row();
- }
- row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
- // DeltaWriter's write will return -215
- add_status = Status::Error<TABLE_NOT_FOUND>();
- PTabletWriterAddBatchResult response;
- auto st = mgr.add_batch(request, &response);
- request.release_id();
- // st is still ok.
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(2, response.tablet_errors().size());
- }
-}
-
-TEST_F(LoadChannelMgrTest, close_failed) {
- ExecEnv env;
- LoadChannelMgr mgr;
- mgr.init(-1);
-
- auto tdesc_tbl = create_descriptor_table();
- ObjectPool obj_pool;
- DescriptorTbl* desc_tbl = nullptr;
- DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
- auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
- RowDescriptor row_desc(*desc_tbl, {0}, {false});
- PUniqueId load_id;
- load_id.set_hi(2);
- load_id.set_lo(3);
- {
- PTabletWriterOpenRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_txn_id(1);
- create_schema(desc_tbl, request.mutable_schema());
- for (int i = 0; i < 2; ++i) {
- auto tablet = request.add_tablets();
- tablet->set_partition_id(10 + i);
- tablet->set_tablet_id(20 + i);
- }
- request.set_num_senders(1);
- request.set_need_gen_rollup(false);
- auto st = mgr.open(request);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
-
- // add a batch
- {
- PTabletWriterAddBatchRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_sender_id(0);
- request.set_eos(true);
- request.set_packet_seq(0);
-
- request.add_tablet_ids(20);
- request.add_tablet_ids(21);
- request.add_tablet_ids(20);
-
- request.add_partition_ids(10);
- request.add_partition_ids(11);
-
- RowBatch row_batch(row_desc, 1024);
-
- // row1
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
987654;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 1234567899876;
- row_batch.commit_last_row();
- }
- // row2
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
12345678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 9876567899876;
- row_batch.commit_last_row();
- }
- // row3
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
876545678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
- row_batch.commit_last_row();
- }
- row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
- close_status = Status::Error<TABLE_NOT_FOUND>();
- PTabletWriterAddBatchResult response;
- auto st = mgr.add_batch(request, &response);
- request.release_id();
- // even if delta close failed, the return status is still ok, but tablet_vec is empty
- EXPECT_TRUE(st.ok());
- EXPECT_TRUE(response.tablet_vec().empty());
- }
-}
-
-TEST_F(LoadChannelMgrTest, unknown_tablet) {
- ExecEnv env;
- LoadChannelMgr mgr;
- mgr.init(-1);
-
- auto tdesc_tbl = create_descriptor_table();
- ObjectPool obj_pool;
- DescriptorTbl* desc_tbl = nullptr;
- DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
- auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
- RowDescriptor row_desc(*desc_tbl, {0}, {false});
- PUniqueId load_id;
- load_id.set_hi(2);
- load_id.set_lo(3);
- {
- PTabletWriterOpenRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_txn_id(1);
- create_schema(desc_tbl, request.mutable_schema());
- for (int i = 0; i < 2; ++i) {
- auto tablet = request.add_tablets();
- tablet->set_partition_id(10 + i);
- tablet->set_tablet_id(20 + i);
- }
- request.set_num_senders(1);
- request.set_need_gen_rollup(false);
- auto st = mgr.open(request);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
-
- // add a batch
- {
- PTabletWriterAddBatchRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_sender_id(0);
- request.set_eos(true);
- request.set_packet_seq(0);
-
- request.add_tablet_ids(20);
- request.add_tablet_ids(22);
- request.add_tablet_ids(20);
-
- RowBatch row_batch(row_desc, 1024);
-
- // row1
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
987654;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 1234567899876;
- row_batch.commit_last_row();
- }
- // row2
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
12345678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 9876567899876;
- row_batch.commit_last_row();
- }
- // row3
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
876545678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
- row_batch.commit_last_row();
- }
- row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
- PTabletWriterAddBatchResult response;
- auto st = mgr.add_batch(request, &response);
- request.release_id();
- EXPECT_FALSE(st.ok());
- }
-}
-
-TEST_F(LoadChannelMgrTest, duplicate_packet) {
- ExecEnv env;
- LoadChannelMgr mgr;
- mgr.init(-1);
-
- auto tdesc_tbl = create_descriptor_table();
- ObjectPool obj_pool;
- DescriptorTbl* desc_tbl = nullptr;
- DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
- auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
- RowDescriptor row_desc(*desc_tbl, {0}, {false});
- PUniqueId load_id;
- load_id.set_hi(2);
- load_id.set_lo(3);
- {
- PTabletWriterOpenRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_txn_id(1);
- create_schema(desc_tbl, request.mutable_schema());
- for (int i = 0; i < 2; ++i) {
- auto tablet = request.add_tablets();
- tablet->set_partition_id(10 + i);
- tablet->set_tablet_id(20 + i);
- }
- request.set_num_senders(1);
- request.set_need_gen_rollup(false);
- auto st = mgr.open(request);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
-
- // add a batch
- {
- PTabletWriterAddBatchRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_sender_id(0);
- request.set_eos(false);
- request.set_packet_seq(0);
-
- request.add_tablet_ids(20);
- request.add_tablet_ids(21);
- request.add_tablet_ids(20);
-
- RowBatch row_batch(row_desc, 1024);
-
- // row1
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
987654;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 1234567899876;
- row_batch.commit_last_row();
- }
- // row2
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
12345678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 9876567899876;
- row_batch.commit_last_row();
- }
- // row3
- {
- auto id = row_batch.add_row();
- auto tuple =
(Tuple*)row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size());
- row_batch.get_row(id)->set_tuple(0, tuple);
- memset(tuple, 0, tuple_desc->byte_size());
- *(int*)tuple->get_slot(tuple_desc->slots()[0]->tuple_offset()) =
876545678;
- *(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
- row_batch.commit_last_row();
- }
- row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
- PTabletWriterAddBatchResult response;
- auto st = mgr.add_batch(request, &response);
- EXPECT_TRUE(st.ok());
- PTabletWriterAddBatchResult response2;
- st = mgr.add_batch(request, &response2);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
- // close
- {
- PTabletWriterAddBatchRequest request;
- request.set_allocated_id(&load_id);
- request.set_index_id(4);
- request.set_sender_id(0);
- request.set_eos(true);
- request.set_packet_seq(0);
- PTabletWriterAddBatchResult response;
- auto st = mgr.add_batch(request, &response);
- request.release_id();
- EXPECT_TRUE(st.ok());
- }
- // check content
- EXPECT_EQ(_k_tablet_recorder[20], 2);
- EXPECT_EQ(_k_tablet_recorder[21], 1);
-}
-
} // namespace doris
diff --git a/be/test/runtime/result_sink_test.cpp b/be/test/runtime/result_sink_test.cpp
deleted file mode 100644
index 05a3caee03..0000000000
--- a/be/test/runtime/result_sink_test.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/result_sink.h"
-
-#include <gtest/gtest.h>
-#include <stdio.h>
-#include <stdlib.h>
-
-#include <iostream>
-
-#include "exprs/bool_literal.h"
-#include "exprs/expr.h"
-#include "exprs/float_literal.h"
-#include "exprs/int_literal.h"
-#include "exprs/string_literal.h"
-#include "exprs/timestamp_literal.h"
-#include "gen_cpp/Exprs_types.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/buffer_control_block.h"
-#include "runtime/primitive_type.h"
-#include "runtime/result_buffer_mgr.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple_row.h"
-#include "util/cpu_info.h"
-#include "util/mysql_row_buffer.h"
-
-namespace doris {
-
-class ResultSinkTest : public testing::Test {
-public:
- ResultSinkTest() {
- _runtime_state = new RuntimeState("ResultWriterTest");
- _runtime_state->_exec_env = &_exec_env;
-
- {
- TExpr expr;
- {
- TExprNode node;
-
- node.node_type = TExprNodeType::INT_LITERAL;
- node.type = to_tcolumn_type_thrift(TPrimitiveType::TINYINT);
- node.num_children = 0;
- TIntLiteral data;
- data.value = 1;
- node.__set_int_literal(data);
- expr.nodes.push_back(node);
- }
- _exprs.push_back(expr);
- }
- }
- virtual ~ResultSinkTest() { delete _runtime_state; }
-
-protected:
- virtual void SetUp() {}
-
-private:
- ExecEnv _exec_env;
- std::vector<TExpr> _exprs;
- RuntimeState* _runtime_state;
- RowDescriptor _row_desc;
- TResultSink _tsink;
-};
-
-TEST_F(ResultSinkTest, init_normal) {
- ResultSink sink(_row_desc, _exprs, _tsink, 1024);
- EXPECT_TRUE(sink.init(_runtime_state).ok());
- RowBatch row_batch(_row_desc, 1024);
- row_batch.add_row();
- row_batch.commit_last_row();
- EXPECT_TRUE(sink.send(_runtime_state, &row_batch).ok());
- EXPECT_TRUE(sink.close(_runtime_state, Status::OK()).ok());
-}
-
-} // namespace doris
-
-/* vim: set ts=4 sw=4 sts=4 tw=100 */
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp
index a987f30150..ef7ac73ce7 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -289,31 +289,6 @@ public:
status.to_protobuf(response->mutable_status());
}
- void tablet_writer_add_batch(google::protobuf::RpcController* controller,
- const PTabletWriterAddBatchRequest* request,
- PTabletWriterAddBatchResult* response,
- google::protobuf::Closure* done) override {
- brpc::ClosureGuard done_guard(done);
- {
- std::lock_guard<std::mutex> l(_lock);
- _row_counters += request->tablet_ids_size();
- if (request->eos()) {
- _eof_counters++;
- }
- k_add_batch_status.to_protobuf(response->mutable_status());
-
- if (request->has_row_batch() && _row_desc != nullptr) {
- brpc::Controller* cntl =
static_cast<brpc::Controller*>(controller);
-
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request,
cntl);
- RowBatch batch(*_row_desc, request->row_batch());
- for (int i = 0; i < batch.num_rows(); ++i) {
- LOG(INFO) << batch.get_row(i)->to_string(*_row_desc);
-
_output_set->emplace(batch.get_row(i)->to_string(*_row_desc));
- }
- }
- }
- }
-
void tablet_writer_add_block(google::protobuf::RpcController* controller,
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index e49503ca04..6a47156b39 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -370,12 +370,6 @@ public class MockedBackendFactory {
responseObserver.onCompleted();
}
- @Override
- public void
tabletWriterAddBatch(InternalService.PTabletWriterAddBatchRequest request,
StreamObserver<InternalService.PTabletWriterAddBatchResult> responseObserver) {
- responseObserver.onNext(null);
- responseObserver.onCompleted();
- }
-
@Override
public void
tabletWriterCancel(InternalService.PTabletWriterCancelRequest request,
StreamObserver<InternalService.PTabletWriterCancelResult> responseObserver) {
responseObserver.onNext(null);
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 552786f313..50bdb1bb68 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -551,8 +551,6 @@ service PBackendService {
rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns
(PCancelPlanFragmentResult);
rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
rpc tablet_writer_open(PTabletWriterOpenRequest) returns
(PTabletWriterOpenResult);
- rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns
(PTabletWriterAddBatchResult);
- rpc tablet_writer_add_batch_by_http(PEmptyRequest) returns
(PTabletWriterAddBatchResult);
rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns
(PTabletWriterAddBlockResult);
rpc tablet_writer_add_block_by_http(PEmptyRequest) returns
(PTabletWriterAddBlockResult);
rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns
(PTabletWriterCancelResult);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]