This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e7454fc2a7 [improvement](binlog)Support inverted index format v2 in CCR (#33415)
8e7454fc2a7 is described below

commit 8e7454fc2a7ef1bf862eddf1a46ff935648af12d
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Mon Apr 15 09:13:13 2024 +0800

    [improvement](binlog)Support inverted index format v2 in CCR (#33415)
---
 be/src/olap/rowset/beta_rowset.cpp                 |  49 +++--
 be/src/olap/snapshot_manager.cpp                   |  57 +++--
 be/src/olap/tablet.cpp                             |  19 +-
 be/src/service/backend_service.cpp                 | 104 ++++++---
 .../apache/doris/regression/suite/Syncer.groovy    |  10 +-
 .../test_binlog_config_change.groovy               | 217 +++++++++++++++++++
 .../inverted_index/test_get_binlog.groovy          | 239 +++++++++++++++++++++
 .../inverted_index/test_ingest_binlog.groovy       | 223 +++++++++++++++++++
 .../inverted_index/test_multi_buckets.groovy       | 180 ++++++++++++++++
 .../inverted_index/test_backup_restore.groovy      | 196 +++++++++++++++++
 10 files changed, 1220 insertions(+), 74 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index 405cfb15af7..ac97fe0f28d 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -576,24 +576,41 @@ Status BetaRowset::add_to_binlog() {
         }
         linked_success_files.push_back(binlog_file);
 
-        for (const auto& index : _schema->indexes()) {
-            if (index.index_type() != IndexType::INVERTED) {
-                continue;
+        if (_schema->get_inverted_index_storage_format() == 
InvertedIndexStorageFormatPB::V1) {
+            for (const auto& index : _schema->indexes()) {
+                if (index.index_type() != IndexType::INVERTED) {
+                    continue;
+                }
+                auto index_id = index.index_id();
+                auto index_file = InvertedIndexDescriptor::get_index_file_name(
+                        seg_file, index_id, index.get_index_suffix());
+                auto binlog_index_file = (std::filesystem::path(binlog_dir) /
+                                          
std::filesystem::path(index_file).filename())
+                                                 .string();
+                VLOG_DEBUG << "link " << index_file << " to " << 
binlog_index_file;
+                if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
+                    status = Status::Error<OS_ERROR>(
+                            "fail to create hard link. from={}, to={}, 
errno={}", index_file,
+                            binlog_index_file, Errno::no());
+                    return status;
+                }
+                linked_success_files.push_back(binlog_index_file);
             }
-            auto index_id = index.index_id();
-            auto index_file = InvertedIndexDescriptor::get_index_file_name(
-                    seg_file, index_id, index.get_index_suffix());
-            auto binlog_index_file = (std::filesystem::path(binlog_dir) /
-                                      
std::filesystem::path(index_file).filename())
-                                             .string();
-            VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
-            if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
-                status = Status::Error<OS_ERROR>(
-                        "fail to create hard link. from={}, to={}, errno={}", 
index_file,
-                        binlog_index_file, Errno::no());
-                return status;
+        } else {
+            if (_schema->has_inverted_index()) {
+                auto index_file = 
InvertedIndexDescriptor::get_index_file_name(seg_file);
+                auto binlog_index_file = (std::filesystem::path(binlog_dir) /
+                                          
std::filesystem::path(index_file).filename())
+                                                 .string();
+                VLOG_DEBUG << "link " << index_file << " to " << 
binlog_index_file;
+                if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
+                    status = Status::Error<OS_ERROR>(
+                            "fail to create hard link. from={}, to={}, 
errno={}", index_file,
+                            binlog_index_file, Errno::no());
+                    return status;
+                }
+                linked_success_files.push_back(binlog_index_file);
             }
-            linked_success_files.push_back(binlog_index_file);
         }
     }
 
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 1c06414a311..f19abfb0d28 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -663,26 +663,47 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
                 }
                 linked_success_files.push_back(snapshot_segment_file_path);
 
