This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2ddaab3b469 [cherry-pick](branch-2.0) fix inverted index file size (#37838)
2ddaab3b469 is described below

commit 2ddaab3b4692f995a9b1292a12bed0efe2254551
Author: Sun Chenyang <[email protected]>
AuthorDate: Tue Jul 16 11:28:56 2024 +0800

    [cherry-pick](branch-2.0) fix inverted index file size (#37838)
    
    ## Proposed changes
    
    pick from master #37232
---
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp |   4 +-
 .../suites/inverted_index_p0/test_show_data.groovy | 195 ++++++++++++++++++++-
 2 files changed, 195 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 33d638b63f3..05730ec9f3a 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -151,8 +151,7 @@ Status VerticalBetaRowsetWriter::_flush_columns(
         _segment_num_rows.resize(_cur_writer_idx + 1);
         _segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count();
     }
-    _total_index_size +=
-            static_cast<int64_t>(index_size) + (*segment_writer)->get_inverted_index_file_size();
+    _total_index_size += static_cast<int64_t>(index_size);
     return Status::OK();
 }
 
@@ -220,6 +219,7 @@ Status VerticalBetaRowsetWriter::final_flush() {
             return st;
         }
         _total_data_size += segment_size + segment_writer->get_inverted_index_file_size();
+        _total_index_size += segment_writer->get_inverted_index_file_size();
         segment_writer.reset();
     }
     return Status::OK();
diff --git a/regression-test/suites/inverted_index_p0/test_show_data.groovy b/regression-test/suites/inverted_index_p0/test_show_data.groovy
index 339152d378d..4db0843b7c2 100644
--- a/regression-test/suites/inverted_index_p0/test_show_data.groovy
+++ b/regression-test/suites/inverted_index_p0/test_show_data.groovy
@@ -41,7 +41,7 @@ suite("test_show_data", "p0") {
     }
 
     def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
-                        expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+                        expected_succ_rows = -1 ->
 
         // load the json data
         streamLoad {
@@ -152,4 +152,195 @@ suite("test_show_data", "p0") {
     } finally {
         //try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
-}
\ No newline at end of file
+}
+
+suite("test_show_data_with_compaction", "p0") {
+    // define a sql table
+    def tableWithIndexCompaction = "test_with_index_compaction"
+    def tableWithOutIndexCompaction = "test_without_index_compaction"
+    def delta_time = 5000
+    def timeout = 60000
+    def alter_res = "null"
+    def useTime = 0
+    String database = context.config.getDbNameByFile(context.file)
+    boolean invertedIndexCompactionEnable = true
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    backend_id = backendId_to_backendIP.keySet()[0]
+    def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+    
+    logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+    assertEquals(code, 0)
+    def configList = parseJson(out.trim())
+    assert configList instanceof List
+
+    for (Object ele in (List) configList) {
+        assert ele instanceof List<String>
+        if (((List<String>) ele)[0] == "inverted_index_compaction_enable") {
+            invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>) ele)[2])
+            logger.info("inverted_index_compaction_enable: ${((List<String>) ele)[2]}")
+        }
+    }
+
+    def set_be_config = { key, value ->
+        for (String backend_id: backendId_to_backendIP.keySet()) {
+            (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+    }
+
+    def create_table_with_index = {testTablex ->
+        // multi-line sql
+        def result = sql """
+                        CREATE TABLE IF NOT EXISTS ${testTablex} (
+                          `@timestamp` int(11) NULL,
+                          `clientip` varchar(20) NULL,
+                          `request` text NULL,
+                          `status` int(11) NULL,
+                          `size` int(11) NULL,
+                          INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser"="english") COMMENT ''
+                        ) ENGINE=OLAP
+                        DUPLICATE KEY(`@timestamp`)
+                        DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+                        PROPERTIES (
+                            "replication_allocation" = "tag.location.default: 1",
+                            "disable_auto_compaction" = "true"
+                        );
+                        """
+    }
+
+    def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
+                        expected_succ_rows = -1 ->
+
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'label', label + "_" + UUID.randomUUID().toString()
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+            if (expected_succ_rows >= 0) {
+                set 'max_filter_ratio', '1'
+            }
+
+            // if declared a check callback, the default check condition will ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                       if (ignore_failure && expected_succ_rows < 0) { return }
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    if (expected_succ_rows >= 0) {
+                        assertEquals(json.NumberLoadedRows, expected_succ_rows)
+                    } else {
+                        assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                        assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+                }
+            }
+        }
+    }
+
+    def wait_for_show_data_finish = { table_name, OpTimeout, origin_size, maxRetries = 5 ->
+        def size = origin_size
+        def retries = 0
+        def last_size = origin_size
+
+        while (retries < maxRetries) {
+            for (int t = 0; t < OpTimeout; t += delta_time) {
+                def result = sql """show data from ${database}.${table_name};"""
+                if (result.size() > 0) {
+                    logger.info(table_name + " show data, detail: " + result[0].toString())
+                    size = result[0][2].replace(" KB", "").toDouble()
+                }
+                useTime += delta_time
+                Thread.sleep(delta_time)
+
+                // If size changes, break the for loop to check in the next while iteration
+                if (size != origin_size && size != last_size) {
+                    break
+                }
+            }
+
+            if (size != last_size) {
+                last_size = size
+            } else {
+                // If size didn't change during the last OpTimeout period, return size
+                if (size != origin_size) {
+                    return size
+                }
+            }
+
+            retries++
+        }
+        return "wait_timeout"
+    }
+
+
+    try {
+        
+        def run_compaction_and_wait = { tableName ->
+            //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
+            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+            // trigger compactions for all tablets in ${tableName}
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactJson = parseJson(out.trim())
+                if (compactJson.status.toLowerCase() == "fail") {
+                    logger.info("Compaction was done automatically!")
+                } else {
+                    assertEquals("success", compactJson.status.toLowerCase())
+                }
+            }
+
+            // wait for all compactions done
+            for (def tablet in tablets) {
+                boolean running = true
+                do {
+                    Thread.sleep(1000)
+                    String tablet_id = tablet.TabletId
+                    backend_id = tablet.BackendId
+                    (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                    logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                    assertEquals(code, 0)
+                    def compactionStatus = parseJson(out.trim())
+                    assertEquals("success", compactionStatus.status.toLowerCase())
+                    running = compactionStatus.run_status
+                } while (running)
+            }
+        }
+
+        set_be_config.call("inverted_index_compaction_enable", "false")
+        sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+        create_table_with_index.call(tableWithIndexCompaction)
+
+        load_httplogs_data.call(tableWithIndexCompaction, '1', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '2', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '3', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '4', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(tableWithIndexCompaction, '5', 'true', 'json', 'documents-1000.json')
+
+        sql "sync"
+
+        run_compaction_and_wait(tableWithIndexCompaction)
+        def with_index_size = wait_for_show_data_finish(tableWithIndexCompaction, 60000, 0)
+        assertTrue(with_index_size != "wait_timeout")
+
+    } finally {
+        // sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
+        set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString())
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to