This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 63f7704140a [fix](group commit) Fix the timeout to wait wal finished
when schema change (#34021)
63f7704140a is described below
commit 63f7704140a9133b716bc17bb47dcb6cb1bc229c
Author: meiyi <[email protected]>
AuthorDate: Tue Apr 23 21:38:45 2024 +0800
[fix](group commit) Fix the timeout to wait wal finished when schema change
(#34021)
---
be/src/runtime/group_commit_mgr.cpp | 27 +++++----
.../doris/alter/MaterializedViewHandler.java | 16 +++---
.../data/insert_p0/insert_group_commit_into.out | 32 +++++------
.../insert_p0/insert_group_commit_into.groovy | 66 ++++++++++++++--------
4 files changed, 83 insertions(+), 58 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 5ec20c10aef..62fbbd37979 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -74,11 +74,20 @@ Status LoadBlockQueue::add_block(RuntimeState*
runtime_state,
}
}
}
- if (_data_bytes >= _group_commit_data_bytes) {
- VLOG_DEBUG << "group commit meets commit condition for data size,
label=" << label
- << ", instance_id=" << load_instance_id << ", data_bytes="
<< _data_bytes;
- _need_commit = true;
- data_size_condition = true;
+ if (!_need_commit) {
+ if (_data_bytes >= _group_commit_data_bytes) {
+ VLOG_DEBUG << "group commit meets commit condition for data size,
label=" << label
+ << ", instance_id=" << load_instance_id << ",
data_bytes=" << _data_bytes;
+ _need_commit = true;
+ data_size_condition = true;
+ }
+ if
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
-
+ _start_time)
+ .count() >= _group_commit_interval_ms) {
+ VLOG_DEBUG << "group commit meets commit condition for time
interval, label=" << label
+ << ", instance_id=" << load_instance_id << ",
data_bytes=" << _data_bytes;
+ _need_commit = true;
+ }
}
_get_cond.notify_all();
return Status::OK();
@@ -90,11 +99,9 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
*eos = false;
std::unique_lock l(mutex);
if (!_need_commit) {
- auto left_milliseconds =
- _group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
-
std::chrono::steady_clock::now() - _start_time)
- .count();
- if (left_milliseconds <= 0) {
+ if
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
-
+ _start_time)
+ .count() >= _group_commit_interval_ms) {
_need_commit = true;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index b28fb4d9f7b..98b6f8e4e95 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -187,6 +187,9 @@ public class MaterializedViewHandler extends AlterHandler {
*/
public void processCreateMaterializedView(CreateMaterializedViewStmt
addMVClause, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
+ // wait wal delete
+
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
@@ -222,10 +225,6 @@ public class MaterializedViewHandler extends AlterHandler {
olapTable.setState(OlapTableState.ROLLUP);
- // wait wal delete
-
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}",
rollupJobV2.getJobId());
} finally {
@@ -248,6 +247,11 @@ public class MaterializedViewHandler extends AlterHandler {
public void processBatchAddRollup(String rawSql, List<AlterClause>
alterClauses, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
checkReplicaCount(olapTable);
+
+ // wait wal delete
+
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
+
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
+
Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
// save job id for log
Set<Long> logJobIdSet = new HashSet<>();
@@ -309,10 +313,6 @@ public class MaterializedViewHandler extends AlterHandler {
// but this order is more reasonable
olapTable.setState(OlapTableState.ROLLUP);
- // wait wal delete
-
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
-
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
-
// 2 batch submit rollup job
List<AlterJobV2> rollupJobV2List = new
ArrayList<>(rollupNameJobMap.values());
batchAddAlterJobV2(rollupJobV2List);
diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out
b/regression-test/data/insert_p0/insert_group_commit_into.out
index 7c39453d473..8c0cc138c47 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into.out
@@ -1,5 +1,5 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !sql --
+-- !select1 --
1 a 10
2 b -1
3 c -1
@@ -7,7 +7,7 @@
5 q 50
6 \N -1
--- !sql --
+-- !select2 --
1 a 10
1 a 10
2 b -1
@@ -20,7 +20,7 @@
6 \N -1
6 \N -1
--- !sql --
+-- !select3 --
1 a \N 10
1 a \N 10
1 a \N 10
@@ -39,11 +39,11 @@
6 \N \N -1
6 \N \N -1
--- !sql --
+-- !select4 --
2 b \N -1
6 \N \N -1
--- !sql --
+-- !select5 --
1 a 10 5
2 b -1 \N
2 b -1 \N
@@ -53,7 +53,7 @@
6 \N -1 \N
6 \N -1 \N
--- !sql --
+-- !select6 --
1 a 10
1 a 10
2 b -1
@@ -69,7 +69,7 @@
6 \N -1
6 \N -1
--- !sql --
+-- !select7 --
\N -1
\N -1
\N -1
@@ -103,11 +103,11 @@ q 50
3 3 3
4 4 4
--- !sql --
+-- !select8 --
1 test
2 or
--- !sql --
+-- !select1 --
1 a 10
2 b -1
3 c -1
@@ -115,7 +115,7 @@ q 50
5 q 50
6 \N -1
--- !sql --
+-- !select2 --
1 a 10
1 a 10
2 b -1
@@ -128,7 +128,7 @@ q 50
6 \N -1
6 \N -1
--- !sql --
+-- !select3 --
1 a \N 10
1 a \N 10
1 a \N 10
@@ -147,11 +147,11 @@ q 50
6 \N \N -1
6 \N \N -1
--- !sql --
+-- !select4 --
2 b \N -1
6 \N \N -1
--- !sql --
+-- !select5 --
1 a 10 5
2 b -1 \N
2 b -1 \N
@@ -161,7 +161,7 @@ q 50
6 \N -1 \N
6 \N -1 \N
--- !sql --
+-- !select6 --
1 a 10
1 a 10
2 b -1
@@ -177,7 +177,7 @@ q 50
6 \N -1
6 \N -1
--- !sql --
+-- !select7 --
\N -1
\N -1
\N -1
@@ -211,7 +211,7 @@ q 50
3 3 3
4 4 4
--- !sql --
+-- !select8 --
1 test
2 or
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index dc681feb3f2..d5f484c565d 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -41,7 +41,7 @@ suite("insert_group_commit_into") {
sql "use ${dbName};"
while (true) {
sleep(2000)
- def state = sql " show alter table column where tablename =
'${tableName}' order by CreateTime desc "
+ def state = sql " show alter table column where tablename =
'${tableName}' order by CreateTime desc limit 1"
logger.info("alter table state: ${state}")
if (state.size() > 0 && state[0][9] == "FINISHED") {
return true
@@ -69,6 +69,24 @@ suite("insert_group_commit_into") {
return serverInfo
}
+ def group_commit_insert_with_retry = { sql, expected_row_count ->
+ def retry = 0
+ while (true){
+ try {
+ return group_commit_insert(sql, expected_row_count)
+ } catch (Exception e) {
+ logger.warn("group_commit_insert failed, retry: " + retry + ",
error: " + e.getMessage())
+ retry++
+ if (e.getMessage().contains("is blocked on schema change") &&
retry < 20) {
+ sleep(1500)
+ continue
+ } else {
+ throw e
+ }
+ }
+ }
+ }
+
def none_group_commit_insert = { sql, expected_row_count ->
def stmt = prepareStatement """ ${sql} """
def result = stmt.executeUpdate()
@@ -120,7 +138,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id) select 6;
""", 1
getRowCount(6)
- qt_sql """ select * from ${table} order by id, name, score
asc; """
+ order_qt_select1 """ select * from ${table} order by id, name,
score asc; """
// 2. insert into and delete
sql """ delete from ${table} where id = 4; """
@@ -134,19 +152,19 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id) select 6;
""", 1
getRowCount(11)
- qt_sql """ select * from ${table} order by id, name, score
asc; """
+ order_qt_select2 """ select * from ${table} order by id, name,
score asc; """
// 3. insert into and light schema change: add column
group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4);
""", 1
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
sql """ alter table ${table} ADD column age int after name; """
- group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
assertTrue(getAlterTableState(), "add column should success")
getRowCount(17)
- qt_sql """ select * from ${table} order by id, name,score asc;
"""
+ order_qt_select3 """ select * from ${table} order by id,
name,score asc; """
// 4. insert into and truncate table
/*sql """ insert into ${table}(name, id) values('c', 3); """
@@ -157,43 +175,43 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table}(id) select 6;
""", 1
getRowCount(2)
- qt_sql """ select * from ${table} order by id, name, score
asc; """
+ order_qt_select4 """ select * from ${table} order by id, name,
score asc; """
// 5. insert into and schema change: modify column order
group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4);
""", 1
- group_commit_insert """ insert into ${table} values (1, 'a',
5, 10),(5, 'q', 6, 50); """, 2
- // sql """ alter table ${table} order by (id, name, score,
age); """
- group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ group_commit_insert """ insert into ${table}(id, name, age,
score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
+ sql """ alter table ${table} order by (id, name, score, age);
"""
+ group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
- // assertTrue(getAlterTableState(), "modify column order
should success")
+ assertTrue(getAlterTableState(), "modify column order should
success")
getRowCount(8)
- qt_sql """ select id, name, score, age from ${table} order by
id, name, score asc; """
+ order_qt_select5 """ select id, name, score, age from ${table}
order by id, name, score asc; """
// 6. insert into and light schema change: drop column
group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4);
""", 1
- group_commit_insert """ insert into ${table} values (1, 'a',
5, 10),(5, 'q', 6, 50); """, 2
+ group_commit_insert """ insert into ${table}(id, name, age,
score) values (1, 'a', 5, 10),(5, 'q', 6, 50); """, 2
sql """ alter table ${table} DROP column age; """
- group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
assertTrue(getAlterTableState(), "drop column should success")
getRowCount(14)
- qt_sql """ select * from ${table} order by id, name, score
asc; """
+ order_qt_select6 """ select * from ${table} order by id, name,
score asc; """
// 7. insert into and add rollup
group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
group_commit_insert """ insert into ${table}(id) values(4);
""", 1
group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50),(101, 'a', 100); """, 2
- // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
- group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+ sql """ alter table ${table} ADD ROLLUP r1(name, score); """
+ group_commit_insert_with_retry """ insert into ${table}(id,
name) values(2, 'b'); """, 1
+ group_commit_insert_with_retry """ insert into ${table}(id)
select 6; """, 1
getRowCount(20)
- qt_sql """ select name, score from ${table} order by name asc;
"""
-
+ order_qt_select7 """ select name, score from ${table} order by
name asc; """
+ assertTrue(getAlterTableState(), "add rollup should success")
/*if (item == "nereids") {
group_commit_insert """ insert into ${table}(id, name,
score) values(10 + 1, 'h', 100); """, 1
@@ -208,7 +226,7 @@ suite("insert_group_commit_into") {
def rowCount = sql "select count(*) from ${table}"
logger.info("row count: " + rowCount)
- assertEquals(rowCount[0][0], 23)
+ assertEquals(23, rowCount[0][0])
}
} finally {
// try_sql("DROP TABLE ${table}")
@@ -476,7 +494,7 @@ suite("insert_group_commit_into") {
group_commit_insert """ insert into ${table} values(1,
'test'); """, 1
group_commit_insert """ insert into ${table}(k1,`or`) values
(2,"or"); """, 1
getRowCount(2)
- qt_sql """ select * from ${table}; """
+ order_qt_select8 """ select * from ${table}; """
}
} finally {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]