-                for (const auto& index : tablet_schema.indexes()) {
-                    if (index.index_type() != IndexType::INVERTED) {
-                        continue;
+                if (tablet_schema.get_inverted_index_storage_format() ==
+                    InvertedIndexStorageFormatPB::V1) {
+                    for (const auto& index : tablet_schema.indexes()) {
+                        if (index.index_type() != IndexType::INVERTED) {
+                            continue;
+                        }
+                        auto index_id = index.index_id();
+                        auto index_file = 
ref_tablet->get_segment_index_filepath(
+                                rowset_id, segment_index, index_id);
+                        auto snapshot_segment_index_file_path =
+                                fmt::format("{}/{}_{}_{}.binlog-index", 
schema_full_path, rowset_id,
+                                            segment_index, index_id);
+                        VLOG_DEBUG << "link " << index_file << " to "
+                                   << snapshot_segment_index_file_path;
+                        res = io::global_local_filesystem()->link_file(
+                                index_file, snapshot_segment_index_file_path);
+                        if (!res.ok()) {
+                            LOG(WARNING) << "fail to link binlog index file. 
[src=" << index_file
+                                         << ", dest=" << 
snapshot_segment_index_file_path << "]";
+                            break;
+                        }
+                        
linked_success_files.push_back(snapshot_segment_index_file_path);
                     }
-                    auto index_id = index.index_id();
-                    auto index_file = ref_tablet->get_segment_index_filepath(
-                            rowset_id, segment_index, index_id);
-                    auto snapshot_segment_index_file_path =
-                            fmt::format("{}/{}_{}_{}.binlog-index", 
schema_full_path, rowset_id,
-                                        segment_index, index_id);
-                    VLOG_DEBUG << "link " << index_file << " to "
-                               << snapshot_segment_index_file_path;
-                    res = io::global_local_filesystem()->link_file(
-                            index_file, snapshot_segment_index_file_path);
-                    if (!res.ok()) {
-                        LOG(WARNING) << "fail to link binlog index file. 
[src=" << index_file
-                                     << ", dest=" << 
snapshot_segment_index_file_path << "]";
-                        break;
+                } else {
+                    if (tablet_schema.has_inverted_index()) {
+                        auto index_file =
+                                
InvertedIndexDescriptor::get_index_file_name(segment_file_path);
+                        auto snapshot_segment_index_file_path =
+                                fmt::format("{}/{}_{}.binlog-index", 
schema_full_path, rowset_id,
+                                            segment_index);
+                        VLOG_DEBUG << "link " << index_file << " to "
+                                   << snapshot_segment_index_file_path;
+                        res = io::global_local_filesystem()->link_file(
+                                index_file, snapshot_segment_index_file_path);
+                        if (!res.ok()) {
+                            LOG(WARNING) << "fail to link binlog index file. 
[src=" << index_file
+                                         << ", dest=" << 
snapshot_segment_index_file_path << "]";
+                            break;
+                        }
+                        
linked_success_files.push_back(snapshot_segment_index_file_path);
                     }
-                    
linked_success_files.push_back(snapshot_segment_index_file_path);
                 }
             }
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6734bc70063..b97fdfe8e72 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2390,14 +2390,25 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
 std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
                                                std::string_view segment_index,
                                                std::string_view index_id) 
const {
-    // TODO(qiye): support inverted index file format v2, when 
https://github.com/apache/doris/pull/30145 is merged
-    return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, 
segment_index, index_id);
+    auto format = 
_tablet_meta->tablet_schema()->get_inverted_index_storage_format();
+    if (format == doris::InvertedIndexStorageFormatPB::V1) {
+        return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, 
segment_index,
+                           index_id);
+    } else {
+        return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, 
segment_index);
+    }
 }
 
 std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, 
int64_t segment_index,
                                                int64_t index_id) const {
-    // TODO(qiye): support inverted index file format v2, when 
https://github.com/apache/doris/pull/30145 is merged
-    return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, 
segment_index, index_id);
+    auto format = 
_tablet_meta->tablet_schema()->get_inverted_index_storage_format();
+    if (format == doris::InvertedIndexStorageFormatPB::V1) {
+        return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, 
segment_index,
+                           index_id);
+    } else {
+        DCHECK(index_id == -1);
+        return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, 
segment_index);
+    }
 }
 
 std::vector<std::string> Tablet::get_binlog_filepath(std::string_view 
binlog_version) const {
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index c61e0b86556..b1a110144ef 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -307,41 +307,81 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
     std::vector<uint64_t> segment_index_file_sizes;
     std::vector<std::string> segment_index_file_names;
     auto tablet_schema = rowset_meta->tablet_schema();
-    for (const auto& index : tablet_schema->indexes()) {
-        if (index.index_type() != IndexType::INVERTED) {
-            continue;
+    if (tablet_schema->get_inverted_index_storage_format() == 
InvertedIndexStorageFormatPB::V1) {
+        for (const auto& index : tablet_schema->indexes()) {
+            if (index.index_type() != IndexType::INVERTED) {
+                continue;
+            }
+            auto index_id = index.index_id();
+            for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+                auto get_segment_index_file_size_url = fmt::format(
+                        
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+                        "}",
+                        binlog_api_url, "get_segment_index_file", 
request.remote_tablet_id,
+                        remote_rowset_id, segment_index, index_id);
+                uint64_t segment_index_file_size;
+                auto get_segment_index_file_size_cb =
+                        [&get_segment_index_file_size_url,
+                         &segment_index_file_size](HttpClient* client) {
+                            
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+                            client->set_timeout_ms(kMaxTimeoutMs);
+                            RETURN_IF_ERROR(client->head());
+                            return 
client->get_content_length(&segment_index_file_size);
+                        };
+                auto index_file = 
InvertedIndexDescriptor::inverted_index_file_path(
+                        local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index,
+                        index_id, index.get_index_suffix());
+                segment_index_file_names.push_back(index_file);
+
+                status = HttpClient::execute_with_retry(max_retry, 1,
+                                                        
get_segment_index_file_size_cb);
+                if (!status.ok()) {
+                    LOG(WARNING) << "failed to get segment file size from "
+                                 << get_segment_index_file_size_url
+                                 << ", status=" << status.to_string();
+                    status.to_thrift(&tstatus);
+                    return;
+                }
+
+                segment_index_file_sizes.push_back(segment_index_file_size);
+                
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+            }
         }
-        auto index_id = index.index_id();
+    } else {
         for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-            auto get_segment_index_file_size_url = fmt::format(
-                    
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
-                    "}",
-                    binlog_api_url, "get_segment_index_file", 
request.remote_tablet_id,
-                    remote_rowset_id, segment_index, index_id);
-            uint64_t segment_index_file_size;
-            auto get_segment_index_file_size_cb = 
[&get_segment_index_file_size_url,
-                                                   
&segment_index_file_size](HttpClient* client) {
-                RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
-                client->set_timeout_ms(kMaxTimeoutMs);
-                RETURN_IF_ERROR(client->head());
-                return client->get_content_length(&segment_index_file_size);
-            };
-            auto index_file = 
InvertedIndexDescriptor::inverted_index_file_path(
-                    local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index, index_id,
-                    index.get_index_suffix());
-            segment_index_file_names.push_back(index_file);
-
-            status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_index_file_size_cb);
-            if (!status.ok()) {
-                LOG(WARNING) << "failed to get segment file size from "
-                             << get_segment_index_file_size_url
-                             << ", status=" << status.to_string();
-                status.to_thrift(&tstatus);
-                return;
-            }
+            if (tablet_schema->has_inverted_index()) {
+                auto get_segment_index_file_size_url = fmt::format(
+                        
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
+                        "}",
+                        binlog_api_url, "get_segment_index_file", 
request.remote_tablet_id,
+                        remote_rowset_id, segment_index, -1);
+                uint64_t segment_index_file_size;
+                auto get_segment_index_file_size_cb =
+                        [&get_segment_index_file_size_url,
+                         &segment_index_file_size](HttpClient* client) {
+                            
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
+                            client->set_timeout_ms(kMaxTimeoutMs);
+                            RETURN_IF_ERROR(client->head());
+                            return 
client->get_content_length(&segment_index_file_size);
+                        };
+                auto local_segment_path = BetaRowset::segment_file_path(
+                        local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
+                auto index_file = 
InvertedIndexDescriptor::get_index_file_name(local_segment_path);
+                segment_index_file_names.push_back(index_file);
+
+                status = HttpClient::execute_with_retry(max_retry, 1,
+                                                        
get_segment_index_file_size_cb);
+                if (!status.ok()) {
+                    LOG(WARNING) << "failed to get segment file size from "
+                                 << get_segment_index_file_size_url
+                                 << ", status=" << status.to_string();
+                    status.to_thrift(&tstatus);
+                    return;
+                }
 
-            segment_index_file_sizes.push_back(segment_index_file_size);
-            
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+                segment_index_file_sizes.push_back(segment_index_file_size);
+                
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
+            }
         }
     }
 
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 47000ab74fd..874a0e5c0be 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -427,7 +427,9 @@ class Syncer {
 
     Boolean checkRestoreFinish() {
         String checkSQL = "SHOW RESTORE FROM TEST_" + context.db
-        List<Object> row = suite.sql(checkSQL)[0]
+        int size = suite.sql(checkSQL).size()
+        logger.info("Now size is ${size}")
+        List<Object> row = suite.sql(checkSQL)[size-1]
         logger.info("Now row is ${row}")
 
         return (row[4] as String) == "FINISHED"
@@ -645,9 +647,9 @@ class Syncer {
 
         // step 2: get partitionIds
         metaMap.values().forEach {
-            baseSql += "/" + it.id.toString() + "/partitions"
+            def partitionSql = baseSql + "/" + it.id.toString() + "/partitions"
             Map<Long, Long> partitionInfo = Maps.newHashMap()
-            sqlInfo = sendSql.call(baseSql, toSrc)
+            sqlInfo = sendSql.call(partitionSql, toSrc)
             for (List<Object> row : sqlInfo) {
                 partitionInfo.put(row[0] as Long, row[2] as Long)
             }
@@ -660,7 +662,7 @@ class Syncer {
             for (Entry<Long, Long> info : partitionInfo) {
 
                 // step 3.1: get partition/indexId
-                String partitionSQl = baseSql + "/" + info.key.toString()
+                String partitionSQl = partitionSql + "/" + info.key.toString()
                 sqlInfo = sendSql.call(partitionSQl, toSrc)
                 if (sqlInfo.isEmpty()) {
                     logger.error("Target cluster partition-${info.key} indexId 
fault.")
diff --git a/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
new file mode 100644
index 00000000000..cccf25780d7
--- /dev/null
+++ b/regression-test/suites/ccr_syncer_p0/inverted_index/test_binlog_config_change.groovy
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_binlog_config_change_index") {
+
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case 
test_binlog_config_change_index")
+        return
+    }
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 
99);"""
+        ]
+    }
+
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score 
""",
+            """ select * from ${tableName} where name match "andy" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, 
name, hobbies, score """
+        ]
+    }
+
+    def run_sql = { tableName -> 
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1");
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" 
= "V2");
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    def run_test = {create_table, tableName -> 
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+        sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+        target_sql "DROP TABLE IF EXISTS ${tableName}"
+        target_sql create_table
+
+        assertTrue(syncer.getTargetMeta("${tableName}"))
+
+        // test 1: target cluster follow source cluster
+        logger.info("=== Test 1: Target cluster follow source cluster case 
===")
+
+        insert_data(tableName).each { sqlStatement ->
+            sql sqlStatement
+            assertTrue(syncer.getBinlog("${tableName}"))
+            assertTrue(syncer.beginTxn("${tableName}"))
+            assertTrue(syncer.getBackendClients())
+            assertTrue(syncer.ingestBinlog())
+            assertTrue(syncer.commitTxn())
+            assertTrue(syncer.checkTargetVersion())
+            syncer.closeBackendClients()
+        }
+
+        target_sql " sync "
+        def res = target_sql """SELECT * FROM ${tableName}"""
+        if (tableName.contains("mow")) {
+            assertEquals(res.size(), insert_data(tableName).size() / 2 as 
Integer)
+        } else {
+            assertEquals(res.size(), insert_data(tableName).size())
+        }
+        run_sql(tableName)
+    }
+
+
+    // inverted index format v1
+    logger.info("=== Test 1: Inverted index format v1 case ===")
+    def tableName = "tbl_binlog_config_change_index_v1"
+    run_test.call(create_table_v1(tableName), tableName)
+
+    // inverted index format v2
+    logger.info("=== Test 2: Inverted index format v2 case ===")
+    tableName = "tbl_binlog_config_change_index_v2"
+    run_test.call(create_table_v2(tableName), tableName)
+
+    // inverted index format v1 with mow
+    logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+    tableName = "tbl_binlog_config_change_index_mow_v1"
+    run_test.call(create_table_mow_v1(tableName), tableName)
+
+    // inverted index format v2 with mow
+    logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+    tableName = "tbl_binlog_config_change_index_mow_v2"
+    run_test.call(create_table_mow_v2(tableName), tableName)
+
+    // TODO: bugfix
+    // test 2: source cluster disable and re-enable binlog
+    // target_sql "DROP TABLE IF EXISTS ${tableName}"
+    // target_sql """
+    //     CREATE TABLE if NOT EXISTS ${tableName} 
+    //     (
+    //         `test` INT,
+    //         `id` INT
+    //     )
+    //     ENGINE=OLAP
+    //     UNIQUE KEY(`test`, `id`)
+    //     DISTRIBUTED BY HASH(id) BUCKETS 1 
+    //     PROPERTIES ( 
+    //         "replication_allocation" = "tag.location.default: 1"
+    //     )
+    // """
+    // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "false")"""
+    // sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+    // syncer.context.seq = -1
+
+    // assertTrue(syncer.getBinlog("${tableName}"))
+    // assertTrue(syncer.beginTxn("${tableName}"))
+    // assertTrue(syncer.ingestBinlog())
+    // assertTrue(syncer.commitTxn())
+    // assertTrue(syncer.checkTargetVersion())
+
+    // res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
+    // assertTrue(res.size() == insert_num)
+
+}
\ No newline at end of file
diff --git a/regression-test/suites/ccr_syncer_p0/inverted_index/test_get_binlog.groovy b/regression-test/suites/ccr_syncer_p0/inverted_index/test_get_binlog.groovy
new file mode 100644
index 00000000000..b837f799e58
--- /dev/null
+++ b/regression-test/suites/ccr_syncer_p0/inverted_index/test_get_binlog.groovy
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_get_binlog_case_index") {
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case 
test_get_binlog_case_index")
+        return
+    }
+
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 
99);"""
+        ]
+    }
+
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score 
""",
+            """ select * from ${tableName} where name match "andy" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, 
name, hobbies, score """
+        ]
+    }
+
+    def run_sql = { tableName -> 
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1");
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" 
= "V2");
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    def run_test = { create_table, tableName ->
+        long seq = -1
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+        sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+        sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 
100); """
+        assertTrue(syncer.getBinlog("${tableName}"))
+        long firstSeq = syncer.context.seq
+
+        logger.info("=== Test 1: normal case ===")
+        insert_data(tableName).each { sqlStatement ->
+            sql sqlStatement
+            assertTrue(syncer.getBinlog("${tableName}"))
+        }
+
+        long endSeq = syncer.context.seq
+
+        logger.info("=== Test 2: Abnormal seq case ===")
+        logger.info("=== Test 2.1: too old seq case ===")
+        syncer.context.seq = -1
+        assertTrue(syncer.context.seq == -1)
+        assertTrue(syncer.getBinlog("${tableName}"))
+        assertTrue(syncer.context.seq == firstSeq)
+
+
+        logger.info("=== Test 2.2: too new seq case ===")
+        syncer.context.seq = endSeq + 100
+        assertTrue((syncer.getBinlog("${tableName}")) == false)
+
+
+        logger.info("=== Test 2.3: not find table case ===")
+        assertTrue(syncer.getBinlog("this_is_an_invalid_tbl") == false)
+
+
+        logger.info("=== Test 2.4: seq between first and end case ===")
+        long midSeq = (firstSeq + endSeq) / 2
+        syncer.context.seq = midSeq
+        assertTrue(syncer.getBinlog("${tableName}"))
+        long test5Seq = syncer.context.seq
+        assertTrue(firstSeq <= test5Seq && test5Seq <= endSeq)
+
+        logger.info("=== Test 3: Get binlog with different priv user case ===")
+        logger.info("=== Test 3.1: read only user get binlog case ===")
+        // TODO: bugfix
+        // syncer.context.seq = -1
+        // readOnlyUser = "read_only_user"
+        // sql """DROP USER IF EXISTS ${readOnlyUser}"""
+        // sql """CREATE USER ${readOnlyUser} IDENTIFIED BY '123456'"""
+        // sql """GRANT ALL ON ${context.config.defaultDb}.* TO 
${readOnlyUser}"""
+        // sql """GRANT SELECT_PRIV ON TEST_${context.dbName}.${tableName} TO 
${readOnlyUser}"""
+        // syncer.context.user = "${readOnlyUser}"
+        // syncer.context.passwd = "123456"
+        // assertTrue(syncer.getBinlog("${tableName}"))
+
+    }
+
+    // inverted index format v1
+    logger.info("=== Test 1: Inverted index format v1 case ===")
+    def tableName = "tbl_get_binlog_case_index_v1"
+    run_test.call(create_table_v1(tableName), tableName)
+    
+    // inverted index format v2
+    logger.info("=== Test 2: Inverted index format v2 case ===")
+    tableName = "tbl_get_binlog_case_index_v2"
+    run_test.call(create_table_v2(tableName), tableName)
+
+    // inverted index format v1 with mow
+    logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+    tableName = "tbl_get_binlog_case_index_mow_v1"
+    run_test.call(create_table_mow_v1(tableName), tableName)
+
+    // inverted index format v2 with mow
+    logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+    tableName = "tbl_get_binlog_case_index_mow_v2"
+    run_test.call(create_table_mow_v2(tableName), tableName)
+
+    logger.info("=== Test 3: no priv user get binlog case ===")
+    syncer.context.seq = -1
+    def noPrivUser = "no_priv_user2"
+    def emptyTable = "tbl_empty_test"
+    sql "DROP TABLE IF EXISTS ${emptyTable}"
+    sql """
+        CREATE TABLE ${emptyTable} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1");
+    """
+    sql """CREATE USER IF NOT EXISTS ${noPrivUser} IDENTIFIED BY '123456'"""
+    sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${noPrivUser}"""
+    sql """GRANT ALL ON TEST_${context.dbName}.${emptyTable} TO 
${noPrivUser}"""
+    syncer.context.user = "${noPrivUser}"
+    syncer.context.passwd = "123456"
+    assertTrue((syncer.getBinlog("${tableName}")) == false)
+    
+
+    logger.info("=== Test 3.3: Non-existent user set in syncer get binlog case 
===")
+    syncer.context.user = "this_is_an_invalid_user"
+    syncer.context.passwd = "this_is_an_invalid_user"
+    assertTrue(syncer.getBinlog("${tableName}", false) == false)
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_ingest_binlog.groovy 
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_ingest_binlog.groovy
new file mode 100644
index 00000000000..12ba49e084d
--- /dev/null
+++ 
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_ingest_binlog.groovy
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_ingest_binlog_index") {
+
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case 
test_ingest_binlog_index")
+        return
+    }
+
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 
99);"""
+        ]
+    }
+
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score 
""",
+            """ select * from ${tableName} where name match "andy" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, 
name, hobbies, score """
+        ]
+    }
+
+    def run_sql = { tableName -> 
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1");
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" 
= "V2");
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    def run_test = { create_table, tableName ->
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+        sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+        target_sql "DROP TABLE IF EXISTS ${tableName}"
+        target_sql create_table
+        assertTrue(syncer.getTargetMeta("${tableName}"))
+
+        logger.info("=== Test 1: Common ingest binlog case ===")
+        insert_data.call(tableName).each { sqlStatement ->
+            sql sqlStatement
+            assertTrue(syncer.getBinlog("${tableName}"))
+            assertTrue(syncer.beginTxn("${tableName}"))
+            assertTrue(syncer.getBackendClients())
+            assertTrue(syncer.ingestBinlog())
+            assertTrue(syncer.commitTxn())
+            assertTrue(syncer.checkTargetVersion())
+            syncer.closeBackendClients()
+        }
+
+        target_sql " sync "
+        res = target_sql """SELECT * FROM ${tableName}"""
+        if (tableName.contains("mow")) {
+            assertEquals(res.size(), insert_data(tableName).size() / 2 as 
Integer)
+        } else {
+            assertEquals(res.size(), insert_data(tableName).size())
+        }
+        run_sql.call(tableName)
+
+        logger.info("=== Test 2: Wrong IngestBinlogRequest case ===")
+        sql """INSERT INTO ${tableName} VALUES (4, "bason", "bason hate pear", 
99);"""
+        assertTrue(syncer.getBinlog("${tableName}"))
+        assertTrue(syncer.beginTxn("${tableName}"))
+        assertTrue(syncer.getBackendClients())
+
+
+        logger.info("=== Test 2.1: Wrong txnId case ===")
+        // TODO: bugfix
+        // def originTxnId = syncer.context.txnId
+        // syncer.context.txnId = -1
+        // assertTrue(syncer.ingestBinlog() == false)
+        // syncer.context.txnId = originTxnId
+
+
+        logger.info("=== Test 2.2: Wrong binlog version case ===")
+        // -1 means "use the value stored in syncer.context"
+        // Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = 
-1)
+        // use fakeVersion = 1: version 1 is a Doris BE tablet's first version, so 
there is no binlog for it and only an HTTP error is returned
+        assertTrue(syncer.ingestBinlog(-1, 1) == false)
+
+
+        logger.info("=== Test 2.3: Wrong partitionId case ===")
+        // TODO: bugfix
+        // assertTrue(syncer.ingestBinlog(1, -1) == false)
+
+
+        logger.info("=== Test 2.4: Right case ===")
+        assertTrue(syncer.ingestBinlog())
+        assertTrue(syncer.commitTxn())
+        assertTrue(syncer.checkTargetVersion())
+        target_sql " sync "
+        res = target_sql """SELECT * FROM ${tableName} WHERE id=4"""
+        assertEquals(res.size(), 1)
+
+
+        // End Test 2
+        syncer.closeBackendClients()
+    }
+
+    // inverted index format v1
+    logger.info("=== Test 1: Inverted index format v1 case ===")
+    def tableName = "tbl_ingest_binlog_index_v1"
+    run_test.call(create_table_v1(tableName), tableName)
+
+    // inverted index format v2
+    logger.info("=== Test 2: Inverted index format v2 case ===")
+    tableName = "tbl_ingest_binlog_index_v2"
+    run_test.call(create_table_v2(tableName), tableName)
+
+    // inverted index format v1 with mow
+    logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+    tableName = "tbl_ingest_binlog_index_mow_v1"
+    run_test.call(create_table_mow_v1(tableName), tableName)
+
+    // inverted index format v2 with mow
+    logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+    tableName = "tbl_ingest_binlog_index_mow_v2"
+    run_test.call(create_table_mow_v2(tableName), tableName)
+    
+}
diff --git 
a/regression-test/suites/ccr_syncer_p0/inverted_index/test_multi_buckets.groovy 
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_multi_buckets.groovy
new file mode 100644
index 00000000000..4a7b7263af2
--- /dev/null
+++ 
b/regression-test/suites/ccr_syncer_p0/inverted_index/test_multi_buckets.groovy
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multi_buckets_index") {
+
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case 
test_multi_buckets_index")
+        return
+    }
+    def insert_data = { tableName ->
+        [
+            """INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 
99);""",
+            """INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 
100);""",
+            """INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 
99);"""
+        ]
+    }
+
+    def sqls = { tableName ->
+        [
+            """ select * from ${tableName} order by id, name, hobbies, score 
""",
+            """ select * from ${tableName} where name match "andy" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where hobbies match "pear" order by 
id, name, hobbies, score """,
+            """ select * from ${tableName} where score < 100 order by id, 
name, hobbies, score """
+        ]
+    }
+
+    def run_sql = { tableName -> 
+        sqls(tableName).each { sqlStatement ->
+            def target_res = target_sql sqlStatement
+            def res = sql sqlStatement
+            assertEquals(res, target_res)
+        }
+    }
+
+    def create_table_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES ( "replication_num" = "1");
+        """
+    }
+
+    def create_table_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" 
= "V2");
+        """
+    }
+
+    def create_table_mow_v1 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true");
+        """
+    }
+
+    def create_table_mow_v2 = { tableName ->
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted 
properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES ( 
+            "replication_num" = "1", 
+            "binlog.enable" = "true", 
+            "enable_unique_key_merge_on_write" = "true",
+            "inverted_index_storage_format" = "V2");
+        """
+    }
+
+    def run_test = { create_table, tableName ->
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql create_table
+        sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
+
+        target_sql "DROP TABLE IF EXISTS ${tableName}"
+        target_sql create_table
+        assertTrue(syncer.getTargetMeta("${tableName}"))
+
+        insert_data(tableName).each { sqlStatement ->
+            sql sqlStatement
+            assertTrue(syncer.getBinlog("${tableName}"))
+            assertTrue(syncer.beginTxn("${tableName}"))
+            assertTrue(syncer.getBackendClients())
+            assertTrue(syncer.ingestBinlog())
+            assertTrue(syncer.commitTxn())
+            assertTrue(syncer.checkTargetVersion())
+            syncer.closeBackendClients()
+        }
+
+        target_sql " sync "
+        def res = target_sql """SELECT * FROM ${tableName}"""
+        if (tableName.contains("mow")) {
+            assertEquals(res.size(), insert_data(tableName).size() / 2 as 
Integer)
+        } else {
+            assertEquals(res.size(), insert_data(tableName).size())
+        }
+        run_sql(tableName)
+    }
+
+    // inverted index format v1
+    logger.info("=== Test 1: Inverted index format v1 case ===")
+    def tableName = "tbl_multi_buckets_index_v1"
+    run_test.call(create_table_v1(tableName), tableName)
+    // inverted index format v2
+    logger.info("=== Test 2: Inverted index format v2 case ===")
+    tableName = "tbl_multi_buckets_index_v2"
+    run_test.call(create_table_v2(tableName), tableName)
+
+    // inverted index format v1 with mow
+    logger.info("=== Test 3: Inverted index format v1 with mow case ===")
+    tableName = "tbl_multi_buckets_index_mow_v1"
+    run_test.call(create_table_mow_v1(tableName), tableName)
+
+    // inverted index format v2 with mow
+    logger.info("=== Test 4: Inverted index format v2 with mow case ===")
+    tableName = "tbl_multi_buckets_index_mow_v2"
+    run_test.call(create_table_mow_v2(tableName), tableName)
+}
diff --git 
a/regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy
 
