This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a3e16d3db09 branch-2.1: [fix](olap) clear storage_policy property when is_being_synced = true #48229 (#48304)
a3e16d3db09 is described below
commit a3e16d3db094a35cdc0e75c03cdb9f4934574141
Author: walter <[email protected]>
AuthorDate: Tue Feb 25 20:33:05 2025 +0800
branch-2.1: [fix](olap) clear storage_policy property when is_being_synced = true #48229 (#48304)
cherry pick from #48229
---
.../java/org/apache/doris/catalog/OlapTable.java | 2 +
.../apache/doris/common/util/PropertyAnalyzer.java | 9 +
.../apache/doris/datasource/InternalCatalog.java | 3 +
.../ccr_syncer_p0/test_is_being_synced.groovy | 204 +++++++++++++++++++++
4 files changed, 218 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 694a0cecbc9..f34c9192e04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2225,6 +2225,8 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
if (properties.containsKey(DynamicPartitionProperty.STORAGE_POLICY)) {
properties.remove(DynamicPartitionProperty.STORAGE_POLICY);
}
+ // storage policy is invalid for table/partition when table is being synced
+ partitionInfo.refreshTableStoragePolicy("");
}
public void checkChangeReplicaAllocation() throws DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index af09aab8784..251b1a6a383 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -243,6 +243,7 @@ public class PropertyAnalyzer {
String newStoragePolicy = oldStoragePolicy;
boolean hasStoragePolicy = false;
boolean storageMediumSpecified = false;
+ boolean isBeingSynced = false;
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
@@ -268,6 +269,8 @@ public class PropertyAnalyzer {
} else if (key.equalsIgnoreCase(PROPERTIES_STORAGE_POLICY)) {
hasStoragePolicy = true;
newStoragePolicy = value;
+ } else if (key.equalsIgnoreCase(PROPERTIES_IS_BEING_SYNCED)) {
+ isBeingSynced = Boolean.parseBoolean(value);
}
} // end for properties
@@ -296,6 +299,12 @@ public class PropertyAnalyzer {
cooldownTimestamp = DataProperty.MAX_COOLDOWN_TIME_MS;
}
+ // when isBeingSynced property is set to true, the storage policy will be ignored
+ if (isBeingSynced) {
+ hasStoragePolicy = false;
+ newStoragePolicy = "";
+ }
+
if (hasStoragePolicy && !"".equals(newStoragePolicy)) {
// check remote storage policy
StoragePolicy checkedPolicy =
StoragePolicy.ofCheck(newStoragePolicy);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index a58c0e4a149..e5a6c1199b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -2685,6 +2685,9 @@ public class InternalCatalog implements CatalogIf<Database> {
partitionInfo.setIsInMemory(partitionId, isInMemory);
partitionInfo.setTabletType(partitionId, tabletType);
partitionInfo.setIsMutable(partitionId, isMutable);
+ if (isBeingSynced) {
+ partitionInfo.refreshTableStoragePolicy("");
+ }
}
// check colocation properties
try {
diff --git a/regression-test/suites/ccr_syncer_p0/test_is_being_synced.groovy b/regression-test/suites/ccr_syncer_p0/test_is_being_synced.groovy
new file mode 100644
index 00000000000..5f40d92b7c9
--- /dev/null
+++ b/regression-test/suites/ccr_syncer_p0/test_is_being_synced.groovy
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_is_being_synced") {
+ def syncer = getSyncer()
+ if (!syncer.checkEnableFeatureBinlog()) {
+ logger.info("fe enable_feature_binlog is false, skip case
test_is_being_synced")
+ return
+ }
+
+ String suiteName = "test_is_being_synced"
+ String repoName = "repo_" + UUID.randomUUID().toString().replace("-", "")
+ String dbName = "${suiteName}_db"
+ String tableName = "${suiteName}_table"
+ String resourceName = "${suiteName}_resource"
+ String policyName = "${suiteName}_policy"
+
+ sql """
+ CREATE RESOURCE IF NOT EXISTS "${resourceName}"
+ PROPERTIES(
+ "type"="s3",
+ "AWS_ENDPOINT" = "${getS3Endpoint()}",
+ "AWS_REGION" = "${getS3Region()}",
+ "AWS_ROOT_PATH" = "regression/cooldown",
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_MAX_CONNECTIONS" = "50",
+ "AWS_REQUEST_TIMEOUT_MS" = "3000",
+ "AWS_CONNECTION_TIMEOUT_MS" = "1000",
+ "AWS_BUCKET" = "${getS3BucketName()}",
+ "s3_validity_check" = "true"
+ );
+ """
+
+ sql """
+ CREATE STORAGE POLICY IF NOT EXISTS ${policyName}
+ PROPERTIES(
+ "storage_resource" = "${resourceName}",
+ "cooldown_ttl" = "300"
+ )
+ """
+
+ sql "DROP DATABASE IF EXISTS ${dbName} FORCE"
+ sql "CREATE DATABASE ${dbName}"
+
+ // 1. Create a table with is_being_synced property
+ sql """
+ CREATE TABLE ${dbName}.${tableName}
+ (
+ k1 INT,
+ v1 INT
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(k1)
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN (100),
+ PARTITION p2 VALUES LESS THAN (200),
+ PARTITION p3 VALUES LESS THAN (300) ("storage_policy" =
"${policyName}")
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1",
+ "is_being_synced" = "true",
+ "storage_policy" = "${policyName}"
+ )
+ """
+
+ def show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+ def create_table_sql = show_result[0][1]
+ logger.info("${create_table_sql}")
+
+ assertFalse(create_table_sql.containsIgnoreCase("${policyName}"))
+ assertTrue(create_table_sql.containsIgnoreCase('"is_being_synced" =
"true"'))
+
+ // 2. Create table with is_being_synced property, storage policy is not exists
+ sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+ sql """
+ CREATE TABLE ${dbName}.${tableName}
+ (
+ k1 INT,
+ v1 INT
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(k1)
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN (100),
+ PARTITION p2 VALUES LESS THAN (200),
+ PARTITION p3 VALUES LESS THAN (300) ("storage_policy" =
"unknown_partition_storage_policy")
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1",
+ "is_being_synced" = "true",
+ "storage_policy" = "unknown_table_storage_policy"
+ )
+ """
+
+ show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+ create_table_sql = show_result[0][1]
+ logger.info("${create_table_sql}")
+
+
assertFalse(create_table_sql.containsIgnoreCase('unknown_partition_storage_policy'))
+
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+
+ // 3. For list partition
+ sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+ sql """
+ CREATE TABLE ${dbName}.${tableName}
+ (
+ k1 INT,
+ v1 INT
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(k1)
+ PARTITION BY LIST(k1)
+ (
+ PARTITION p1 VALUES IN ((100), (200), (300)),
+ PARTITION p2 VALUES IN ((400), (500), (600)),
+ PARTITION p3 VALUES IN ((700), (800)) ("storage_policy" =
"unknown_partition_storage_policy")
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1",
+ "is_being_synced" = "true",
+ "storage_policy" = "unknown_table_storage_policy"
+ )
+ """
+
+ show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+ create_table_sql = show_result[0][1]
+ logger.info("${create_table_sql}")
+
+
assertFalse(create_table_sql.containsIgnoreCase('unknown_partition_storage_policy'))
+
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+
+ // 4. For single partition
+ sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+ sql """
+ CREATE TABLE ${dbName}.${tableName}
+ (
+ k1 INT,
+ v1 INT
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1",
+ "is_being_synced" = "true",
+ "storage_policy" = "unknown_table_storage_policy"
+ )
+ """
+
+ show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+ create_table_sql = show_result[0][1]
+ logger.info("${create_table_sql}")
+
+
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+
+ // 5. For auto partition
+ sql """ DROP TABLE ${dbName}.${tableName} FORCE """
+ sql """
+ CREATE TABLE ${dbName}.${tableName}
+ (
+ k1 INT,
+ v1 INT,
+ `TIME_STAMP` datev2 NOT NULL COMMENT '采集日期'
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(k1)
+ AUTO PARTITION BY RANGE (date_trunc(`TIME_STAMP`, 'month'))
+ (
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1",
+ "is_being_synced" = "true",
+ "storage_policy" = "unknown_table_storage_policy"
+ )
+ """
+
+ show_result = sql "SHOW CREATE TABLE ${dbName}.${tableName}"
+ create_table_sql = show_result[0][1]
+ logger.info("${create_table_sql}")
+
+
assertFalse(create_table_sql.containsIgnoreCase('unknown_table_storage_policy'))
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]