This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new c3f80f191bf [Fix](partial update) Fix core when doing partial update
on tables with row column after schema change #26632 (#26695)
c3f80f191bf is described below
commit c3f80f191bf580dd54502122990ab617d877cecc
Author: bobhan1 <[email protected]>
AuthorDate: Fri Nov 10 09:58:39 2023 +0800
[Fix](partial update) Fix core when doing partial update on tables with row
column after schema change #26632 (#26695)
---
be/src/olap/rowset/segment_v2/segment_writer.cpp | 4 +-
be/src/olap/tablet.cpp | 14 ++--
be/src/olap/tablet.h | 6 +-
...test_partial_update_schema_change_row_store.out | 76 ++++++++++++++++++++++
.../test_partial_update_schema_change.groovy | 7 +-
..._partial_update_schema_change_row_store.groovy} | 69 ++++++++++++--------
6 files changed, 139 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index add9e7f8697..8e7bd5a90eb 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -578,8 +578,8 @@ Status
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
read_index[id_and_pos.pos] = read_idx++;
}
if (has_row_column) {
- auto st = _tablet->fetch_value_through_row_column(rowset,
seg_it.first, rids,
-
cids_missing, old_value_block);
+ auto st = _tablet->fetch_value_through_row_column(
+ rowset, *_tablet_schema, seg_it.first, rids,
cids_missing, old_value_block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value through row column";
return st;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 17f522a3b84..7c9a6ecfaa6 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2686,7 +2686,8 @@ void Tablet::update_max_version_schema(const
TabletSchemaSPtr& tablet_schema) {
}
// fetch value by row column
-Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset,
uint32_t segid,
+Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset,
+ const TabletSchema&
tablet_schema, uint32_t segid,
const std::vector<uint32_t>&
rowids,
const std::vector<uint32_t>&
cids,
vectorized::Block& block) {
@@ -2694,7 +2695,6 @@ Status
Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(input_rowset);
CHECK(rowset);
- const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
SegmentCacheHandle segment_cache;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset,
&segment_cache, true));
// find segment
@@ -2713,10 +2713,10 @@ Status
Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
LOG_EVERY_N(INFO, 500) << "fetch_value_by_rowids, cost(us):" <<
watch.elapsed_time() / 1000
<< ", row_batch_size:" << rowids.size();
});
- CHECK(tablet_schema->store_row_column());
+ CHECK(tablet_schema.store_row_column());
// create _source column
std::unique_ptr<segment_v2::ColumnIterator> column_iterator;
-
RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema->column(BeConsts::ROW_STORE_COL),
+
RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema.column(BeConsts::ROW_STORE_COL),
&column_iterator));
segment_v2::ColumnIteratorOptions opt;
OlapReaderStatistics stats;
@@ -2735,7 +2735,7 @@ Status
Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
std::vector<std::string> default_values;
default_values.resize(cids.size());
for (int i = 0; i < cids.size(); ++i) {
- const TabletColumn& column = tablet_schema->column(cids[i]);
+ const TabletColumn& column = tablet_schema.column(cids[i]);
vectorized::DataTypePtr type =
vectorized::DataTypeFactory::instance().create_data_type(column);
col_uid_to_idx[column.unique_id()] = i;
@@ -3253,8 +3253,8 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr
tablet_schema,
(*read_index)[id_and_pos.pos] = read_idx++;
}
if (has_row_column) {
- auto st = fetch_value_through_row_column(rowset_iter->second,
seg_it.first, rids,
- cids_to_read, block);
+ auto st = fetch_value_through_row_column(rowset_iter->second,
*tablet_schema,
+ seg_it.first, rids,
cids_to_read, block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value through row column";
return st;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 7fbf2b9a98f..f7176bd4bcc 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -442,7 +442,11 @@ public:
const TabletColumn& tablet_column,
vectorized::MutableColumnPtr& dst);
- Status fetch_value_through_row_column(RowsetSharedPtr input_rowset,
uint32_t segid,
+ // We use the TabletSchema from the caller because the TabletSchema in the
rowset's meta
+ // may be outdated due to schema change. Also note that the cids
should indicate the indexes
+ // of the columns in the TabletSchema passed in.
+ Status fetch_value_through_row_column(RowsetSharedPtr input_rowset,
+ const TabletSchema& tablet_schema,
uint32_t segid,
const std::vector<uint32_t>& rowids,
const std::vector<uint32_t>& cids,
vectorized::Block& block);
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out
new file mode 100644
index 00000000000..86df3374712
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out
@@ -0,0 +1,76 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql1 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql2 --
+1 1 1 0 0 0 0 0 0 0 0
+
+-- !sql3 --
+1 1 1 0 0 0 0 0 0 0
10
+
+-- !sql4 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql5 --
+1 1 1 0 0 0 0 0 0
+
+-- !sql6 --
+1 2 1 0 0 0 0 1 0
+
+-- !sql7 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql8 --
+1 1 1.0 0 0 0 0 0 0 0
+
+-- !sql9 --
+1
+
+-- !sql10 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql11 --
+1 1 1 0 0 0 0 0 0 0
+
+-- !sql12 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql13 --
+1 1 1 0 0 0 0 0 0 0
+
+-- !sql14 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql15 --
+1 1 1 0 0 0 0 0 0 0 0
+
+-- !sql16 --
+1 1 1 0 0 0 0 0 0 0
10
+
+-- !sql17 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql18 --
+1 1 1 0 0 0 0 0 0
+
+-- !sql19 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql20 --
+1 1 1.0 0 0 0 0 0 0 0
+
+-- !sql21 --
+1
+
+-- !sql23 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql24 --
+1 1 1 0 0 0 0 0 0 0
+
+-- !sql25 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql26 --
+1 1 1 0 0 0 0 0 0 0
+
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
index 332662af4f4..4bce896ed48 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
@@ -17,7 +17,10 @@
// under the License.
suite("test_partial_update_schema_change", "p0") {
- // test add value column
+
+ /* ============================================== light schema change
cases: ============================================== */
+
+ // test add value column
def tableName = "test_partial_update_light_schema_change_add_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
@@ -618,6 +621,8 @@ suite("test_partial_update_schema_change", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName} """
+ /* ============================================== schema change cases:
============================================== */
+
// test add value column
tableName = "test_partial_update_schema_change_add_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
similarity index 94%
copy from
regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
copy to
regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
index 332662af4f4..1fa4ec39d36 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy
@@ -16,9 +16,12 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_partial_update_schema_change", "p0") {
- // test add value column
- def tableName = "test_partial_update_light_schema_change_add_column"
+suite("test_partial_update_row_store_schema_change", "p0") {
+
+ /* ============================================== light schema change
cases: ============================================== */
+
+ // test add value column
+ def tableName =
"test_partial_update_row_store_light_schema_change_add_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -36,6 +39,7 @@ suite("test_partial_update_schema_change", "p0") {
PROPERTIES(
"replication_num" = "1",
"light_schema_change" = "true",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -70,7 +74,7 @@ suite("test_partial_update_schema_change", "p0") {
def try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -140,7 +144,7 @@ suite("test_partial_update_schema_change", "p0") {
// test delete value column
- tableName = "test_partial_update_light_schema_change_delete_column"
+ tableName =
"test_partial_update_row_store_light_schema_change_delete_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -158,6 +162,7 @@ suite("test_partial_update_schema_change", "p0") {
PROPERTIES(
"replication_num" = "1",
"light_schema_change" = "true",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -192,7 +197,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -262,7 +267,7 @@ suite("test_partial_update_schema_change", "p0") {
// test update value column
- tableName = "test_partial_update_light_schema_change_update_column"
+ tableName =
"test_partial_update_row_store_light_schema_change_update_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -280,6 +285,7 @@ suite("test_partial_update_schema_change", "p0") {
PROPERTIES(
"replication_num" = "1",
"light_schema_change" = "true",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -314,7 +320,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -354,7 +360,7 @@ suite("test_partial_update_schema_change", "p0") {
// test add key column
- tableName = "test_partial_update_light_schema_change_add_key_column"
+ tableName =
"test_partial_update_row_store_light_schema_change_add_key_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
@@ -364,6 +370,7 @@ suite("test_partial_update_schema_change", "p0") {
PROPERTIES(
"replication_num" = "1",
"light_schema_change" = "true",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -398,7 +405,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -410,7 +417,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -449,7 +456,7 @@ suite("test_partial_update_schema_change", "p0") {
// test create index
- tableName = "test_partial_update_light_schema_change_create_index"
+ tableName =
"test_partial_update_row_store_light_schema_change_create_index"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -467,6 +474,7 @@ suite("test_partial_update_schema_change", "p0") {
PROPERTIES(
"replication_num" = "1",
"light_schema_change" = "true",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -501,7 +509,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -540,7 +548,7 @@ suite("test_partial_update_schema_change", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName} """
// test change properties
- tableName = "test_partial_update_light_schema_change_properties"
+ tableName = "test_partial_update_row_store_light_schema_change_properties"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -558,6 +566,7 @@ suite("test_partial_update_schema_change", "p0") {
PROPERTIES(
"replication_num" = "1",
"light_schema_change" = "true",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -618,8 +627,10 @@ suite("test_partial_update_schema_change", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName} """
+ /* ============================================== schema change cases:
============================================== */
+
// test add value column
- tableName = "test_partial_update_schema_change_add_column"
+ tableName = "test_partial_update_row_store_schema_change_add_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -636,6 +647,7 @@ suite("test_partial_update_schema_change", "p0") {
UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -670,7 +682,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -740,7 +752,7 @@ suite("test_partial_update_schema_change", "p0") {
// test delete value column
- tableName = "test_partial_update_schema_change_delete_column"
+ tableName = "test_partial_update_row_store_schema_change_delete_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -757,6 +769,7 @@ suite("test_partial_update_schema_change", "p0") {
UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -791,7 +804,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -857,7 +870,7 @@ suite("test_partial_update_schema_change", "p0") {
// test update value column
- tableName = "test_partial_update_schema_change_update_column"
+ tableName = "test_partial_update_row_store_schema_change_update_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -874,6 +887,7 @@ suite("test_partial_update_schema_change", "p0") {
UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -908,7 +922,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -948,7 +962,7 @@ suite("test_partial_update_schema_change", "p0") {
// test add key column
- tableName = "test_partial_update_schema_change_add_key_column"
+ tableName = "test_partial_update_row_store_schema_change_add_key_column"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
@@ -957,6 +971,7 @@ suite("test_partial_update_schema_change", "p0") {
UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -991,7 +1006,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -1002,7 +1017,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -1038,7 +1053,7 @@ suite("test_partial_update_schema_change", "p0") {
// test create index
- tableName = "test_partial_update_schema_change_create_index"
+ tableName = "test_partial_update_row_store_schema_change_create_index"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -1055,6 +1070,7 @@ suite("test_partial_update_schema_change", "p0") {
UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
@@ -1088,7 +1104,7 @@ suite("test_partial_update_schema_change", "p0") {
try_times=100
while(true){
def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName =
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
- Thread.sleep(1000)
+ Thread.sleep(1200)
if(res[0][9].toString() == "FINISHED"){
break;
}
@@ -1127,7 +1143,7 @@ suite("test_partial_update_schema_change", "p0") {
sql """ DROP TABLE IF EXISTS ${tableName} """
// test change properties
- tableName = "test_partial_update_schema_change_properties"
+ tableName = "test_partial_update_row_store_schema_change_properties"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
@@ -1144,6 +1160,7 @@ suite("test_partial_update_schema_change", "p0") {
UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
+ "store_row_column" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]