This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e22f5891d2 [WIP](row store) two phase opt read row store (#18654)
e22f5891d2 is described below
commit e22f5891d2fba4a54cb421b065f8da5f00cc4286
Author: lihangyu <[email protected]>
AuthorDate: Tue May 16 13:21:58 2023 +0800
[WIP](row store) two phase opt read row store (#18654)
---
be/src/exec/rowid_fetcher.cpp | 166 +++++++++++++++------
be/src/exec/rowid_fetcher.h | 27 +++-
be/src/exec/tablet_info.h | 7 +
be/src/olap/rowset/beta_rowset_writer.cpp | 1 -
be/src/olap/rowset/segment_v2/segment_writer.cpp | 3 +
be/src/olap/tablet.cpp | 5 +-
be/src/olap/tablet.h | 2 +-
be/src/olap/utils.h | 12 ++
be/src/runtime/descriptors.cpp | 4 +-
be/src/runtime/descriptors.h | 2 +-
be/src/service/internal_service.cpp | 125 ++++++++--------
be/src/service/internal_service.h | 2 +-
be/src/service/point_query_executor.cpp | 8 +-
be/src/vec/exec/vexchange_node.cpp | 27 ----
be/src/vec/exec/vexchange_node.h | 7 -
be/src/vec/exprs/vexpr_context.h | 8 +
be/src/vec/exprs/vslot_ref.cpp | 5 +-
be/src/vec/jsonb/serialize.cpp | 4 +
be/src/vec/sink/vresult_sink.cpp | 38 ++++-
be/src/vec/sink/vresult_sink.h | 5 +
.../org/apache/doris/analysis/CreateTableStmt.java | 6 +-
.../java/org/apache/doris/analysis/SelectStmt.java | 63 +++++---
.../glue/translator/PhysicalPlanTranslator.java | 45 ++++++
.../org/apache/doris/planner/ExchangeNode.java | 22 ---
.../org/apache/doris/planner/OriginalPlanner.java | 50 +++++--
.../java/org/apache/doris/planner/ResultSink.java | 16 ++
.../java/org/apache/doris/planner/SortNode.java | 2 +-
.../org/apache/doris/system/SystemInfoService.java | 12 ++
gensrc/proto/internal_service.proto | 27 +++-
gensrc/thrift/DataSinks.thrift | 11 +-
gensrc/thrift/PlanNodes.thrift | 2 -
.../scalar_types/sql/dup_key_2pr_q01.out | 8 +
.../scalar_types/sql/dup_key_2pr_q02.out | 4 +
.../scalar_types/sql/dup_key_2pr_q03.out | 4 +
.../scalar_types/sql/dup_key_2pr_q04.out | 4 +
.../suites/datatype_p0/scalar_types/load.groovy | 8 +-
.../scalar_types/sql/dup_key_2pr_q01.sql | 1 +
.../scalar_types/sql/dup_key_2pr_q02.sql | 1 +
.../scalar_types/sql/dup_key_2pr_q03.sql | 1 +
.../scalar_types/sql/dup_key_2pr_q04.sql | 1 +
40 files changed, 513 insertions(+), 233 deletions(-)
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 46909904f9..a90182bce9 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -18,22 +18,26 @@
#include "exec/rowid_fetcher.h"
#include <brpc/callback.h>
-#include <brpc/controller.h>
#include <butil/endpoint.h>
#include <fmt/format.h>
+#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
+#include <cstdint>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
+#include <vector>
#include "bthread/countdown_event.h"
#include "common/config.h"
+#include "common/consts.h"
#include "exec/tablet_info.h" // DorisNodesInfo
#include "olap/olap_common.h"
#include "olap/utils.h"
@@ -48,11 +52,15 @@
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h" // Block
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/jsonb/serialize.h"
namespace doris {
-Status RowIDFetcher::init(DorisNodesInfo* nodes_info) {
- for (auto [node_id, node_info] : nodes_info->nodes_info()) {
+Status RowIDFetcher::init() {
+ DorisNodesInfo nodes_info;
+ nodes_info.setNodes(_fetch_option.t_fetch_opt.nodes_info);
+ for (auto [node_id, node_info] : nodes_info.nodes_info()) {
auto client =
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
node_info.host, node_info.brpc_port);
if (!client) {
@@ -65,29 +73,33 @@ Status RowIDFetcher::init(DorisNodesInfo* nodes_info) {
return Status::OK();
}
-static std::string format_rowid(const GlobalRowLoacation& location) {
- return fmt::format("{} {} {} {}", location.tablet_id,
- location.row_location.rowset_id.to_string(),
- location.row_location.segment_id,
location.row_location.row_id);
-}
-
-PMultiGetRequest RowIDFetcher::_init_fetch_request(const
vectorized::ColumnString& row_ids) {
+PMultiGetRequest RowIDFetcher::_init_fetch_request(const
vectorized::ColumnString& row_locs) const {
PMultiGetRequest mget_req;
- _tuple_desc->to_protobuf(mget_req.mutable_desc());
- for (auto slot : _tuple_desc->slots()) {
+ _fetch_option.desc->to_protobuf(mget_req.mutable_desc());
+ for (SlotDescriptor* slot : _fetch_option.desc->slots()) {
+ // ignore rowid
+ if (slot->col_name() == BeConsts::ROWID_COL) {
+ continue;
+ }
slot->to_protobuf(mget_req.add_slots());
}
- for (size_t i = 0; i < row_ids.size(); ++i) {
- PMultiGetRequest::RowId row_id;
- StringRef row_id_rep = row_ids.get_data_at(i);
+ for (size_t i = 0; i < row_locs.size(); ++i) {
+ PRowLocation row_loc;
+ StringRef row_id_rep = row_locs.get_data_at(i);
+ // TODO: When transferring data between machines with different byte
orders (endianness),
+ // not performing proper handling may lead to issues in parsing and
exchanging the data.
auto location = reinterpret_cast<const
GlobalRowLoacation*>(row_id_rep.data);
- row_id.set_tablet_id(location->tablet_id);
- row_id.set_rowset_id(location->row_location.rowset_id.to_string());
- row_id.set_segment_id(location->row_location.segment_id);
- row_id.set_ordinal_id(location->row_location.row_id);
- *mget_req.add_rowids() = std::move(row_id);
+ row_loc.set_tablet_id(location->tablet_id);
+ row_loc.set_rowset_id(location->row_location.rowset_id.to_string());
+ row_loc.set_segment_id(location->row_location.segment_id);
+ row_loc.set_ordinal_id(location->row_location.row_id);
+ *mget_req.add_row_locs() = std::move(row_loc);
}
- mget_req.set_be_exec_version(_st->be_exec_version());
+ PUniqueId& query_id = *mget_req.mutable_query_id();
+ query_id.set_hi(_fetch_option.runtime_state->query_id().hi);
+ query_id.set_lo(_fetch_option.runtime_state->query_id().lo);
+
mget_req.set_be_exec_version(_fetch_option.runtime_state->be_exec_version());
+ mget_req.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store);
return mget_req;
}
@@ -95,9 +107,12 @@ static void fetch_callback(bthread::CountdownEvent*
counter) {
Defer __defer([&] { counter->signal(); });
}
-static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps,
- const std::vector<brpc::Controller>& cntls,
- vectorized::MutableBlock* output_block) {
+Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
+ const std::vector<PMultiGetResponse>&
rsps,
+ const std::vector<brpc::Controller>&
cntls,
+ vectorized::Block* output_block,
+ std::vector<PRowLocation>* rows_id)
const {
+ output_block->clear();
for (const auto& cntl : cntls) {
if (cntl.Failed()) {
LOG(WARNING) << "Failed to fetch meet rpc error:" <<
cntl.ErrorText()
@@ -105,25 +120,70 @@ static Status MergeRPCResults(const
std::vector<PMultiGetResponse>& rsps,
return Status::InternalError(cntl.ErrorText());
}
}
- for (const auto& resp : rsps) {
+
+ auto merge_function = [&](const PMultiGetResponse& resp) {
Status st(resp.status());
if (!st.ok()) {
LOG(WARNING) << "Failed to fetch " << st.to_string();
return st;
}
+ for (const PRowLocation& row_id : resp.row_locs()) {
+ rows_id->push_back(row_id);
+ }
+ // Merge binary rows
+ if (request.fetch_row_store()) {
+ // Validate remote data instead of CHECK-ing it: a bad response must fail the query, not the BE.
+ if (resp.row_locs().size() != resp.binary_row_data_size()) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "Row store response not match, row_locs:{}, binary rows:{}",
+ resp.row_locs().size(), resp.binary_row_data_size());
+ }
+ if (output_block->is_empty_column()) {
+ *output_block = vectorized::Block(_fetch_option.desc->slots(),
1);
+ }
+ for (int i = 0; i < resp.binary_row_data_size(); ++i) {
+ vectorized::JsonbSerializeUtil::jsonb_to_block(
+ *_fetch_option.desc, resp.binary_row_data(i).data(),
+ resp.binary_row_data(i).size(), *output_block);
+ }
+ return Status::OK();
+ }
+ // Merge partial blocks
vectorized::Block partial_block(resp.block());
- RETURN_IF_ERROR(output_block->merge(partial_block));
+ if (resp.row_locs().size() != partial_block.rows()) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "Column store response not match, row_locs:{}, block rows:{}",
+ resp.row_locs().size(), partial_block.rows());
+ }
+ if (output_block->is_empty_column()) {
+ output_block->swap(partial_block);
+ } else if (partial_block.columns() != output_block->columns()) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "Merge block not match, self:[{}], input:[{}], ",
output_block->dump_types(),
+ partial_block.dump_types());
+ } else {
+ for (int i = 0; i < output_block->columns(); ++i) {
+
output_block->get_by_position(i).column->assume_mutable()->insert_range_from(
+ *partial_block.get_by_position(i)
+ .column->convert_to_full_column_if_const()
+ .get(),
+ 0, partial_block.rows());
+ }
+ }
+ return Status::OK();
+ };
+
+ for (const auto& resp : rsps) {
+ RETURN_IF_ERROR(merge_function(resp));
}
return Status::OK();
}
-Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids,
- vectorized::MutableBlock* res_block) {
+Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids,
+ vectorized::Block* res_block) {
CHECK(!_stubs.empty());
- res_block->clear_column_data();
- vectorized::MutableBlock mblock({_tuple_desc}, row_ids->size());
PMultiGetRequest mget_req = _init_fetch_request(assert_cast<const
vectorized::ColumnString&>(
- *vectorized::remove_nullable(row_ids).get()));
+ *vectorized::remove_nullable(column_row_ids).get()));
std::vector<PMultiGetResponse> resps(_stubs.size());
std::vector<brpc::Controller> cntls(_stubs.size());
bthread::CountdownEvent counter(_stubs.size());
@@ -133,20 +193,58 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr&
row_ids,
_stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback);
}
counter.wait();
- RETURN_IF_ERROR(MergeRPCResults(resps, cntls, &mblock));
- // final sort by row_ids sequence, since row_ids is already sorted
- vectorized::Block tmp = mblock.to_block();
- std::unordered_map<std::string, uint32_t> row_order;
- vectorized::ColumnPtr row_id_column = tmp.get_columns().back();
- for (size_t x = 0; x < row_id_column->size(); ++x) {
+
+ // Merge
+ std::vector<PRowLocation> rows_locs;
+ rows_locs.reserve(column_row_ids->size());
+ RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block,
&rows_locs));
+
+ // Reorder the merged rows so they follow the sequence of the requested row ids
+ std::map<GlobalRowLoacation, size_t> positions;
+ for (size_t i = 0; i < rows_locs.size(); ++i) {
+ RowsetId rowset_id;
+ rowset_id.init(rows_locs[i].rowset_id());
+ GlobalRowLoacation grl(rows_locs[i].tablet_id(), rowset_id,
rows_locs[i].segment_id(),
+ rows_locs[i].ordinal_id());
+ positions[grl] = i;
+ }
+ vectorized::IColumn::Permutation permutation;
+ permutation.reserve(column_row_ids->size());
+ for (size_t i = 0; i < column_row_ids->size(); ++i) {
auto location =
- reinterpret_cast<const
GlobalRowLoacation*>(row_id_column->get_data_at(x).data);
- row_order[format_rowid(*location)] = x;
+ reinterpret_cast<const
GlobalRowLoacation*>(column_row_ids->get_data_at(i).data);
+ // Every requested row must be present in the merged result; falling back to a
+ // default position (as operator[] would) silently returns the wrong row.
+ auto it = positions.find(*location);
+ if (it == positions.end()) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "Row {} of {} not found in fetched results", i, column_row_ids->size());
+ }
+ permutation.push_back(it->second);
+ }
+ size_t num_rows = res_block->rows();
+ for (size_t i = 0; i < res_block->columns(); ++i) {
+ res_block->get_by_position(i).column =
+ res_block->get_by_position(i).column->permute(permutation,
num_rows);
}
- for (size_t x = 0; x < row_ids->size(); ++x) {
- auto location = reinterpret_cast<const
GlobalRowLoacation*>(row_ids->get_data_at(x).data);
- res_block->add_row(&tmp, row_order[format_rowid(*location)]);
+ // shrink for char type
+ std::vector<size_t> char_type_idx;
+ for (size_t i = 0; i < _fetch_option.desc->slots().size(); i++) {
+ auto column_desc = _fetch_option.desc->slots()[i];
+ auto type_desc = column_desc->type();
+ do {
+ if (type_desc.type == TYPE_CHAR) {
+ char_type_idx.emplace_back(i);
+ break;
+ } else if (type_desc.type != TYPE_ARRAY) {
+ break;
+ }
+ // for Array<Char> or Array<Array<Char>>
+ type_desc = type_desc.children[0];
+ } while (true);
}
+ res_block->shrink_char_type_column_suffix_zero(char_type_idx);
+ VLOG_DEBUG << "dump block:" << res_block->dump_data(0, 10);
return Status::OK();
}
diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h
index 7f789b1c40..ae57c295a5 100644
--- a/be/src/exec/rowid_fetcher.h
+++ b/be/src/exec/rowid_fetcher.h
@@ -17,12 +17,16 @@
#pragma once
+#include <brpc/controller.h>
+#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <memory>
#include <vector>
#include "common/status.h"
+#include "exec/tablet_info.h" // DorisNodesInfo
+#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
namespace doris {
@@ -38,18 +42,29 @@ class MutableBlock;
// fetch rows by global rowid
// tablet_id/rowset_name/segment_id/ordinal_id
+
+struct FetchOption {
+ TupleDescriptor* desc = nullptr;
+ RuntimeState* runtime_state = nullptr;
+ TFetchOption t_fetch_opt;
+};
+
class RowIDFetcher {
public:
- RowIDFetcher(TupleDescriptor* desc, RuntimeState* st) : _tuple_desc(desc),
_st(st) {}
- Status init(DorisNodesInfo* nodes_info);
- Status fetch(const vectorized::ColumnPtr& row_ids,
vectorized::MutableBlock* block);
+ RowIDFetcher(const FetchOption& fetch_opt) : _fetch_option(fetch_opt) {}
+ Status init();
+ Status fetch(const vectorized::ColumnPtr& row_ids, vectorized::Block*
block);
private:
- PMultiGetRequest _init_fetch_request(const vectorized::ColumnString&
row_ids);
+ PMultiGetRequest _init_fetch_request(const vectorized::ColumnString&
row_ids) const;
+ Status _merge_rpc_results(const PMultiGetRequest& request,
+ const std::vector<PMultiGetResponse>& rsps,
+ const std::vector<brpc::Controller>& cntls,
+ vectorized::Block* output_block,
+ std::vector<PRowLocation>* rows_id) const;
std::vector<std::shared_ptr<PBackendService_Stub>> _stubs;
- TupleDescriptor* _tuple_desc;
- RuntimeState* _st;
+ FetchOption _fetch_option;
};
} // namespace doris
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index b4146ba6e0..bcfc5541eb 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -263,11 +263,18 @@ struct NodeInfo {
class DorisNodesInfo {
public:
+ DorisNodesInfo() = default;
DorisNodesInfo(const TPaloNodesInfo& t_nodes) {
for (auto& node : t_nodes.nodes) {
_nodes.emplace(node.id, node);
}
}
+ void setNodes(const TPaloNodesInfo& t_nodes) {
+ _nodes.clear();
+ for (auto& node : t_nodes.nodes) {
+ _nodes.emplace(node.id, node);
+ }
+ }
const NodeInfo* find_node(int64_t id) const {
auto it = _nodes.find(id);
if (it != std::end(_nodes)) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 60652c9d8c..5955e97d2c 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -421,7 +421,6 @@ Status BetaRowsetWriter::_add_block(const
vectorized::Block* block,
max_row_add =
(*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
DCHECK(max_row_add > 0);
}
-
size_t input_row_num = std::min(block_row_num - row_offset,
size_t(max_row_add));
auto s = (*segment_writer)->append_block(block, row_offset,
input_row_num);
if (UNLIKELY(!s.ok())) {
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index ac8715fd2d..bcf685a046 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -330,6 +330,9 @@ void
SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
break;
}
}
+ if (row_column_id == 0) {
+ return;
+ }
vectorized::ColumnString* row_store_column =
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
.column->assume_mutable_ref()
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6a2fc1553d..cfb7a22871 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2477,7 +2477,7 @@ Status Tablet::fetch_value_by_rowids(RowsetSharedPtr
input_rowset, uint32_t segi
Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation&
row_location,
RowsetSharedPtr input_rowset, const
TupleDescriptor* desc,
- OlapReaderStatistics& stats, vectorized::Block*
block,
+ OlapReaderStatistics& stats, std::string&
values,
bool write_to_cache) {
// read row data
BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(input_rowset);
@@ -2525,11 +2525,12 @@ Status Tablet::lookup_row_data(const Slice&
encoded_key, const RowLocation& row_
RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1,
column_ptr));
assert(column_ptr->size() == 1);
auto string_column =
static_cast<vectorized::ColumnString*>(column_ptr.get());
+ StringRef value = string_column->get_data_at(0);
+ values = value.to_string();
if (write_to_cache) {
StringRef value = string_column->get_data_at(0);
RowCache::instance()->insert({tablet_id(), encoded_key}, Slice
{value.data, value.size});
}
- vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, *string_column,
*block);
return Status::OK();
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index e666dcb16d..4f7fc8dd9e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -402,7 +402,7 @@ public:
// Lookup a row with TupleDescriptor and fill Block
Status lookup_row_data(const Slice& encoded_key, const RowLocation&
row_location,
RowsetSharedPtr rowset, const TupleDescriptor* desc,
- OlapReaderStatistics& stats, vectorized::Block*
block,
+ OlapReaderStatistics& stats, std::string& values,
bool write_to_cache = false);
Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h
index f91b625db3..e4a167b987 100644
--- a/be/src/olap/utils.h
+++ b/be/src/olap/utils.h
@@ -301,6 +301,18 @@ struct GlobalRowLoacation {
: tablet_id(tid), row_location(rsid, sid, rid) {}
uint32_t tablet_id;
RowLocation row_location;
+
+ bool operator==(const GlobalRowLoacation& rhs) const {
+ return tablet_id == rhs.tablet_id && row_location == rhs.row_location;
+ }
+
+ bool operator<(const GlobalRowLoacation& rhs) const {
+ if (tablet_id != rhs.tablet_id) {
+ return tablet_id < rhs.tablet_id;
+ } else {
+ return row_location < rhs.row_location;
+ }
+ }
};
} // namespace doris
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index c13e2b53d5..f4cb3f6dff 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -517,11 +517,11 @@ std::string RowDescriptor::debug_string() const {
return ss.str();
}
-int RowDescriptor::get_column_id(int slot_id) const {
+int RowDescriptor::get_column_id(int slot_id, bool force_materialize_slot)
const {
int column_id_counter = 0;
for (const auto tuple_desc : _tuple_desc_map) {
for (const auto slot : tuple_desc->slots()) {
- if (!slot->need_materialize()) {
+ if (!force_materialize_slot && !slot->need_materialize()) {
continue;
}
if (slot->id() == slot_id) {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 8fd6654e13..48ea79d879 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -494,7 +494,7 @@ public:
std::string debug_string() const;
- int get_column_id(int slot_id) const;
+ int get_column_id(int slot_id, bool force_materialize_slot = false) const;
private:
// Initializes tupleIdxMap during c'tor using the _tuple_desc_map.
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 80d02f6d58..c674223b8a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -101,6 +101,7 @@
#include "util/time.h"
#include "util/uid_util.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_string.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
@@ -109,6 +110,7 @@
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
+#include "vec/jsonb/serialize.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace google {
@@ -1414,19 +1416,29 @@ void
PInternalServiceImpl::response_slave_tablet_pull_rowset(
}
}
-static Status read_by_rowids(
- std::pair<size_t, size_t> row_range_idx, const TupleDescriptor& desc,
- const google::protobuf::RepeatedPtrField<PMultiGetRequest_RowId>&
rowids,
- vectorized::Block* sub_block) {
- //read from row_range.first to row_range.second
- for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) {
+Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
+ PMultiGetResponse* response) {
+ OlapReaderStatistics stats;
+ vectorized::Block result_block;
+
+ // init desc
+ TupleDescriptor desc(request.desc());
+ std::vector<SlotDescriptor> slots;
+ slots.reserve(request.slots().size());
+ for (const auto& pslot : request.slots()) {
+ slots.push_back(SlotDescriptor(pslot));
+ desc.add_slot(&slots.back());
+ }
+
+ // read row by row
+ for (size_t i = 0; i < request.row_locs_size(); ++i) {
+ const auto& row_loc = request.row_locs(i);
MonotonicStopWatch watch;
watch.start();
- auto row_id = rowids[i];
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
- row_id.tablet_id(), true /*include deleted*/);
+ row_loc.tablet_id(), true /*include deleted*/);
RowsetId rowset_id;
- rowset_id.init(row_id.rowset_id());
+ rowset_id.init(row_loc.rowset_id());
if (!tablet) {
continue;
}
@@ -1445,18 +1457,41 @@ static Status read_by_rowids(
// find segment
auto it = std::find_if(segment_cache.get_segments().begin(),
segment_cache.get_segments().end(),
- [&row_id](const segment_v2::SegmentSharedPtr&
seg) {
- return seg->id() == row_id.segment_id();
+ [&row_loc](const segment_v2::SegmentSharedPtr&
seg) {
+ return seg->id() == row_loc.segment_id();
});
if (it == segment_cache.get_segments().end()) {
continue;
}
segment_v2::SegmentSharedPtr segment = *it;
- for (int x = 0; x < desc.slots().size() - 1; ++x) {
+ // Record row_loc only for rows that are actually read, keeping row_locs aligned with data.
+ size_t row_size = 0;
+ Defer _defer([&]() {
+ LOG_EVERY_N(INFO, 100) << "multiget_data single_row, cost(us):"
+ << watch.elapsed_time() / 1000 << ", row_size:" << row_size;
+ *response->add_row_locs() = row_loc;
+ });
+ // fetch by row store, more efficient way
+ if (request.fetch_row_store()) {
+ if (!tablet->tablet_schema()->store_row_column()) {
+ return Status::InternalError("tablet has no row store column");
+ }
+ RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
+ string* value = response->add_binary_row_data();
+ RETURN_IF_ERROR(tablet->lookup_row_data({}, loc, rowset, &desc,
stats, *value));
+ row_size = value->size();
+ continue;
+ }
+
+ // fetch by column store
+ if (result_block.is_empty_column()) {
+ result_block = vectorized::Block(desc.slots(),
request.row_locs().size());
+ }
+ for (int x = 0; x < desc.slots().size(); ++x) {
int index =
tablet_schema->field_index(desc.slots()[x]->col_unique_id());
segment_v2::ColumnIterator* column_iterator = nullptr;
vectorized::MutableColumnPtr column =
- sub_block->get_by_position(x).column->assume_mutable();
+ result_block.get_by_position(x).column->assume_mutable();
if (index < 0) {
column->insert_default();
continue;
@@ -1471,58 +1506,22 @@ static Status read_by_rowids(
opt.stats = &stats;
opt.use_page_cache = !config::disable_storage_page_cache;
column_iterator->init(opt);
- std::vector<segment_v2::rowid_t> rowids {
- static_cast<segment_v2::rowid_t>(row_id.ordinal_id())};
- RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1,
column));
+ std::vector<segment_v2::rowid_t> single_row_loc {
+ static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
+
RETURN_IF_ERROR(column_iterator->read_by_rowids(single_row_loc.data(), 1,
column));
}
- LOG_EVERY_N(INFO, 100) << "multiget_data single_row, cost(us):"
- << watch.elapsed_time() / 1000;
- GlobalRowLoacation row_location(row_id.tablet_id(),
rowset->rowset_id(),
- row_id.segment_id(),
row_id.ordinal_id());
- sub_block->get_columns().back()->assume_mutable()->insert_data(
- reinterpret_cast<const char*>(&row_location),
sizeof(GlobalRowLoacation));
- }
- return Status::OK();
-}
-
-Status PInternalServiceImpl::_multi_get(const PMultiGetRequest* request,
- PMultiGetResponse* response) {
- TupleDescriptor desc(request->desc());
- std::vector<SlotDescriptor> slots;
- slots.reserve(request->slots().size());
- for (const auto& pslot : request->slots()) {
- slots.push_back(SlotDescriptor(pslot));
- desc.add_slot(&slots.back());
}
- assert(desc.slots().back()->col_name() == BeConsts::ROWID_COL);
- vectorized::Block block(desc.slots(), request->rowids().size());
- RETURN_IF_ERROR(
- read_by_rowids(std::pair {0, request->rowids_size()}, desc,
request->rowids(), &block));
- std::vector<size_t> char_type_idx;
- for (size_t i = 0; i < desc.slots().size(); i++) {
- auto column_desc = desc.slots()[i];
- auto type_desc = column_desc->type();
- do {
- if (type_desc.type == TYPE_CHAR) {
- char_type_idx.emplace_back(i);
- break;
- } else if (type_desc.type != TYPE_ARRAY) {
- break;
- }
- // for Array<Char> or Array<Array<Char>>
- type_desc = type_desc.children[0];
- } while (true);
+ // serialize block if not empty
+ if (!result_block.is_empty_column()) {
+ VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
+ << ", be_exec_version:" << request.be_exec_version();
+ [[maybe_unused]] size_t compressed_size = 0;
+ [[maybe_unused]] size_t uncompressed_size = 0;
+ int be_exec_version = request.has_be_exec_version() ?
request.be_exec_version() : 0;
+ RETURN_IF_ERROR(result_block.serialize(be_exec_version,
response->mutable_block(),
+ &uncompressed_size,
&compressed_size,
+
segment_v2::CompressionTypePB::LZ4));
}
- // shrink char_type suffix zero data
- block.shrink_char_type_column_suffix_zero(char_type_idx);
- VLOG_DEBUG << "dump block:" << block.dump_data(0, 10)
- << ", be_exec_version:" << request->be_exec_version();
-
- [[maybe_unused]] size_t compressed_size = 0;
- [[maybe_unused]] size_t uncompressed_size = 0;
- int be_exec_version = request->has_be_exec_version() ?
request->be_exec_version() : 0;
- RETURN_IF_ERROR(block.serialize(be_exec_version,
response->mutable_block(), &uncompressed_size,
- &compressed_size,
segment_v2::CompressionTypePB::LZ4));
return Status::OK();
}
@@ -1536,7 +1535,7 @@ void
PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
watch.start();
brpc::ClosureGuard closure_guard(done);
response->mutable_status()->set_status_code(0);
- Status st = _multi_get(request, response);
+ Status st = _multi_get(*request, response);
st.to_protobuf(response->mutable_status());
LOG(INFO) << "multiget_data finished, cost(us):" <<
watch.elapsed_time() / 1000;
});
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index dae16604ad..8c6057d978 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -207,7 +207,7 @@ private:
void _response_pull_slave_rowset(const std::string& remote_host, int64_t
brpc_port,
int64_t txn_id, int64_t tablet_id,
int64_t node_id,
bool is_succeed);
- Status _multi_get(const PMultiGetRequest* request, PMultiGetResponse*
response);
+ Status _multi_get(const PMultiGetRequest& request, PMultiGetResponse*
response);
void _get_column_ids_by_tablet_ids(google::protobuf::RpcController*
controller,
const PFetchColIdsRequest* request,
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index 21d6ac06ac..ab2ae7519e 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -23,6 +23,8 @@
#include <gen_cpp/internal_service.pb.h>
#include <stdlib.h>
+#include <vector>
+
#include "olap/lru_cache.h"
#include "olap/olap_tuple.h"
#include "olap/row_cursor.h"
@@ -284,11 +286,15 @@ Status PointQueryExecutor::_lookup_row_data() {
if (!_row_read_ctxs[i]._row_location.has_value()) {
continue;
}
+ std::string value;
RETURN_IF_ERROR(_tablet->lookup_row_data(
_row_read_ctxs[i]._primary_key,
_row_read_ctxs[i]._row_location.value(),
*(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(),
- _profile_metrics.read_stats, _result_block.get(),
+ _profile_metrics.read_stats, value,
!config::disable_storage_row_cache /*whether write row
cache*/));
+ // serilize value to block, currently only jsonb row formt
+
vectorized::JsonbSerializeUtil::jsonb_to_block(*_reusable->tuple_desc(),
value.data(),
+ value.size(),
*_result_block);
}
return Status::OK();
}
diff --git a/be/src/vec/exec/vexchange_node.cpp
b/be/src/vec/exec/vexchange_node.cpp
index a00f9cd839..8535f4a261 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -57,11 +57,6 @@ Status VExchangeNode::init(const TPlanNode& tnode,
RuntimeState* state) {
_is_asc_order = tnode.exchange_node.sort_info.is_asc_order;
_nulls_first = tnode.exchange_node.sort_info.nulls_first;
- if (tnode.exchange_node.__isset.nodes_info) {
- _nodes_info = _pool->add(new
DorisNodesInfo(tnode.exchange_node.nodes_info));
- }
- _use_two_phase_read =
tnode.exchange_node.sort_info.__isset.use_two_phase_read &&
- tnode.exchange_node.sort_info.use_two_phase_read;
return Status::OK();
}
@@ -100,19 +95,6 @@ Status VExchangeNode::open(RuntimeState* state) {
return Status::OK();
}
-Status VExchangeNode::_second_phase_fetch_data(RuntimeState* state, Block*
final_block) {
- auto row_id_col = final_block->get_by_position(final_block->columns() - 1);
- auto tuple_desc = _row_descriptor.tuple_descriptors()[0];
- RowIDFetcher id_fetcher(tuple_desc, state);
- RETURN_IF_ERROR(id_fetcher.init(_nodes_info));
- MutableBlock materialized_block(_row_descriptor.tuple_descriptors(),
final_block->rows());
- // fetch will sort block by sequence of ROWID_COL
- RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, &materialized_block));
- // Notice swap may change the structure of final_block
- final_block->swap(materialized_block.to_block());
- return Status::OK();
-}
-
Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VExchangeNode::get_next");
SCOPED_TIMER(runtime_profile()->total_time_counter());
@@ -123,12 +105,6 @@ Status VExchangeNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
_is_ready = true;
return Status::OK();
}
- if (_use_two_phase_read) {
- // Block structure may be changed by calling
_second_phase_fetch_data() before.
- // So we should clear block before _stream_recvr->get_next, since
- // blocks in VSortedRunMerger may not compatible with this block.
- block->clear();
- }
auto status = _stream_recvr->get_next(block, eos);
// In vsortrunmerger, it will set eos=true, and block not empty
// so that eos==true, could not make sure that block not have valid data
@@ -153,9 +129,6 @@ Status VExchangeNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
}
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
- if (_use_two_phase_read && block->rows() > 0) {
- RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
- }
return status;
}
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 46f5aa619c..58b61dc9af 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -61,9 +61,6 @@ public:
// Status collect_query_statistics(QueryStatistics* statistics) override;
void set_num_senders(int num_senders) { _num_senders = num_senders; }
- // final materializtion, used only in topn node
- Status _second_phase_fetch_data(RuntimeState* state, Block* final_block);
-
private:
int _num_senders;
bool _is_merging;
@@ -78,10 +75,6 @@ private:
VSortExecExprs _vsort_exec_exprs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
-
- // for fetch data by rowids
- DorisNodesInfo* _nodes_info = nullptr;
- bool _use_two_phase_read = false;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 5069a5a615..e00d34559b 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -90,6 +90,10 @@ public:
void clone_fn_contexts(VExprContext* other);
+    // Whether VSlotRef::prepare should resolve the slot's column even when the
+    // slot descriptor's need_materialize() is false.
+    bool force_materialize_slot() const { return _force_materialize_slot; }
+
+    // Enables force_materialize_slot() for this context.
+    void set_force_materialize_slot() { _force_materialize_slot = true; }
+
private:
friend class VExpr;
@@ -112,5 +116,9 @@ private:
/// The depth of expression-tree.
int _depth_num = 0;
+
+ // This flag only works on VSlotRef.
+ // Force to materialize even if the slot need_materialize is false, we
just ignore need_materialize flag
+ bool _force_materialize_slot = false;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index 4b2e182563..5a34999acc 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -27,6 +27,7 @@
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
namespace doris {
namespace vectorized {
@@ -59,12 +60,12 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const
doris::RowDescriptor&
state->desc_tbl().debug_string());
}
_column_name = &slot_desc->col_name();
- if (!slot_desc->need_materialize()) {
+ if (!context->force_materialize_slot() && !slot_desc->need_materialize()) {
// slot should be ignored manually
_column_id = -1;
return Status::OK();
}
- _column_id = desc.get_column_id(_slot_id);
+ _column_id = desc.get_column_id(_slot_id,
context->force_materialize_slot());
if (_column_id < 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"VSlotRef {} have invalid slot id: {}, desc: {}, slot_desc:
{}, desc_tbl: {}",
diff --git a/be/src/vec/jsonb/serialize.cpp b/be/src/vec/jsonb/serialize.cpp
index 69bc603b2f..c8b2eff0f4 100644
--- a/be/src/vec/jsonb/serialize.cpp
+++ b/be/src/vec/jsonb/serialize.cpp
@@ -25,6 +25,10 @@
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
+#include "runtime/jsonb_value.h"
+#include "runtime/primitive_type.h"
+#include "runtime/types.h"
+#include "util/bitmap_value.h"
#include "util/jsonb_document.h"
#include "util/jsonb_stream.h"
#include "util/jsonb_writer.h"
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 15d41f1f02..d61dc13875 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -24,14 +24,19 @@
#include <new>
#include "common/config.h"
+#include "common/consts.h"
#include "common/object_pool.h"
+#include "exec/rowid_fetcher.h"
+#include "gutil/port.h"
#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"
#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
#include "vec/sink/vmysql_result_writer.h"
#include "vec/sink/vresult_writer.h"
@@ -51,7 +56,7 @@ VResultSink::VResultSink(const RowDescriptor& row_desc, const
std::vector<TExpr>
} else {
_sink_type = sink.type;
}
-
+ _fetch_option = sink.fetch_option;
_name = "ResultSink";
}
@@ -61,6 +66,12 @@ Status VResultSink::prepare_exprs(RuntimeState* state) {
// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(
VExpr::create_expr_trees(state->obj_pool(), _t_output_expr,
&_output_vexpr_ctxs));
+ if (_fetch_option.use_two_phase_fetch) {
+ for (VExprContext* expr_ctx : _output_vexpr_ctxs) {
+ // Must materialize if it a slot, or the slot column id will be -1
+ expr_ctx->set_force_materialize_slot();
+ }
+ }
// Prepare the exprs to run.
RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return Status::OK();
@@ -100,9 +111,32 @@ Status VResultSink::open(RuntimeState* state) {
return VExpr::open(_output_vexpr_ctxs, state);
}
+// Second phase of two-phase read: the last column of `final_block` must be the
+// row id column (BeConsts::ROWID_COL). RowIDFetcher uses those row ids to fetch
+// the remaining data into `final_block`, which may change the block's structure.
+Status VResultSink::second_phase_fetch_data(RuntimeState* state, Block* final_block) {
+    if (final_block->columns() == 0) {
+        // Guard the `columns() - 1` below against size_t underflow.
+        return Status::InternalError("two phase fetch expects a row id column, but block has no columns");
+    }
+    // Kept as a copy (not a reference): fetch() may restructure final_block.
+    auto row_id_col = final_block->get_by_position(final_block->columns() - 1);
+    if (row_id_col.name != BeConsts::ROWID_COL) {
+        // A malformed block should fail this query, not abort the whole BE via CHECK.
+        return Status::InternalError("two phase fetch expects last column {}, but got {}",
+                                     BeConsts::ROWID_COL, row_id_col.name);
+    }
+    FetchOption fetch_option;
+    fetch_option.desc = _row_desc.tuple_descriptors()[0];
+    fetch_option.t_fetch_opt = _fetch_option;
+    fetch_option.runtime_state = state;
+    RowIDFetcher id_fetcher(fetch_option);
+    RETURN_IF_ERROR(id_fetcher.init());
+    RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, final_block));
+    return Status::OK();
+}
+
 Status VResultSink::send(RuntimeState* state, Block* block, bool eos) {
     INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VResultSink::send");
-    return _writer->append_block(*block);
+    // Two-phase read: fill in the remaining data by row id before the block is
+    // handed to the result writer. Only non-empty blocks are fetched.
+    if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
+        RETURN_IF_ERROR(second_phase_fetch_data(state, block));
+    }
+    RETURN_IF_ERROR(_writer->append_block(*block));
+    if (_fetch_option.use_two_phase_fetch) {
+        // Block structure may be changed by calling second_phase_fetch_data().
+        // So we should clear block in case of unmatched columns
+        block->clear();
+    }
+    return Status::OK();
 }
Status VResultSink::close(RuntimeState* state, Status exec_status) {
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index adf5dd249d..97916544bf 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -28,6 +28,7 @@
#include "common/status.h"
#include "exec/data_sink.h"
+#include "vec/sink/vresult_writer.h"
namespace doris {
class RuntimeState;
@@ -141,6 +142,7 @@ public:
private:
Status prepare_exprs(RuntimeState* state);
+ Status second_phase_fetch_data(RuntimeState* state, Block* final_block);
TResultSinkType::type _sink_type;
// set file options when sink type is FILE
std::unique_ptr<ResultFileOptions> _file_opts;
@@ -156,6 +158,9 @@ private:
std::shared_ptr<VResultWriter> _writer;
RuntimeProfile* _profile; // Allocated from _pool
int _buf_size; // Allocated from _pool
+
+ // for fetch data by rowids
+ TFetchOption _fetch_option;
};
} // namespace vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index d7a47f9279..18db7b5847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -331,7 +331,6 @@ public class CreateTableStmt extends DdlStmt {
}
boolean enableUniqueKeyMergeOnWrite = false;
- boolean enableStoreRowColumn = false;
// analyze key desc
if (engineName.equalsIgnoreCase("olap")) {
// olap table
@@ -401,7 +400,6 @@ public class CreateTableStmt extends DdlStmt {
throw new AnalysisException(
PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + "
property only support unique key table");
}
-
if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
enableUniqueKeyMergeOnWrite = true;
if (properties != null) {
@@ -409,7 +407,6 @@ public class CreateTableStmt extends DdlStmt {
// so we just clone a properties map here.
enableUniqueKeyMergeOnWrite =
PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(
new HashMap<>(properties));
- enableStoreRowColumn =
PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties));
}
}
@@ -455,7 +452,8 @@ public class CreateTableStmt extends DdlStmt {
columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
}
}
- if (enableStoreRowColumn) {
+ // add a hidden column as row store
+ if (properties != null && PropertyAnalyzer.analyzeStoreRowColumn(new
HashMap<>(properties))) {
columnDefs.add(ColumnDef.newRowStoreColumnDef());
}
if (Config.enable_hidden_version_column_by_default && keysDesc != null
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index c8bc8ddf43..0a9bd5a50e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -658,7 +658,10 @@ public class SelectStmt extends QueryStmt {
Set<SlotRef> orderingSlots = Sets.newHashSet();
Set<SlotRef> conjuntSlots = Sets.newHashSet();
TreeNode.collect(resultExprs,
Predicates.instanceOf(SlotRef.class), resultSlots);
- TreeNode.collect(sortInfo.getOrderingExprs(),
Predicates.instanceOf(SlotRef.class), orderingSlots);
+ if (sortInfo != null) {
+ TreeNode.collect(sortInfo.getOrderingExprs(),
+ Predicates.instanceOf(SlotRef.class), orderingSlots);
+ }
if (whereClause != null) {
whereClause.collect(SlotRef.class, conjuntSlots);
}
@@ -714,20 +717,23 @@ public class SelectStmt extends QueryStmt {
||
!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
return false;
}
- if (!evaluateOrderBy) {
- // Need evaluate orderby, if sort node was eliminated then this
optmization
- // could be useless
- return false;
- }
- // Only handle the simplest `SELECT ... FROM <tbl> WHERE ... ORDER BY
... LIMIT ...` query
+ // Only handle the simplest `SELECT ... FROM <tbl> WHERE ... [ORDER BY
...] [LIMIT ...]` query
if (getAggInfo() != null
|| getHavingPred() != null
|| getWithClause() != null
|| getAnalyticInfo() != null) {
return false;
}
+ // ignore short circuit query
+ if (isPointQueryShortCircuit()) {
+ return false;
+ }
+ // ignore insert into select
+ if (fromInsert) {
+ return false;
+ }
+ // ensure no sub query
if (!analyzer.isRootAnalyzer()) {
- // ensure no sub query
return false;
}
// If select stmt has inline view or this is an inline view query stmt
analyze call
@@ -750,24 +756,35 @@ public class SelectStmt extends QueryStmt {
if (!olapTable.getEnableLightSchemaChange()) {
return false;
}
- // Only TOPN query at present
- if (getOrderByElements() == null
- || !hasLimit()
- || getLimit() <= 0
- || getLimit() >
ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
- return false;
- }
- // Check order by exprs are all slot refs
- // Rethink? implement more generic to support all exprs
- LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
- LOG.debug("getOrderByElements {}", getOrderByElements());
- for (Expr sortExpr : sortInfo.getOrderingExprs()) {
- if (!(sortExpr instanceof SlotRef)) {
+ if (getOrderByElements() != null) {
+ if (!evaluateOrderBy) {
+ // Need evaluate orderby, if sort node was eliminated then
this optmization
+ // could be useless
return false;
}
+ // case1: general topn query, like: select * from tbl where xxx
order by yyy limit n
+ if (!hasLimit()
+ || getLimit() <= 0
+ || getLimit() >
ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
+ return false;
+ }
+ // Check order by exprs are all slot refs
+ // Rethink? implement more generic to support all exprs
+ LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
+ LOG.debug("getOrderByElements {}", getOrderByElements());
+ for (Expr sortExpr : sortInfo.getOrderingExprs()) {
+ if (!(sortExpr instanceof SlotRef)) {
+ return false;
+ }
+ }
+ isTwoPhaseOptEnabled = true;
+ return true;
+ } else {
+ // case2: optimize scan utilize row store column, query like
select * from tbl where xxx [limit xxx]
+ // TODO: we only optimize query with select * at present
+ return olapTable.storeRowColumn() &&
selectList.getItems().stream().anyMatch(e -> e.isStar());
}
- isTwoPhaseOptEnabled = true;
- return true;
+ // return false;
}
public List<TupleId> getTableRefIds() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b7214f7d9d..e0340ba20b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -38,6 +38,7 @@ import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Function.NullableMode;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
@@ -134,6 +135,7 @@ import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RepeatNode;
+import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.planner.SelectNode;
@@ -146,6 +148,7 @@ import org.apache.doris.planner.external.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
+import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPushAggOp;
@@ -200,6 +203,47 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
this.statsErrorEstimator = statsErrorEstimator;
}
+    // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n]
+    // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes
+    // in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows
+    // and reconstruct the final block
+    //
+    // Finds the first fragment whose leaf-most chain ends in SortNode -> OlapScanNode with
+    // two-phase read enabled; if one exists, attaches a TFetchOption to the first ResultSink.
+    private void setResultSinkFetchOptionIfNeed() {
+        boolean needFetch = false;
+        // Only single olap table should be fetched
+        OlapTable fetchOlapTable = null;
+        for (PlanFragment fragment : context.getPlanFragments()) {
+            PlanNode node = fragment.getPlanRoot();
+            PlanNode parent = null;
+            // OlapScanNode is the last node.
+            // So, just get the last two node and check if they are SortNode and OlapScan.
+            while (!node.getChildren().isEmpty()) {
+                parent = node;
+                node = node.getChildren().get(0);
+            }
+
+            // case1: general topn optimized query
+            if (node instanceof OlapScanNode && parent instanceof SortNode
+                    && ((SortNode) parent).getUseTwoPhaseReadOpt()) {
+                needFetch = true;
+                fetchOlapTable = ((OlapScanNode) node).getOlapTable();
+                break;
+            }
+        }
+        if (!needFetch) {
+            // Nothing to fetch: skip the second pass over the fragments.
+            return;
+        }
+        for (PlanFragment fragment : context.getPlanFragments()) {
+            if (fragment.getSink() instanceof ResultSink) {
+                TFetchOption fetchOption = new TFetchOption();
+                fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn());
+                fetchOption.setUseTwoPhaseFetch(true);
+                fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
+                ((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
+                break;
+            }
+        }
+    }
+
/**
* Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree.
*
@@ -236,6 +280,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
for (PlanFragment fragment : context.getPlanFragments()) {
fragment.finalize(null);
}
+ setResultSinkFetchOptionIfNeed();
Collections.reverse(context.getPlanFragments());
context.getDescTable().computeMemLayout();
return rootFragment;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index c0cddcc242..ab82dad209 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -24,17 +24,12 @@ import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
-import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExchangeNode;
import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TNodeInfo;
-import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
@@ -150,9 +145,6 @@ public class ExchangeNode extends PlanNode {
}
if (mergeInfo != null) {
msg.exchange_node.setSortInfo(mergeInfo.toThrift());
- if (mergeInfo.useTwoPhaseRead()) {
- msg.exchange_node.setNodesInfo(createNodesInfo());
- }
}
msg.exchange_node.setOffset(offset);
}
@@ -174,18 +166,4 @@ public class ExchangeNode extends PlanNode {
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
return prefix + "offset: " + offset + "\n";
}
-
- /**
- * Set the parameters used to fetch data by rowid column
- * after init().
- */
- private TPaloNodesInfo createNodesInfo() {
- TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
- SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
- for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
- Backend backend = systemInfoService.getBackend(id);
- nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0,
backend.getHost(), backend.getBrpcPort()));
- }
- return nodesInfo;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 49058a021c..42e72d6c84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -35,6 +35,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
@@ -51,6 +52,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.statistics.query.StatsDelta;
+import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TRuntimeFilterMode;
@@ -305,6 +307,10 @@ public class OriginalPlanner extends Planner {
// Double check this plan to ensure it's a general topn
query
injectRowIdColumnSlot();
((SortNode) singleNodePlan).setUseTwoPhaseReadOpt(true);
+ } else if (singleNodePlan instanceof OlapScanNode &&
singleNodePlan.getChildren().size() == 0) {
+ // Optimize query like `SELECT ... FROM <tbl> WHERE ...
LIMIT ...`.
+ // This typically used when row store enabled, to reduce
scan cost
+ injectRowIdColumnSlot();
} else {
// This is not a general topn query, rollback
needMaterialize flag
for (SlotDescriptor slot :
analyzer.getDescTbl().getSlotDescs().values()) {
@@ -463,11 +469,13 @@ public class OriginalPlanner extends Planner {
return slotDesc;
}
- // We use two phase read to optimize sql like: select * from tbl [where
xxx = ???] order by column1 limit n
+ // We use two phase read to optimize sql like: select * from tbl [where
xxx = ???] [order by column1] [limit n]
// in the first phase, we add an extra column `RowId` to Block, and sort
blocks in TopN nodes
// in the second phase, we have n rows, we do a fetch rpc to get all
rowids date for the n rows
// and reconconstruct the final block
private void injectRowIdColumnSlot() {
+ boolean injected = false;
+ OlapTable olapTable = null;
for (PlanFragment fragment : fragments) {
PlanNode node = fragment.getPlanRoot();
PlanNode parent = null;
@@ -478,17 +486,37 @@ public class OriginalPlanner extends Planner {
node = node.getChildren().get(0);
}
- if (!(node instanceof OlapScanNode) || !(parent instanceof
SortNode)) {
- continue;
+ // case1
+ if ((node instanceof OlapScanNode) && (parent instanceof
SortNode)) {
+ SortNode sortNode = (SortNode) parent;
+ OlapScanNode scanNode = (OlapScanNode) node;
+ SlotDescriptor slot = injectRowIdColumnSlot(analyzer,
scanNode.getTupleDesc());
+ injectRowIdColumnSlot(analyzer,
sortNode.getSortInfo().getSortTupleDescriptor());
+ SlotRef extSlot = new SlotRef(slot);
+ sortNode.getResolvedTupleExprs().add(extSlot);
+ sortNode.getSortInfo().setUseTwoPhaseRead();
+ injected = true;
+ olapTable = scanNode.getOlapTable();
+ break;
+ }
+ // case2
+ if ((node instanceof OlapScanNode) && parent == null) {
+ OlapScanNode scanNode = (OlapScanNode) node;
+ injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
+ injected = true;
+ olapTable = scanNode.getOlapTable();
+ break;
+ }
+ }
+ for (PlanFragment fragment : fragments) {
+ if (injected && fragment.getSink() instanceof ResultSink) {
+ TFetchOption fetchOption = new TFetchOption();
+ fetchOption.setFetchRowStore(olapTable.storeRowColumn());
+ fetchOption.setUseTwoPhaseFetch(true);
+
fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
+ ((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
+ break;
}
- SortNode sortNode = (SortNode) parent;
- OlapScanNode scanNode = (OlapScanNode) node;
- SlotDescriptor slot = injectRowIdColumnSlot(analyzer,
scanNode.getTupleDesc());
- injectRowIdColumnSlot(analyzer,
sortNode.getSortInfo().getSortTupleDescriptor());
- SlotRef extSlot = new SlotRef(slot);
- sortNode.getResolvedTupleExprs().add(extSlot);
- sortNode.getSortInfo().setUseTwoPhaseRead();
- break;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
index c940851de8..0a79881d58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TResultSink;
/**
@@ -30,6 +31,8 @@ import org.apache.doris.thrift.TResultSink;
*/
public class ResultSink extends DataSink {
private final PlanNodeId exchNodeId;
+ // Two phase fetch option
+ private TFetchOption fetchOption;
public ResultSink(PlanNodeId exchNodeId) {
this.exchNodeId = exchNodeId;
@@ -43,13 +46,26 @@ public class ResultSink extends DataSink {
strBuilder.append("V");
}
strBuilder.append("RESULT SINK\n");
+ if (fetchOption != null) {
+ strBuilder.append(prefix).append(" ").append("OPT TWO PHASE\n");
+ if (fetchOption.isFetchRowStore()) {
+ strBuilder.append(prefix).append(" ").append("FETCH ROW
STORE\n");
+ }
+ }
return strBuilder.toString();
}
+    /**
+     * Attaches the two-phase fetch option. A non-null option is shown in the explain
+     * output and serialized into the thrift TResultSink; null leaves both untouched.
+     */
+    public void setFetchOption(TFetchOption fetchOption) {
+        this.fetchOption = fetchOption;
+    }
+
     @Override
     protected TDataSink toThrift() {
         TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK);
         TResultSink tResultSink = new TResultSink();
+        // Only send the fetch option when one was attached by the planner.
+        if (fetchOption != null) {
+            tResultSink.setFetchOption(fetchOption);
+        }
         result.setResultSink(tResultSink);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 743c4fb54f..aab8f44186 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -144,7 +144,7 @@ public class SortNode extends PlanNode {
}
+    /** Whether this sort node was marked for the two-phase read optimization. */
     public boolean getUseTwoPhaseReadOpt() {
-        return useTopnOpt;
+        return this.useTwoPhaseReadOpt;
     }
public void setUseTwoPhaseReadOpt(boolean useTwoPhaseReadOpt) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index de4db69569..2c5fd90f21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -35,6 +35,8 @@ import org.apache.doris.common.util.NetUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend.BackendState;
+import org.apache.doris.thrift.TNodeInfo;
+import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
@@ -156,6 +158,16 @@ public class SystemInfoService {
}
};
+    /**
+     * Builds a TPaloNodesInfo listing every alive backend (id, host, brpc port).
+     * Used to tell the BE which nodes it can contact for the second phase fetch.
+     */
+    public static TPaloNodesInfo createAliveNodesInfo() {
+        TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
+        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
+        for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
+            Backend backend = systemInfoService.getBackend(id);
+            // The backend may be dropped between listing its id and looking it up.
+            if (backend == null) {
+                continue;
+            }
+            nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
+        }
+        return nodesInfo;
+    }
+
// for deploy manager
public void addBackends(List<HostInfo> hostInfos, boolean isFree)
throws UserException {
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 3b905897c8..7a12d2be12 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -1,4 +1,5 @@
// Licensed to the Apache Software Foundation (ASF) under one
+
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -603,23 +604,35 @@ message PFetchTableSchemaResult {
repeated PTypeDesc column_types = 4;
}
+// Location of a single row: tablet, rowset and segment ids plus the row's
+// ordinal (presumably its position within the segment — confirm).
+message PRowLocation {
+    optional int64 tablet_id = 1;
+    optional string rowset_id = 2;
+    optional uint64 segment_id = 3;
+    optional uint64 ordinal_id = 4;
+}
+
 message PMultiGetRequest {
-    message RowId {
-        optional int64 tablet_id = 1;
-        optional string rowset_id = 2;
-        optional uint64 segment_id = 3;
-        optional uint64 ordinal_id = 4;
-    };
-    repeated RowId rowids = 1;
+    // Locations of the rows requested by this multiget.
+    repeated PRowLocation row_locs = 1;
     optional PTupleDescriptor desc = 2;
     repeated PSlotDescriptor slots = 3;
     // for compability
     optional int32 be_exec_version = 4;
+    // Whether to read the rows from the row store column; presumably populated
+    // from TFetchOption.fetch_row_store — confirm.
+    optional bool fetch_row_store = 5;
+    optional PUniqueId query_id = 6;
 };
 message PMultiGetResponse {
     optional PBlock block = 1;
     optional PStatus status = 2;
+
+    // more efficient serialization fields for row store
+    enum RowFormat {
+        JSONB = 0;
+    };
+    optional RowFormat format = 3;
+    repeated bytes binary_row_data = 4;
+    // for sorting rows
+    repeated PRowLocation row_locs = 5;
 };
message PFetchColIdsRequest {
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index f2a191a777..7883aab0b6 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -160,9 +160,18 @@ struct TMultiCastDataStreamSink {
2: optional list<list<TPlanFragmentDestination>> destinations;
}
+// Options for the second phase of two-phase read, attached to the result sink.
+struct TFetchOption {
+    // Whether the result sink should run the second phase fetch by row id
+    1: optional bool use_two_phase_fetch;
+    // Nodes in this cluster, used for second phase fetch
+    2: optional Descriptors.TPaloNodesInfo nodes_info;
+    // Whether the second phase fetch should read the row store column
+    3: optional bool fetch_row_store;
+}
+
 struct TResultSink {
     1: optional TResultSinkType type;
-    2: optional TResultFileSinkOptions file_options // deprecated
+    2: optional TResultFileSinkOptions file_options; // deprecated
+    // Only set by the FE when a fetch option was attached to the sink
+    3: optional TFetchOption fetch_option;
 }
struct TResultFileSink {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ba97ae77e5..8afc3fde4c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -908,8 +908,6 @@ struct TExchangeNode {
2: optional TSortInfo sort_info
// This is tHe number of rows to skip before returning results
3: optional i64 offset
- // Nodes in this cluster, used for second phase fetch
- 4: optional Descriptors.TPaloNodesInfo nodes_info
}
struct TOlapRewriteNode {
diff --git
a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out
b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out
new file mode 100644
index 0000000000..ed6e469cbf
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !dup_key_2pr_q01 --
+\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N
+\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N
+2022-04-15T18:12:20 -1476674200 76077621753438032.418 false -81
-17630 -10269 -268792458 13778.415 -2.147435208942339E9
93226194917360130.580 2022-07-02 2022-08-07T20:32:33 2022-02-06
195.140.76.121 [email protected] Maywood Junction 85
+2022-12-09T05:37:25 115193592 2535754112868463.975 true 85
-4281 22951 -1062042991 4304.926 -2.147336076742606E9
1661169792864805.553 2022-10-14 2022-07-19T18:25:57 2022-12-01
118.127.225.101 [email protected] Milwaukee Point 50
+2022-08-07T21:42:59 380691749 46148671567644994.456 false 102
-6175 -147 -521124424 17149.252 -2.147205556984866E9
44529271870524715.164 2022-03-18 2022-02-07T01:28:54 2022-05-25
0.24.121.144 [email protected] Glendale Hill 78
+
diff --git
a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out
b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out
new file mode 100644
index 0000000000..ff911d4bf8
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !dup_key_2pr_q02 --
+2023-01-10T15:40:04 -49426869 59390574629503991.413 true -19
-6791 -6536 240107090 22801.043 7.01523382557248E8
6338683861031199.120 2022-05-15 2022-01-26T07:36:05 2022-04-26
139.41.223.19 [email protected] Laurel Way 44
+
diff --git
a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out
b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out
new file mode 100644
index 0000000000..c0b3e57752
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !dup_key_2pr_q03 --
+2022-01-11T06:20:14 1799797425 30891281288216959.961 false -5
7495 -10784 1909249054 -31658.44 -1.992204200787845E9
6080150162640151.680 2022-04-08 2022-09-16T21:29:08 2022-04-18
29.121.52.42 [email protected] Coleman Lane 51
+
diff --git
a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out
b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out
new file mode 100644
index 0000000000..b05344a0b4
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !dup_key_2pr_q04 --
+2022-01-10T16:11:53 1906472648 30323570439371776.353 true -64
23204 -27723 1753599187 -28017.578 -8.7412093534361E7
34175877199258167.909 2023-01-07 2022-07-02T15:40:57 2022-10-09
60.230.5.23 [email protected] Summit Plaza 70
+
diff --git a/regression-test/suites/datatype_p0/scalar_types/load.groovy
b/regression-test/suites/datatype_p0/scalar_types/load.groovy
index 1eb73201d3..954f1df03a 100644
--- a/regression-test/suites/datatype_p0/scalar_types/load.groovy
+++ b/regression-test/suites/datatype_p0/scalar_types/load.groovy
@@ -48,7 +48,7 @@ suite("test_scalar_types_load", "p0") {
DUPLICATE KEY(`k1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
- PROPERTIES("replication_num" = "1");
+ PROPERTIES("replication_num" = "1", "store_row_column" = "true");
"""
// load data
@@ -100,7 +100,7 @@ suite("test_scalar_types_load", "p0") {
DUPLICATE KEY(`c_datetimev2`, `c_bigint`, `c_decimalv3`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c_bigint`) BUCKETS 10
- PROPERTIES("replication_num" = "1");
+ PROPERTIES("replication_num" = "1", "store_row_column" = "true");
"""
// insert data into unique key table1 2 times
@@ -136,7 +136,7 @@ suite("test_scalar_types_load", "p0") {
UNIQUE KEY(`c_datetimev2`, `c_bigint`, `c_decimalv3`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c_bigint`) BUCKETS 10
- PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");
+ PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true");
"""
// insert data into unique key table1 2 times
@@ -232,7 +232,7 @@ suite("test_scalar_types_load", "p0") {
DUPLICATE KEY(`k1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
- PROPERTIES("replication_num" = "1");
+ PROPERTIES("replication_num" = "1", "store_row_column" = "true");
"""
// insert data into dup table with index
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql
new file mode 100644
index 0000000000..a193207216
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys ORDER BY c_double, c_datetimev2, c_decimal LIMIT 5;
\ No newline at end of file
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql
new file mode 100644
index 0000000000..a181e88cb8
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys where c_datetimev2 = '2023-01-10 15:40:04';
\ No newline at end of file
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql
new file mode 100644
index 0000000000..820b02760d
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys where c_bigint = 1799797425;
\ No newline at end of file
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql
new file mode 100644
index 0000000000..7d91b8ba31
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys where c_largeint = '1753599187';
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]