b/regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy
new file mode 100644
index 00000000000..86f4cc89913
--- /dev/null
+++ 
b/regression-test/suites/ccr_syncer_p1/inverted_index/test_backup_restore.groovy
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_backup_restore_index") {
+
+    // The whole suite goes through the syncer. When its binlog feature check
+    // fails, log the reason and leave the suite without running any case.
+    def syncer = getSyncer()
+    def binlogEnabled = syncer.checkEnableFeatureBinlog()
+    if (!binlogEnabled) {
+        logger.info("fe enable_feature_binlog is false, skip case test_backup_restore")
+        return
+    }
+    
+    // Returns the fixture INSERT statements for tableName: for each id 1..3,
+    // one "andy" row (score 100) followed by one "bason" row (score 99),
+    // i.e. six statements in total, ordered by id.
+    def insert_data = { tableName ->
+        (1..3).collectMany { id ->
+            [
+                """INSERT INTO ${tableName} VALUES (${id}, "andy", "andy love apple", 100);""",
+                """INSERT INTO ${tableName} VALUES (${id}, "bason", "bason hate pear", 99);"""
+            ]
+        }
+    }
+
+    // Returns the verification queries for tableName: an unfiltered scan plus
+    // one query per inverted index (match on name, match on hobbies, range on
+    // score). All of them share the same ORDER BY so that result sets can be
+    // compared row by row.
+    def sqls = { tableName ->
+        def ordering = "order by id, name, hobbies, score"
+        def filters = [
+            "",
+            'where name match "andy" ',
+            'where hobbies match "pear" ',
+            'where score < 100 '
+        ]
+        filters.collect { filter -> " select * from ${tableName} ${filter}${ordering} " }
+    }
+
+    // Runs every verification query from sqls(tableName) through target_sql
+    // and through sql, and asserts that both return the same result.
+    def run_sql = { tableName ->
+        for (query in sqls(tableName)) {
+            def targetRows = target_sql(query)
+            def sourceRows = sql(query)
+            assertEquals(sourceRows, targetRows)
+        }
+    }
+
+    // Shared DDL builder behind the four create_table_* closures below. Every
+    // variant has the same columns and the same three inverted indexes; they
+    // differ only in the key model and in the table properties.
+    //   tableName:    name of the table to create
+    //   keyModel:     "DUPLICATE" or "UNIQUE", placed in front of KEY(`id`)
+    //   indexFormat:  value written to "inverted_index_storage_format"
+    //   mergeOnWrite: when true, adds "enable_unique_key_merge_on_write" = "true"
+    // The storage format is always written out explicitly, so the "v1" cases do
+    // not depend on whatever default the cluster applies when it is omitted.
+    def build_create_table = { tableName, keyModel, indexFormat, mergeOnWrite ->
+        def tableProperties = [
+            '"replication_num" = "1"',
+            '"binlog.enable" = "true"',
+            '"inverted_index_storage_format" = "' + indexFormat + '"'
+        ]
+        if (mergeOnWrite) {
+            tableProperties << '"enable_unique_key_merge_on_write" = "true"'
+        }
+        def propertyClause = tableProperties.join(",\n            ")
+        """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `hobbies` text NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_hobbies (hobbies) using inverted properties("parser"="english"),
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        ${keyModel} KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            ${propertyClause}
+        );
+        """
+    }
+
+    // Duplicate-key table, inverted index storage format V1.
+    def create_table_v1 = { tableName ->
+        build_create_table(tableName, "DUPLICATE", "V1", false)
+    }
+
+    // Duplicate-key table, inverted index storage format V2.
+    def create_table_v2 = { tableName ->
+        build_create_table(tableName, "DUPLICATE", "V2", false)
+    }
+
+    // Unique-key merge-on-write table, inverted index storage format V1.
+    def create_table_mow_v1 = { tableName ->
+        build_create_table(tableName, "UNIQUE", "V1", true)
+    }
+
+    // Unique-key merge-on-write table, inverted index storage format V2.
+    def create_table_mow_v2 = { tableName ->
+        build_create_table(tableName, "UNIQUE", "V2", true)
+    }
+
+    // Runs one backup/restore scenario for a single table: recreate the table,
+    // load the fixture rows, check the row count via sql, take a full backup,
+    // restore the snapshot through the syncer, then check the row count and the
+    // verification queries via target_sql.
+    //   create_table: the CREATE TABLE statement to execute
+    //   tableName:    the table under test; when the name contains "mow" the
+    //                 expected row count is half the number of insert statements
+    def run_test = { create_table, tableName -> 
+        sql("DROP TABLE IF EXISTS ${tableName}")
+        sql(create_table)
+
+        logger.info("=== Test 1: Common backup and restore ===")
+        def snapshotName = "snapshot_test_" + tableName
+        def statements = insert_data(tableName)
+        def expectedRows = tableName.contains("mow") ? (statements.size() / 2 as Integer) : statements.size()
+        def assertRowCount = { rows -> assertEquals(rows.size(), expectedRows) }
+
+        for (statement in statements) {
+            sql(statement)
+        }
+        sql(" sync ")
+        def sourceRows = sql("SELECT * FROM ${tableName}")
+        assertRowCount(sourceRows)
+
+        sql """ 
+                BACKUP SNAPSHOT ${context.dbName}.${snapshotName} 
+                TO `__keep_on_local__` 
+                ON (${tableName})
+                PROPERTIES ("type" = "full")
+            """
+        syncer.waitSnapshotFinish()
+        assertTrue(syncer.getSnapshot("${snapshotName}", "${tableName}"))
+        assertTrue(syncer.restoreSnapshot(true))
+        syncer.waitTargetRestoreFinish()
+
+        target_sql(" sync ")
+        def targetRows = target_sql("SELECT * FROM ${tableName}")
+        assertRowCount(targetRows)
+        run_sql(tableName)
+    }
+
+    // One entry per scenario: log banner, table name and the closure that
+    // builds the CREATE TABLE statement. Scenarios run in the order listed:
+    // format v1, format v2, then the same two formats for the "mow" tables.
+    def scenarios = [
+        ["=== Test 1: Inverted index format v1 case ===",
+         "tbl_backup_restore_index_v1", create_table_v1],
+        ["=== Test 2: Inverted index format v2 case ===",
+         "tbl_backup_restore_index_v2", create_table_v2],
+        ["=== Test 3: Inverted index format v1 with mow case ===",
+         "tbl_backup_restore_index_mow_v1", create_table_mow_v1],
+        ["=== Test 4: Inverted index format v2 with mow case ===",
+         "tbl_backup_restore_index_mow_v2", create_table_mow_v2]
+    ]
+    scenarios.each { scenario ->
+        def (banner, tableName, ddlBuilder) = scenario
+        logger.info(banner)
+        run_test.call(ddlBuilder.call(tableName), tableName)
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to