This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2ddaab3b469 [cherry-pick](branch-2.0) fix inverted index file size
(#37838)
2ddaab3b469 is described below
commit 2ddaab3b4692f995a9b1292a12bed0efe2254551
Author: Sun Chenyang <[email protected]>
AuthorDate: Tue Jul 16 11:28:56 2024 +0800
[cherry-pick](branch-2.0) fix inverted index file size (#37838)
## Proposed changes
pick from master #37232
---
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 4 +-
.../suites/inverted_index_p0/test_show_data.groovy | 195 ++++++++++++++++++++-
2 files changed, 195 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 33d638b63f3..05730ec9f3a 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -151,8 +151,7 @@ Status VerticalBetaRowsetWriter::_flush_columns(
_segment_num_rows.resize(_cur_writer_idx + 1);
_segment_num_rows[_cur_writer_idx] =
_segment_writers[_cur_writer_idx]->row_count();
}
-    _total_index_size += static_cast<int64_t>(index_size) +
-                         (*segment_writer)->get_inverted_index_file_size();
+    _total_index_size += static_cast<int64_t>(index_size);
return Status::OK();
}
@@ -220,6 +219,7 @@ Status VerticalBetaRowsetWriter::final_flush() {
return st;
}
         _total_data_size += segment_size + segment_writer->get_inverted_index_file_size();
+        _total_index_size += segment_writer->get_inverted_index_file_size();
segment_writer.reset();
}
return Status::OK();
diff --git a/regression-test/suites/inverted_index_p0/test_show_data.groovy
b/regression-test/suites/inverted_index_p0/test_show_data.groovy
index 339152d378d..4db0843b7c2 100644
--- a/regression-test/suites/inverted_index_p0/test_show_data.groovy
+++ b/regression-test/suites/inverted_index_p0/test_show_data.groovy
@@ -41,7 +41,7 @@ suite("test_show_data", "p0") {
}
     def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
-                        expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+                        expected_succ_rows = -1 ->
// load the json data
streamLoad {
@@ -152,4 +152,195 @@ suite("test_show_data", "p0") {
} finally {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}
-}
\ No newline at end of file
+}
+
+suite("test_show_data_with_compaction", "p0") {
+ // define a sql table
+ def tableWithIndexCompaction = "test_with_index_compaction"
+ def tableWithOutIndexCompaction = "test_without_index_compaction"
+ def delta_time = 5000
+ def timeout = 60000
+ def alter_res = "null"
+ def useTime = 0
+ String database = context.config.getDbNameByFile(context.file)
+ boolean invertedIndexCompactionEnable = true
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == "inverted_index_compaction_enable") {
+ invertedIndexCompactionEnable =
Boolean.parseBoolean(((List<String>) ele)[2])
+ logger.info("inverted_index_compaction_enable: ${((List<String>)
ele)[2]}")
+ }
+ }
+
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
+ def create_table_with_index = {testTablex ->
+ // multi-line sql
+ def result = sql """
+ CREATE TABLE IF NOT EXISTS ${testTablex} (
+ `@timestamp` int(11) NULL,
+ `clientip` varchar(20) NULL,
+ `request` text NULL,
+ `status` int(11) NULL,
+ `size` int(11) NULL,
+ INDEX request_idx (`request`) USING INVERTED
PROPERTIES("parser"="english") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default:
1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+ }
+
+ def load_httplogs_data = {table_name, label, read_flag, format_flag,
file_name, ignore_failure=false,
+ expected_succ_rows = -1 ->
+
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'label', label + "_" + UUID.randomUUID().toString()
+ set 'read_json_by_line', read_flag
+ set 'format', format_flag
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+ if (expected_succ_rows >= 0) {
+ set 'max_filter_ratio', '1'
+ }
+
+ // if declared a check callback, the default check condition will
ignore.
+ // So you must check all condition
+ check { result, exception, startTime, endTime ->
+ if (ignore_failure && expected_succ_rows < 0) { return }
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ if (expected_succ_rows >= 0) {
+ assertEquals(json.NumberLoadedRows, expected_succ_rows)
+ } else {
+ assertEquals(json.NumberTotalRows,
json.NumberLoadedRows + json.NumberUnselectedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes
> 0)
+ }
+ }
+ }
+ }
+
+ def wait_for_show_data_finish = { table_name, OpTimeout, origin_size,
maxRetries = 5 ->
+ def size = origin_size
+ def retries = 0
+ def last_size = origin_size
+
+ while (retries < maxRetries) {
+ for (int t = 0; t < OpTimeout; t += delta_time) {
+ def result = sql """show data from
${database}.${table_name};"""
+ if (result.size() > 0) {
+ logger.info(table_name + " show data, detail: " +
result[0].toString())
+ size = result[0][2].replace(" KB", "").toDouble()
+ }
+ useTime += delta_time
+ Thread.sleep(delta_time)
+
+ // If size changes, break the for loop to check in the next
while iteration
+ if (size != origin_size && size != last_size) {
+ break
+ }
+ }
+
+ if (size != last_size) {
+ last_size = size
+ } else {
+ // If size didn't change during the last OpTimeout period,
return size
+ if (size != origin_size) {
+ return size
+ }
+ }
+
+ retries++
+ }
+ return "wait_timeout"
+ }
+
+
+ try {
+
+ def run_compaction_and_wait = { tableName ->
+
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+
+ // trigger compactions for all tablets in ${tableName}
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_run_full_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out +
", err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ if (compactJson.status.toLowerCase() == "fail") {
+ logger.info("Compaction was done automatically!")
+ } else {
+ assertEquals("success", compactJson.status.toLowerCase())
+ }
+ }
+
+ // wait for all compactions done
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_get_compaction_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ",
out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success",
compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+ }
+
+ set_be_config.call("inverted_index_compaction_enable", "false")
+ sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+ create_table_with_index.call(tableWithIndexCompaction)
+
+ load_httplogs_data.call(tableWithIndexCompaction, '1', 'true', 'json',
'documents-1000.json')
+ load_httplogs_data.call(tableWithIndexCompaction, '2', 'true', 'json',
'documents-1000.json')
+ load_httplogs_data.call(tableWithIndexCompaction, '3', 'true', 'json',
'documents-1000.json')
+ load_httplogs_data.call(tableWithIndexCompaction, '4', 'true', 'json',
'documents-1000.json')
+ load_httplogs_data.call(tableWithIndexCompaction, '5', 'true', 'json',
'documents-1000.json')
+
+ sql "sync"
+
+ run_compaction_and_wait(tableWithIndexCompaction)
+ def with_index_size =
wait_for_show_data_finish(tableWithIndexCompaction, 60000, 0)
+ assertTrue(with_index_size != "wait_timeout")
+
+ } finally {
+ // sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+ set_be_config.call("inverted_index_compaction_enable",
invertedIndexCompactionEnable.toString())
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]