This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 89845dc916d [fix] (move-memtable) fix errors caused by checks in the
inverted index file writer (#37621)
89845dc916d is described below
commit 89845dc916d6d8c29f8a06e6af226e0d3e6dc40b
Author: Sun Chenyang <[email protected]>
AuthorDate: Thu Jul 11 19:43:57 2024 +0800
[fix] (move-memtable) fix errors caused by checks in the inverted index
file writer (#37621)
## Proposed changes

1. Thread A flushes segment 0 and its inverted index 0.
2. Thread B flushes segment 1.
3. Thread A adds segment 0; at this point the destination BE checks that the
number of inverted index file writers equals the number of segment file
writers, and the check fails.

Fix: perform that check (inverted index file writer count == segment file
writer count) during `close()` instead, after all flushes have finished.
---
be/src/runtime/load_stream_writer.cpp | 15 ++--
.../test_move_memtable_multi_segment_index.out | 4 +
.../test_move_memtable_multi_segment_index.groovy | 100 +++++++++++++++++++++
.../suites/variant_github_events_p0/load.groovy | 1 +
.../variant_github_events_p0_new/load.groovy | 1 +
regression-test/suites/variant_p0/load.groovy | 1 +
6 files changed, 114 insertions(+), 8 deletions(-)
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index 925229a43ce..3e66787a9bd 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -187,13 +187,6 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const
SegmentStatistics& st
if (!_is_init) {
return Status::Corruption("add_segment failed, LoadStreamWriter is
not inited");
}
- if (_inverted_file_writers.size() > 0 &&
- _inverted_file_writers.size() != _segment_file_writers.size()) {
- return Status::Corruption(
- "add_segment failed, inverted file writer size is {},"
- "segment file writer size is {}",
- _inverted_file_writers.size(),
_segment_file_writers.size());
- }
DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid",
{ segid = _segment_file_writers.size(); });
RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE,
&segment_file_size));
@@ -255,7 +248,13 @@ Status LoadStreamWriter::close() {
if (_is_canceled) {
return Status::InternalError("flush segment failed");
}
-
+ if (_inverted_file_writers.size() > 0 &&
+ _inverted_file_writers.size() != _segment_file_writers.size()) {
+ return Status::Corruption(
+ "LoadStreamWriter close failed, inverted file writer size is
{},"
+ "segment file writer size is {}",
+ _inverted_file_writers.size(), _segment_file_writers.size());
+ }
for (const auto& writer : _segment_file_writers) {
if (writer->state() != io::FileWriter::State::CLOSED) {
return Status::Corruption("LoadStreamWriter close failed, segment
{} is not closed",
diff --git
a/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out
b/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out
new file mode 100644
index 00000000000..c569d3acd09
--- /dev/null
+++
b/regression-test/data/load/insert/test_move_memtable_multi_segment_index.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_select_count --
+67843
+
diff --git
a/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy
b/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy
new file mode 100644
index 00000000000..ac225812484
--- /dev/null
+++
b/regression-test/suites/load/insert/test_move_memtable_multi_segment_index.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_move_memtable_multi_segment_index", "nonConcurrent"){
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+ def load_json_data = {table_name, file_name ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'read_json_by_line', 'true'
+ set 'format', 'json'
+ set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ logger.info("Stream load ${file_name} result:
${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ // assertEquals(json.NumberTotalRows, json.NumberLoadedRows +
json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+ try {
+ set_be_config("write_buffer_size", "2097152")
+ def table_name = "github_events"
+ sql """DROP TABLE IF EXISTS ${table_name}"""
+ table_name = "github_events"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ k bigint,
+ v variant,
+ INDEX idx_var(v) USING INVERTED PROPERTIES("parser" =
"english") COMMENT ''
+ )
+ DUPLICATE KEY(`k`)
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ properties("replication_num" = "1", "disable_auto_compaction" =
"true");
+ """
+
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-0.json'}""")
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-1.json'}""")
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-2.json'}""")
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2015-01-01-3.json'}""")
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2022-11-07-16.json'}""")
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2022-11-07-10.json'}""")
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2022-11-07-22.json'}""")
+ load_json_data.call(table_name, """${getS3Url() +
'/regression/gharchive.m/2022-11-07-23.json'}""")
+
+ sql """DROP TABLE IF EXISTS github_events_2"""
+ sql """
+ CREATE TABLE IF NOT EXISTS `github_events_2` (
+ `k` BIGINT NULL,
+ `v` text NULL,
+ INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" =
"english") COMMENT ''
+ ) ENGINE = OLAP DUPLICATE KEY(`k`) COMMENT 'OLAP' DISTRIBUTED BY
HASH(`k`) BUCKETS 1 PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ insert into github_events_2 select 1, cast(v["repo"]["name"] as
string) FROM github_events;
+ """
+ qt_sql_select_count """ select count(*) from github_events_2; """
+ } finally {
+ set_be_config("write_buffer_size", "209715200")
+ }
+
+}
diff --git a/regression-test/suites/variant_github_events_p0/load.groovy
b/regression-test/suites/variant_github_events_p0/load.groovy
index cec4fb69fd4..8159a5752e5 100644
--- a/regression-test/suites/variant_github_events_p0/load.groovy
+++ b/regression-test/suites/variant_github_events_p0/load.groovy
@@ -127,6 +127,7 @@ suite("regression_test_variant_github_events_p0",
"nonConcurrent"){
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
file file_name // import json file
time 10000 // limit inflight 10s
diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy
b/regression-test/suites/variant_github_events_p0_new/load.groovy
index c063ebecf26..4d8d304aeb1 100644
--- a/regression-test/suites/variant_github_events_p0_new/load.groovy
+++ b/regression-test/suites/variant_github_events_p0_new/load.groovy
@@ -34,6 +34,7 @@ suite("regression_test_variant_github_events_p0",
"nonConcurrent"){
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
file file_name // import json file
time 10000 // limit inflight 10s
diff --git a/regression-test/suites/variant_p0/load.groovy
b/regression-test/suites/variant_p0/load.groovy
index cbd6bc1178c..c84620a3d91 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -26,6 +26,7 @@ suite("regression_test_variant", "nonConcurrent"){
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
+ set 'memtable_on_sink_node', 'true'
file file_name // import json file
time 10000 // limit inflight 10s
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]