This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 231952a57d9ce300588a23f7e2ee0614fc58259b Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Thu Feb 2 11:00:23 2023 +0800 [fix](load) sequence column do not compare correctly in memtable (#16211) --- be/src/olap/memtable.cpp | 5 +- be/test/olap/delta_writer_test.cpp | 115 ++++++++++++++------- .../unique/test_unique_table_new_sequence.out | 8 +- .../unique/test_unique_table_sequence.out | 6 +- .../data/data_model_p0/unique/unique_key_data1.csv | 1 + .../data/data_model_p0/unique/unique_key_data2.csv | 3 +- .../unique/test_unique_table_new_sequence.groovy | 8 +- .../unique/test_unique_table_sequence.groovy | 8 +- 8 files changed, 100 insertions(+), 54 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index adc57dfee7..2fdf41e158 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -321,8 +321,9 @@ void MemTable::_replace_row(const ContiguousRow& src_row, TableKey row_in_skipli void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) { if (_tablet_schema->has_sequence_col()) { auto sequence_idx = _tablet_schema->sequence_col_idx(); - auto res = _input_mutable_block.compare_at(row_in_skiplist->_row_pos, new_row->_row_pos, - sequence_idx, _input_mutable_block, -1); + DCHECK_LT(sequence_idx, _input_mutable_block.columns()); + auto col_ptr = _input_mutable_block.mutable_columns()[sequence_idx].get(); + auto res = col_ptr->compare_at(row_in_skiplist->_row_pos, new_row->_row_pos, *col_ptr, -1); // dst sequence column larger than src, don't need to update if (res > 0) { return; diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 16051d1adc..b3aa765c2f 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -29,6 +29,7 @@ #include "gen_cpp/internal_service.pb.h" #include "olap/field.h" #include "olap/options.h" +#include "olap/rowset/beta_rowset.h" #include "olap/storage_engine.h" 
#include "olap/tablet.h" #include "olap/tablet_meta_manager.h" @@ -247,7 +248,7 @@ static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t s request->tablet_schema.short_key_column_count = 2; request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS; request->tablet_schema.storage_type = TStorageType::COLUMN; - request->tablet_schema.__set_sequence_col_idx(2); + request->tablet_schema.__set_sequence_col_idx(4); request->__set_storage_format(TStorageFormat::V2); TColumn k1; @@ -262,13 +263,6 @@ static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t s k2.column_type.type = TPrimitiveType::SMALLINT; request->tablet_schema.columns.push_back(k2); - TColumn sequence_col; - sequence_col.column_name = SEQUENCE_COL; - sequence_col.__set_is_key(false); - sequence_col.column_type.type = TPrimitiveType::INT; - sequence_col.__set_aggregation_type(TAggregationType::REPLACE); - request->tablet_schema.columns.push_back(sequence_col); - TColumn v1; v1.column_name = "v1"; v1.__set_is_key(false); @@ -282,6 +276,13 @@ static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t s v2.column_type.type = TPrimitiveType::DATEV2; v2.__set_aggregation_type(TAggregationType::REPLACE); request->tablet_schema.columns.push_back(v2); + + TColumn sequence_col; + sequence_col.column_name = SEQUENCE_COL; + sequence_col.__set_is_key(false); + sequence_col.column_type.type = TPrimitiveType::INT; + sequence_col.__set_aggregation_type(TAggregationType::REPLACE); + request->tablet_schema.columns.push_back(sequence_col); } static TDescriptorTable create_descriptor_tablet() { @@ -346,15 +347,15 @@ static TDescriptorTable create_descriptor_tablet_with_sequence_col() { TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build()); tuple_builder.add_slot( TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build()); + tuple_builder.add_slot( + 
TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("v1").column_pos(2).build()); + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_DATEV2).column_name("v2").column_pos(3).build()); tuple_builder.add_slot(TSlotDescriptorBuilder() .type(TYPE_INT) .column_name(SEQUENCE_COL) - .column_pos(2) + .column_pos(4) .build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("v1").column_pos(3).build()); - tuple_builder.add_slot( - TSlotDescriptorBuilder().type(TYPE_DATEV2).column_name("v2").column_pos(4).build()); tuple_builder.build(&dtb); return dtb.desc_tbl(); @@ -790,23 +791,43 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { int16_t c2 = 456; columns[1]->insert_data((const char*)&c2, sizeof(c2)); - int32_t c3 = 1; - columns[2]->insert_data((const char*)&c3, sizeof(c2)); + DateTimeValue c3; + c3.from_date_str("2020-07-16 19:39:43", 19); + int64_t c3_int = c3.to_int64(); + columns[2]->insert_data((const char*)&c3_int, sizeof(c3)); - DateTimeValue c4; - c4.from_date_str("2020-07-16 19:39:43", 19); - int64_t c4_int = c4.to_int64(); + doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4; + c4.set_time(2022, 6, 6, 0, 0, 0, 0); + uint32_t c4_int = c4.to_date_int_val(); columns[3]->insert_data((const char*)&c4_int, sizeof(c4)); - doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c5; - c5.set_time(2022, 6, 6, 0, 0, 0, 0); - uint32_t c5_int = c5.to_date_int_val(); - columns[4]->insert_data((const char*)&c5_int, sizeof(c5)); - + int32_t c5 = 100; + columns[4]->insert_data((const char*)&c5, sizeof(c2)); res = delta_writer->write(&block, {0}); ASSERT_TRUE(res.ok()); } + { + int8_t c1 = 123; + columns[0]->insert_data((const char*)&c1, sizeof(c1)); + + int16_t c2 = 456; + columns[1]->insert_data((const char*)&c2, sizeof(c2)); + + DateTimeValue c3; + c3.from_date_str("2020-07-31 19:39:43", 19); + int64_t c3_int = c3.to_int64(); + columns[2]->insert_data((const char*)&c3_int, sizeof(c3)); + + 
doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4; + c4.set_time(2022, 7, 6, 0, 0, 0, 0); + uint32_t c4_int = c4.to_date_int_val(); + columns[3]->insert_data((const char*)&c4_int, sizeof(c4)); + int32_t c5 = 90; + columns[4]->insert_data((const char*)&c5, sizeof(c2)); + res = delta_writer->write(&block, {1}); + ASSERT_TRUE(res.ok()); + } res = delta_writer->close(); ASSERT_TRUE(res.ok()); res = delta_writer->close_wait(PSlaveTabletNodes(), false); @@ -824,20 +845,42 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; StorageEngine::instance()->txn_manager()->get_txn_related_tablets( write_req.txn_id, write_req.partition_id, &tablet_related_rs); - for (auto& tablet_rs : tablet_related_rs) { - std::cout << "start to publish txn" << std::endl; - RowsetSharedPtr rowset = tablet_rs.second; - res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, - write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); - ASSERT_TRUE(res.ok()); - std::cout << "start to add inc rowset:" << rowset->rowset_id() - << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first - << "-" << rowset->version().second << std::endl; - res = tablet->add_inc_rowset(rowset); - ASSERT_TRUE(res.ok()); - } + ASSERT_EQ(1, tablet_related_rs.size()); + + std::cout << "start to publish txn" << std::endl; + RowsetSharedPtr rowset = tablet_related_rs.begin()->second; + res = k_engine->txn_manager()->publish_txn( + meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, + write_req.schema_hash, tablet_related_rs.begin()->first.tablet_uid, version); + ASSERT_TRUE(res.ok()); + std::cout << "start to add inc rowset:" << rowset->rowset_id() + << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first + << "-" << rowset->version().second << std::endl; + res = tablet->add_inc_rowset(rowset); + ASSERT_TRUE(res.ok()); ASSERT_EQ(1, 
tablet->num_rows()); + std::vector<segment_v2::SegmentSharedPtr> segments; + res = ((BetaRowset*)rowset.get())->load_segments(&segments); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(1, rowset->num_segments()); + ASSERT_EQ(1, segments.size()); + + // read data, verify the data correct + OlapReaderStatistics stats; + StorageReadOptions opts; + opts.stats = &stats; + opts.tablet_schema = rowset->tablet_schema(); + + std::unique_ptr<RowwiseIterator> iter; + Schema schema(rowset->tablet_schema()); + segments[0]->new_iterator(schema, opts, &iter); + auto read_block = rowset->tablet_schema()->create_block(); + res = iter->next_batch(&read_block); + ASSERT_TRUE(res.ok()); + ASSERT_EQ(1, read_block.rows()); + // get the value from sequence column + auto seq_v = read_block.get_by_position(4).column->get_int(0); + ASSERT_EQ(100, seq_v); res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false); ASSERT_TRUE(res.ok()); diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out b/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out index 62f2bf221b..825b7039c1 100644 --- a/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out +++ b/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out @@ -5,18 +5,18 @@ 3 6 13 -- !all -- -1 2 14 +1 2 15 2 5 12 3 6 13 -- !all -- -1 2 14 +1 2 15 15 8 19 2 5 12 3 6 13 -- !all -- -1 10 14 0 14 +1 10 15 0 15 15 8 19 0 19 2 5 14 0 12 3 6 11 0 13 @@ -36,7 +36,7 @@ __DORIS_DELETE_SIGN__ TINYINT No false 0 REPLACE __DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE -- !all -- -1 10 14 0 14 +1 10 15 0 15 15 8 19 0 19 2 5 14 0 12 21 8 22 0 22 diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out b/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out index e5cc5b4907..da9bce4389 100644 --- a/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out +++ 
b/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out @@ -5,17 +5,17 @@ 3 6 13 -- !all -- -1 2 14 +1 2 15 2 5 12 3 6 13 -- !all -- -1 10 14 +1 10 15 2 5 14 3 6 11 -- !all -- -1 10 14 0 14 +1 10 15 0 15 15 9 18 0 \N 2 5 14 0 12 3 6 11 0 13 diff --git a/regression-test/data/data_model_p0/unique/unique_key_data1.csv b/regression-test/data/data_model_p0/unique/unique_key_data1.csv index e1c2df2611..1a65108428 100644 --- a/regression-test/data/data_model_p0/unique/unique_key_data1.csv +++ b/regression-test/data/data_model_p0/unique/unique_key_data1.csv @@ -1,3 +1,4 @@ 1,4,11 2,5,12 3,6,13 +2,7,9 diff --git a/regression-test/data/data_model_p0/unique/unique_key_data2.csv b/regression-test/data/data_model_p0/unique/unique_key_data2.csv index ef491a3b64..b6d15e945a 100644 --- a/regression-test/data/data_model_p0/unique/unique_key_data2.csv +++ b/regression-test/data/data_model_p0/unique/unique_key_data2.csv @@ -1,3 +1,4 @@ -1,2,14 +1,2,15 2,3,2 3,4,3 +1,9,14 diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy index 4c70ddb272..a1808b4d6f 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy @@ -49,8 +49,8 @@ suite("test_unique_table_new_sequence") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) - assertEquals(3, json.NumberTotalRows) - assertEquals(3, json.NumberLoadedRows) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } @@ -75,8 +75,8 @@ suite("test_unique_table_new_sequence") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", 
json.Status.toLowerCase()) - assertEquals(3, json.NumberTotalRows) - assertEquals(3, json.NumberLoadedRows) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy index 938abd6c76..91645b8118 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy @@ -49,8 +49,8 @@ suite("test_unique_table_sequence") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) - assertEquals(3, json.NumberTotalRows) - assertEquals(3, json.NumberLoadedRows) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } @@ -76,8 +76,8 @@ suite("test_unique_table_sequence") { log.info("Stream load result: ${result}".toString()) def json = parseJson(result) assertEquals("success", json.Status.toLowerCase()) - assertEquals(3, json.NumberTotalRows) - assertEquals(3, json.NumberLoadedRows) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) assertEquals(0, json.NumberFilteredRows) assertEquals(0, json.NumberUnselectedRows) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org