This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 07b496967bd branch-4.1: [feat](snapshot) Support storage vault for
create/list snapshot #62523 #57345 (#63534)
07b496967bd is described below
commit 07b496967bd8922a215cea761771c5322ff1dfaf
Author: Yixuan Wang <[email protected]>
AuthorDate: Thu May 28 14:44:37 2026 +0800
branch-4.1: [feat](snapshot) Support storage vault for create/list snapshot
#62523 #57345 (#63534)
pick: https://github.com/apache/doris/pull/57345
https://github.com/apache/doris/pull/62523
---------
Co-authored-by: walter <[email protected]>
---
.../schema_cluster_snapshots_scanner.cpp | 14 ++
.../schema_cluster_snapshots_scanner_test.cpp | 5 +
docker/runtime/doris-compose/Readme.md | 14 ++
docker/runtime/doris-compose/cluster.py | 54 +++++++-
docker/runtime/doris-compose/command.py | 38 +++++-
docker/runtime/doris-compose/resource/common.sh | 96 +++++++++++++
docker/runtime/doris-compose/resource/init_fe.sh | 55 +++++++-
.../java/org/apache/doris/catalog/SchemaTable.java | 1 +
.../doris/cloud/snapshot/CloudSnapshotHandler.java | 2 +-
.../AdminCreateClusterSnapshotCommand.java | 9 +-
.../AdminCreateClusterSnapshotCommandTest.java | 3 +
gensrc/proto/cloud.proto | 2 +
.../org/apache/doris/regression/suite/Suite.groovy | 150 +++++++++++++++++++++
.../doris/regression/suite/SuiteCluster.groovy | 48 +++++++
14 files changed, 484 insertions(+), 7 deletions(-)
diff --git a/be/src/information_schema/schema_cluster_snapshots_scanner.cpp
b/be/src/information_schema/schema_cluster_snapshots_scanner.cpp
index ba92ff896f3..19e1b3e10da 100644
--- a/be/src/information_schema/schema_cluster_snapshots_scanner.cpp
+++ b/be/src/information_schema/schema_cluster_snapshots_scanner.cpp
@@ -47,6 +47,7 @@ std::vector<SchemaScanner::ColumnDesc>
SchemaClusterSnapshotsScanner::_s_tbls_co
{"LABEL", TYPE_STRING, sizeof(StringRef), true},
{"MSG", TYPE_STRING, sizeof(StringRef), true},
{"COUNT", TYPE_INT, sizeof(int32_t), true},
+ {"VAULT_ID", TYPE_STRING, sizeof(StringRef), true},
};
SchemaClusterSnapshotsScanner::SchemaClusterSnapshotsScanner()
@@ -247,6 +248,19 @@ Status
SchemaClusterSnapshotsScanner::_fill_block_impl(Block* block) {
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
+ // resource_id
+ {
+ for (int i = 0; i < row_num; ++i) {
+ auto& snapshot = _snapshots[i];
+ if (snapshot.has_resource_id()) {
+ strs[i] = StringRef(snapshot.resource_id().c_str(),
snapshot.resource_id().size());
+ datas[i] = strs.data() + i;
+ } else {
+ datas[i] = nullptr;
+ }
+ }
+ RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
+ }
return Status::OK();
}
diff --git
a/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp
b/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp
index 6415f837a7c..f1392dcb831 100644
--- a/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp
+++ b/be/test/exec/schema_scanner/schema_cluster_snapshots_scanner_test.cpp
@@ -49,6 +49,7 @@ TEST_F(SchemaClusterSnapshotsScannerTest,
test_get_next_block_internal) {
snapshot.set_ttl_seconds(3600);
snapshot.set_snapshot_label("label");
snapshot.set_reason("reason");
+ snapshot.set_resource_id("vault_1");
snapshots.push_back(snapshot);
}
@@ -62,6 +63,10 @@ TEST_F(SchemaClusterSnapshotsScannerTest,
test_get_next_block_internal) {
auto col = data_block->safe_get_by_position(0);
auto v = (*col.column)[1].get<TYPE_STRING>();
EXPECT_EQ(v, "232ds");
+
+ auto vault_col = data_block->safe_get_by_position(12);
+ auto vault_id = (*vault_col.column)[1].get<TYPE_STRING>();
+ EXPECT_EQ(vault_id, "vault_1");
}
} // namespace doris
diff --git a/docker/runtime/doris-compose/Readme.md
b/docker/runtime/doris-compose/Readme.md
index c4b60460e49..a92a1fff764 100644
--- a/docker/runtime/doris-compose/Readme.md
+++ b/docker/runtime/doris-compose/Readme.md
@@ -122,12 +122,20 @@ python docker/runtime/doris-compose/doris-compose.py up
<cluster-name> <image
[--fe-id <fd-id> --be-id <be-id>]
...
[ --cloud ]
+ [ --cluster-snapshot <cluster-snapshot-json> ]
```
if it's a new cluster, must specific the image.
add fe/be nodes with the specific image, or update existing nodes with
`--fe-id`, `--be-id`
+The `--cluster-snapshot` parameter allows you to provide a cluster snapshot
JSON content for FE-1 first startup in cloud mode only. The JSON will be
written to FE conf/cluster_snapshot.json and passed to start_fe.sh with
--cluster_snapshot parameter. This is only effective on first startup.
+
+Example:
+```shell
+python docker/runtime/doris-compose/doris-compose.py up my-cluster my-image
--cloud --cluster-snapshot '{"instance_id":"instance_id_xxx"}'
+```
+
For create a cloud cluster, steps are as below:
1. Write cloud s3 store config file, its default path is
'/tmp/doris/cloud.ini'.
@@ -142,6 +150,12 @@ The simplest way to create a cloud cluster:
python docker/runtime/doris-compose/doris-compose.py up <cluster-name>
<image> --cloud
```
+To create a cloud cluster with a custom cluster snapshot:
+
+```shell
+python docker/runtime/doris-compose/doris-compose.py up <cluster-name>
<image> --cloud --cluster-snapshot '{"instance_id":"instance_id_xxx"}'
+```
+
It will create 1 fdb, 1 meta service server, 1 recycler, 3 fe and 3 be.
### Remove node from the cluster
diff --git a/docker/runtime/doris-compose/cluster.py
b/docker/runtime/doris-compose/cluster.py
index 05529e2bf99..4345021a662 100644
--- a/docker/runtime/doris-compose/cluster.py
+++ b/docker/runtime/doris-compose/cluster.py
@@ -64,6 +64,20 @@ CLUSTER_ID = "12345678"
LOG = utils.get_logger()
+def is_true(value):
+ return str(value).strip().lower() in ("1", "true", "yes", "y", "on")
+
+
+def get_env_value(envs, name):
+ for env in envs or []:
+ pos = env.find('=')
+ if pos == -1:
+ continue
+ if env[:pos] == name:
+ return env[pos + 1:]
+ return None
+
+
def get_cluster_path(cluster_name):
return os.path.join(LOCAL_DORIS_PATH, cluster_name)
@@ -397,6 +411,7 @@ class Node(object):
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0,
+ "ENABLE_STORAGE_VAULT": 1 if getattr(self.cluster,
"enable_storage_vault", False) else 0,
"TDE_AK": self.get_tde_ak(),
"TDE_SK": self.get_tde_sk(),
}
@@ -528,6 +543,21 @@ class FE(Node):
"is_cloud_follower"] = self.cluster.is_cloud and
self.cluster.fe_follower
super().init()
+ def init_conf(self):
+ # Call parent's init_conf first
+ super().init_conf()
+
+ # Write cluster_snapshot.json for FE-1 in cloud mode only
+ if self.id == 1 and self.cluster.is_cloud and
self.cluster.cluster_snapshot:
+ conf_dir = os.path.join(self.get_path(), "conf")
+ snapshot_file = os.path.join(conf_dir, "cluster_snapshot.json")
+ try:
+ with open(snapshot_file, "w") as f:
+ f.write(self.cluster.cluster_snapshot)
+ LOG.info(f"Written cluster snapshot to {snapshot_file}")
+ except Exception as e:
+ LOG.warning(f"Failed to write cluster snapshot file: {e}")
+
def get_add_init_config(self):
cfg = super().get_add_init_config()
if self.cluster.fe_config:
@@ -578,6 +608,11 @@ class FE(Node):
envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id()
if self.meta["is_cloud_follower"]:
envs["IS_FE_FOLLOWER"] = 1
+ # Add CLUSTER_SNAPSHOT_FILE env var for FE-1 if the file exists
+ if self.id == 1:
+ snapshot_file = os.path.join(self.get_path(), "conf",
"cluster_snapshot.json")
+ if os.path.exists(snapshot_file):
+ envs["CLUSTER_SNAPSHOT_FILE"] =
"./conf/cluster_snapshot.json"
envs["MY_QUERY_PORT"] = self.meta["ports"]["query_port"]
envs["MY_EDITLOG_PORT"] = self.meta["ports"]["edit_log_port"]
return envs
@@ -832,7 +867,9 @@ class Cluster(object):
be_config, ms_config, recycle_config, remote_master_fe,
local_network_ip, fe_follower, be_disks, be_cluster, reg_be,
extra_hosts, coverage_dir, cloud_store_config,
- sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id,
tde_ak, tde_sk):
+ sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id,
tde_ak, tde_sk,
+ external_ms_cluster, instance_id, cluster_snapshot="",
+ enable_storage_vault=False):
self.name = name
self.subnet = subnet
self.image = image
@@ -851,6 +888,14 @@ class Cluster(object):
self.extra_hosts = extra_hosts
self.coverage_dir = coverage_dir
self.cloud_store_config = cloud_store_config
+ self.external_ms_cluster = external_ms_cluster
+ self.instance_id = instance_id
+ if not self.instance_id:
+ self.instance_id = f"instance_{name}" if self.external_ms_cluster
else "default_instance_id"
+ # cluster_snapshot is not persisted to meta, only used during cluster
creation
+ self.cluster_snapshot = cluster_snapshot
+ self.enable_storage_vault = is_true(enable_storage_vault)
+ self.is_rollback = False
self.groups = {
node_type: Group(node_type)
for node_type in Node.TYPE_ALL
@@ -869,7 +914,9 @@ class Cluster(object):
ms_config, recycle_config, remote_master_fe, local_network_ip,
fe_follower, be_disks, be_cluster, reg_be, extra_hosts,
coverage_dir, cloud_store_config, sql_mode_node_mgr,
- be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk):
+ be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk,
+ external_ms_cluster, instance_id, cluster_snapshot="",
+ enable_storage_vault=False):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
@@ -884,7 +931,8 @@ class Cluster(object):
be_disks, be_cluster, reg_be, extra_hosts,
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
- be_cluster_id, tde_ak, tde_sk)
+ be_cluster_id, tde_ak, tde_sk,
external_ms_cluster,
+ instance_id, cluster_snapshot,
enable_storage_vault)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
diff --git a/docker/runtime/doris-compose/command.py
b/docker/runtime/doris-compose/command.py
index c7883454899..c0843d20f09 100644
--- a/docker/runtime/doris-compose/command.py
+++ b/docker/runtime/doris-compose/command.py
@@ -470,6 +470,35 @@ class UpCommand(Command):
"Only use when creating new cluster and specify
--remote-master-fe."
)
+ parser.add_argument(
+ "--external-ms",
+ type=str,
+ help=
+ "Use external meta service cluster (specify cluster name). " \
+ "This cluster will not create its own MS/FDB/Recycler, but use the
specified cluster's services. " \
+ "The external cluster must be a cloud cluster with MS/FDB already
running. " \
+ "Example: --external-ms shared-meta. Only use when creating new
cloud cluster."
+ )
+
+ parser.add_argument(
+ "--instance-id",
+ type=str,
+ help=
+ "Specify instance ID for cloud mode. If not specified, will
auto-generate 'default_instance_id'. " \
+ "When using external MS with multiple clusters, each cluster
should have a unique instance ID. " \
+ "Example: --instance-id prod_instance_1"
+ )
+
+ parser.add_argument(
+ "--cluster-snapshot",
+ type=str,
+ help=
+ "Cluster snapshot JSON content for FE-1 first startup in cloud
mode only. " \
+ "The JSON will be written to FE conf/cluster_snapshot.json and
passed to start_fe.sh " \
+ "with --cluster_snapshot parameter. Only effective on first
startup. " \
+ "Example: --cluster-snapshot
'{\"instance_id\":\"instance_id_xxx\"}'"
+ )
+
if self._support_boolean_action():
parser.add_argument(
"--be-metaservice-endpoint",
@@ -624,6 +653,11 @@ class UpCommand(Command):
if args.cloud:
args.sql_mode_node_mgr = True
+ instance_id = getattr(args, 'instance_id', None)
+ cluster_snapshot = getattr(args, 'cluster_snapshot', '')
+ enable_storage_vault = CLUSTER.is_true(
+ CLUSTER.get_env_value(args.env, "ENABLE_STORAGE_VAULT"))
+
cluster = CLUSTER.Cluster.new(
args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
args.be_config, args.ms_config, args.recycle_config,
@@ -631,7 +665,9 @@ class UpCommand(Command):
args.be_disks if args.be_disks is not None else ["HDD=1"],
args.be_cluster, args.reg_be, args.extra_hosts,
args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
- args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak,
args.tde_sk)
+ args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak,
args.tde_sk,
+ external_ms_cluster, instance_id, cluster_snapshot,
+ enable_storage_vault)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
args.NAME, cluster.get_path()))
diff --git a/docker/runtime/doris-compose/resource/common.sh
b/docker/runtime/doris-compose/resource/common.sh
index 2c53ca587a5..cc1d43eb806 100644
--- a/docker/runtime/doris-compose/resource/common.sh
+++ b/docker/runtime/doris-compose/resource/common.sh
@@ -147,3 +147,99 @@ wait_pid() {
health_log "wait end"
}
+
+create_doris_instance() {
+ while true; do
+
+ lock_cluster
+
+ if [[ "${ENABLE_STORAGE_VAULT}" =~
^([Tt][Rr][Uu][Ee]|[Yy][Ee][Ss]|[Yy]|[Oo][Nn]|1)$ ]]; then
+ output=$(curl -s
"${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999"
\
+ -d '{"instance_id":"'"${INSTANCE_ID}"'",
+ "name": "'"${INSTANCE_ID}"'",
+ "user_id": "'"${DORIS_CLOUD_USER}"'",
+ "vault": {
+ "obj_info": {
+ "ak": "'"${DORIS_CLOUD_AK}"'",
+ "sk": "'"${DORIS_CLOUD_SK}"'",
+ "bucket": "'"${DORIS_CLOUD_BUCKET}"'",
+ "endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'",
+ "external_endpoint":
"'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'",
+ "prefix": "'"${DORIS_CLOUD_PREFIX}"'",
+ "region": "'"${DORIS_CLOUD_REGION}"'",
+ "provider": "'"${DORIS_CLOUD_PROVIDER}"'"
+ }}}')
+ else
+ output=$(curl -s
"${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999"
\
+ -d '{"instance_id":"'"${INSTANCE_ID}"'",
+ "name": "'"${INSTANCE_ID}"'",
+ "user_id": "'"${DORIS_CLOUD_USER}"'",
+ "obj_info": {
+ "ak": "'"${DORIS_CLOUD_AK}"'",
+ "sk": "'"${DORIS_CLOUD_SK}"'",
+ "bucket": "'"${DORIS_CLOUD_BUCKET}"'",
+ "endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'",
+ "external_endpoint":
"'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'",
+ "prefix": "'"${DORIS_CLOUD_PREFIX}"'",
+ "region": "'"${DORIS_CLOUD_REGION}"'",
+ "provider": "'"${DORIS_CLOUD_PROVIDER}"'"
+ }}')
+ fi
+
+ unlock_cluster
+
+ health_log "create instance output: $output"
+ code=$(jq -r '.code' <<<$output)
+
+ if [ "$code" != "OK" ]; then
+ health_log "create instance failed"
+ sleep 1
+ continue
+ fi
+
+ health_log "create doris instance succ, output: $output"
+ touch $HAS_CREATE_INSTANCE_FILE
+ break
+ done
+}
+
+is_doris_instance_exists() {
+ output=$(curl -s
"${META_SERVICE_ENDPOINT}/MetaService/http/get_instance?token=greedisgood9999&instance_id=${INSTANCE_ID}")
+
+ health_log "get instance output: $output"
+ code=$(jq -r '.code' <<<$output)
+
+ if [ "$code" != "OK" ]; then
+ health_log "get instance failed"
+ return 1
+ fi
+
+ return 0
+}
+
+# Like wait_create_instance, but query meta service directly.
+wait_doris_instance_ready() {
+ ok=0
+ for ((i = 0; i < 30; i++)); do
+ is_doris_instance_exists
+ if [ $? -eq 0 ]; then
+ ok=1
+ break
+ fi
+
+ health_log "doris instance not exist yet."
+
+ sleep 1
+ done
+
+ if [ $ok -eq 0 ]; then
+ health_log "wait doris instance too long, exit"
+ exit 1
+ fi
+
+ if [ ! -f $HAS_CREATE_INSTANCE_FILE ]; then
+ touch $HAS_CREATE_INSTANCE_FILE
+ fi
+
+ health_log "check doris instance ok"
+}
diff --git a/docker/runtime/doris-compose/resource/init_fe.sh
b/docker/runtime/doris-compose/resource/init_fe.sh
index 719f85f27eb..23467dc17eb 100755
--- a/docker/runtime/doris-compose/resource/init_fe.sh
+++ b/docker/runtime/doris-compose/resource/init_fe.sh
@@ -93,7 +93,15 @@ run_fe() {
export DORIS_TDE_AK=${TDE_AK}
export DORIS_TDE_SK=${TDE_SK}
health_log "run start_fe.sh"
- bash $DORIS_HOME/bin/start_fe.sh --daemon $@ | tee -a
$DORIS_HOME/log/fe.out
+
+ # Add cluster_snapshot parameter for first startup only (when
REGISTER_FILE does not exist)
+ EXTRA_ARGS=""
+ if [ -n "$CLUSTER_SNAPSHOT_FILE" ] && [ ! -f "$REGISTER_FILE" ]; then
+ EXTRA_ARGS="--cluster_snapshot $CLUSTER_SNAPSHOT_FILE"
+ health_log "Using cluster snapshot: $CLUSTER_SNAPSHOT_FILE"
+ fi
+
+ bash $DORIS_HOME/bin/start_fe.sh --daemon $EXTRA_ARGS $@ | tee -a
$DORIS_HOME/log/fe.out
}
start_cloud_fe() {
@@ -142,6 +150,9 @@ start_cloud_fe() {
wait_create_instance
+}
+
+register_sql_server_cluster() {
action=add_cluster
node_type=FE_MASTER
if [ "$MY_ID" != "1" ]; then
@@ -197,6 +208,48 @@ start_cloud_fe() {
fi
touch $REGISTER_FILE
+}
+
+start_cloud_fe() {
+ if [ -f "$REGISTER_FILE" ] || [ -n "${CLUSTER_SNAPSHOT_FILE}" ]; then
+ fe_daemon &
+ run_fe
+
+ # Cluster snapshot is provided, need to register cluster after FE is
started.
+ if [ -n "${CLUSTER_SNAPSHOT_FILE}" ]; then
+ wait_doris_instance_ready
+ register_sql_server_cluster
+ fi
+
+ return
+ fi
+
+ # Check if SQL_MODE_NODE_MGR is set to 1
+ if [ "${SQL_MODE_NODE_MGR}" = "1" ]; then
+ health_log "SQL_MODE_NODE_MGR is set to 1. Skipping add FE."
+
+ touch $REGISTER_FILE
+
+ fe_daemon &
+ run_fe
+
+ return
+ fi
+
+ # Support to create instance in FE startup.
+ AUTO_CREATE_INSTANCE=${AUTO_CREATE_INSTANCE:-"0"}
+ if [ "a$MY_ID" == "a1" ] && [ "a$AUTO_CREATE_INSTANCE" == "a1" ]; then
+ health_log "auto create instance is enabled, trying to create instance"
+ if [ -f $HAS_CREATE_INSTANCE_FILE ]; then
+ health_log "instance has been created before, skip create instance"
+ else
+ create_doris_instance
+ fi
+ else
+ wait_create_instance
+ fi
+
+ register_sql_server_cluster
fe_daemon &
run_fe
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 50d17a944c4..7094465febc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -815,6 +815,7 @@ public class SchemaTable extends Table {
.column("LABEL", ScalarType.createStringType())
.column("MSG", ScalarType.createStringType())
.column("COUNT",
ScalarType.createType(PrimitiveType.INT))
+ .column("VAULT_ID", ScalarType.createStringType())
.build()))
.put("cluster_snapshot_properties",
new SchemaTable(SystemIdGenerator.getNextId(),
"cluster_snapshot_properties", TableType.SCHEMA,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java
index dad391cc948..0b6dc996df5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/CloudSnapshotHandler.java
@@ -62,7 +62,7 @@ public class CloudSnapshotHandler extends MasterDaemon {
// do nothing
}
- public void submitJob(long ttl, String label) throws Exception {
+ public void submitJob(long ttl, String label, String vaultName) throws
Exception {
throw new NotImplementedException("submitJob is not implemented");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java
index 425b6bb2176..c7b9cff0cd0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommand.java
@@ -45,11 +45,13 @@ public class AdminCreateClusterSnapshotCommand extends
Command implements Forwar
public static final String PROP_TTL = "ttl";
public static final String PROP_LABEL = "label";
+ public static final String PROP_VAULT_NAME = "vault_name";
private static final Logger LOG =
LogManager.getLogger(AdminCreateClusterSnapshotCommand.class);
private Map<String, String> properties;
private long ttl;
private String label = null;
+ private String vaultName = null;
/**
* AdminCreateClusterSnapshotCommand
@@ -64,7 +66,7 @@ public class AdminCreateClusterSnapshotCommand extends
Command implements Forwar
public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
validate(ctx);
CloudSnapshotHandler cloudSnapshotHandler = ((CloudEnv)
ctx.getEnv()).getCloudSnapshotHandler();
- cloudSnapshotHandler.submitJob(ttl, label);
+ cloudSnapshotHandler.submitJob(ttl, label, vaultName);
}
/**
@@ -106,6 +108,11 @@ public class AdminCreateClusterSnapshotCommand extends
Command implements Forwar
if (label == null || label.isEmpty()) {
throw new AnalysisException("Property 'label' cannot be
empty");
}
+ } else if (entry.getKey().equalsIgnoreCase(PROP_VAULT_NAME)) {
+ vaultName = entry.getValue();
+ if (vaultName == null || vaultName.isEmpty()) {
+ throw new AnalysisException("Property 'vault_name' cannot
be empty");
+ }
} else {
throw new AnalysisException("Unknown property: " +
entry.getKey());
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java
index c0ecf74a763..8d4a4dd8105 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AdminCreateClusterSnapshotCommandTest.java
@@ -100,10 +100,13 @@ public class AdminCreateClusterSnapshotCommandTest {
properties.add(Pair.of(ImmutableMap.of("ttl", "a", "label", "a"),
"Invalid value"));
properties.add(Pair.of(ImmutableMap.of("ttl", "0", "label", "a"),
"Property 'ttl' must be positive"));
properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", ""),
"Property 'label' cannot be empty"));
+ properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "a",
"vault_name", ""),
+ "Property 'vault_name' cannot be empty"));
// unknown property
properties.add(Pair.of(ImmutableMap.of("ttl", "0", "a", "b"), "Unknown
property"));
// normal case
properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "abc"),
""));
+ properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "abc",
"vault_name", "vault_1"), ""));
for (Pair<Map<String, String>, String> entry : properties) {
AdminCreateClusterSnapshotCommand command0 = new
AdminCreateClusterSnapshotCommand(entry.first);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index d16f30cb53f..bff85eee942 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -2110,6 +2110,7 @@ message BeginSnapshotRequest {
optional int64 timeout_seconds = 4;
optional int64 ttl_seconds = 5;
optional string request_ip = 6;
+ optional string vault_name = 7;
}
message BeginSnapshotResponse {
@@ -2178,6 +2179,7 @@ message SnapshotInfoPB {
optional int64 snapshot_logical_data_size = 17;
optional int64 snapshot_retained_data_size = 18;
optional int64 snapshot_billable_data_size = 19;
+ optional string resource_id = 20;
}
message ListSnapshotRequest {
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 01ed92f3e70..3c14e06eeb2 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -384,6 +384,156 @@ class Suite implements GroovyInterceptable {
}
}
+ /**
+ * Create and manage multiple Docker clusters for multi-cluster test
scenarios.
+ *
+ * Usage example:
+ * dockers([
+ * "cluster_1": new ClusterOptions(cloudMode: true, feNum: 1, beNum:
1, msNum: 1),
+ * "cluster_2": new ClusterOptions(cloudMode: true, feNum: 1, beNum:
1, msNum: 0, externalMsCluster: "cluster_1")
+ * ]) { clusters ->
+ * connectWithDockerCluster(clusters.cluster_1) { sql "..." }
+ * connectWithDockerCluster(clusters.cluster_2) { sql "..." }
+ * }
+ *
+ * Important:
+ * - Must use LinkedHashMap to preserve insertion order
+ * - Clusters are created in map insertion order
+ * - Clusters are destroyed in reverse order (dependent clusters first)
+ * - If using externalMsCluster, the referenced cluster must appear
earlier in the map
+ *
+ * @param clusterConfigs LinkedHashMap of cluster name to ClusterOptions
+ * @param manual_init_clusters Set of cluster names to skip automatic
initialization
+ * @param actionSupplier Closure receiving Map<String, SuiteCluster> for
test execution
+ */
+ void dockers(LinkedHashMap<String, ClusterOptions> clusterConfigs,
+ Set<String> manual_init_clusters = new HashSet<>(), Closure
actionSupplier) throws Exception {
+ if (context.config.excludeDockerTest) {
+ logger.info("do not run the docker suite {}, because regression
config excludeDockerTest=true", name)
+ return
+ }
+
+ if (RegressionTest.getGroupExecType(group) !=
RegressionTest.GroupExecType.DOCKER) {
+ throw new Exception("Need to add 'docker' to docker suite's belong
groups, "
+ + "see example demo_p0/docker_action.groovy")
+ }
+
+ if (context.isMultiDockerClusterRunning) {
+ throw new Exception("Nested dockers() calls are not supported")
+ }
+
+ // Validate cluster configs
+ Set<String> clusterNames = new HashSet<>()
+ for (def entry : clusterConfigs.entrySet()) {
+ String clusterName = entry.key
+ ClusterOptions options = entry.value
+
+ if (clusterNames.contains(clusterName)) {
+ throw new Exception("Duplicate cluster name: ${clusterName}")
+ }
+ clusterNames.add(clusterName)
+
+ // Validate externalMsCluster reference
+ if (options.externalMsCluster != null &&
!options.externalMsCluster.isEmpty()) {
+ if (!clusterNames.contains(options.externalMsCluster)) {
+ throw new Exception("Cluster ${clusterName} references
non-existent external MS cluster: ${options.externalMsCluster}")
+ }
+ if (options.msNum > 0) {
+ throw new Exception("Cluster ${clusterName} cannot have
its own MS when using external MS cluster")
+ }
+ }
+ }
+
+ List<String> clusterNamesReversed = new
ArrayList<>(clusterConfigs.keySet())
+ Collections.reverse(clusterNamesReversed)
+
+ // Use LinkedHashMap to preserve order
+ Map<String, SuiteCluster> clusters = new LinkedHashMap<>()
+
+ try {
+ // Create and initialize clusters in order
+ for (def entry : clusterConfigs.entrySet()) {
+ String clusterName = entry.key
+ ClusterOptions options = entry.value
+
+ logger.info("Creating cluster: ${clusterName}")
+ SuiteCluster cluster = new SuiteCluster(clusterName,
context.config)
+
+ clusters.put(clusterName, cluster)
+ }
+
+ for (String clusterName : clusterNamesReversed) {
+ clusters.get(clusterName).destroy(true)
+ }
+
+ for (def entry : clusterConfigs.entrySet()) {
+ String clusterName = entry.key
+ ClusterOptions options = entry.value
+ SuiteCluster cluster = clusters.get(clusterName)
+
+ if (manual_init_clusters.contains(clusterName)) {
+ logger.info("Skipping initialization of cluster:
${clusterName}")
+ continue
+ }
+
+ // Determine cloud mode
+ boolean isCloud = false
+ if (options.cloudMode == null) {
+ // If not specified, use config default or run both modes
+ if (context.config.runMode == RunMode.CLOUD) {
+ isCloud = true
+ } else if (context.config.runMode == RunMode.NOT_CLOUD) {
+ isCloud = false
+ } else {
+ throw new Exception("cloudMode must be specified when
runMode is UNKNOWN for multi-cluster setup")
+ }
+ } else {
+ if (options.cloudMode == true && context.config.runMode ==
RunMode.NOT_CLOUD) {
+ logger.info("Skip cluster ${clusterName} because
cloudMode=true but regression test is in local mode")
+ continue
+ }
+ if (options.cloudMode == false && context.config.runMode
== RunMode.CLOUD) {
+ logger.info("Skip cluster ${clusterName} because
cloudMode=false but regression test is in cloud mode")
+ continue
+ }
+ isCloud = options.cloudMode
+ }
+ logger.info("Initializing cluster ${cluster.name} in ${isCloud
? 'cloud' : 'not_cloud'} mode")
+ cluster.init(options, isCloud)
+ logger.info("Cluster ${clusterName} initialized successfully")
+ }
+
+ // Wait for BE to report
+ Thread.sleep(5000)
+
+ Connection originConnection = context.threadLocalConn.get()
+ context.threadLocalConn.remove()
+ context.isMultiDockerClusterRunning = true
+ try {
+ actionSupplier.call(clusters)
+ } finally {
+ context.isMultiDockerClusterRunning = false
+ if (originConnection == null) {
+ context.threadLocalConn.remove()
+ } else {
+ context.threadLocalConn.set(originConnection)
+ }
+ }
+ } finally {
+ // Destroy clusters in reverse order
+ if (!context.config.dockerEndNoKill) {
+ for (String clusterName : clusterNamesReversed) {
+ try {
+ logger.info("Destroying cluster: ${clusterName}")
+
clusters.get(clusterName).destroy(context.config.dockerEndDeleteFiles)
+ } catch (Throwable t) {
+ logger.warn("Failed to destroy cluster
${clusterName}", t)
+ }
+ }
+ }
+ }
+ }
+
String get_ccr_body(String table, String db = null) {
if (db == null) {
db = context.dbName
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index da4617ff5d3..3d9d79bc9bf 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -74,6 +74,9 @@ class ClusterOptions {
// cloud store overrides for cloud docker clusters, each item should be
'name=value'
List<String> cloudStoreConfigs = []
+ // environment variables passed to docker clusters, each item should be
'name=value'
+ List<String> environments = []
+
boolean connectToFollower = false
// 1. cloudMode = true, only create cloud cluster.
@@ -98,6 +101,26 @@ class ClusterOptions {
String tdeAk = "";
String tdeSk = "";
+ // Use external meta service cluster (shared MS/FDB)
+ // Specify the cluster name that provides MS/FDB services
+ // When set, this cluster will not create its own MS/FDB/Recycler
+ // Example: externalMsCluster = "shared-meta" (Cloud mode only)
+ String externalMsCluster = null
+
+ // Specify the instance id.
+ // When not set, "default_instance_id" will be used. (Cloud mode only)
+ String instanceId = null;
+
+ // Cluster snapshot JSON content for FE-1 first startup in cloud mode only.
+ // The JSON will be written to FE conf/cluster_snapshot.json and passed to
start_fe.sh
+ // with --cluster_snapshot parameter. Only effective on first startup.
+ // Example: clusterSnapshot = '{"cloud_unique_id":"1:instance_id:xxx"}'
+ String clusterSnapshot = null;
+
+ // Create cloud instance in storage-vault mode instead of legacy obj_info
mode.
+ // Docker framework will also create a default storage vault automatically
for new clusters.
+ Boolean enableStorageVault = false;
+
void enableDebugPoints() {
feConfigs.add('enable_debug_points=true')
beConfigs.add('enable_debug_points=true')
@@ -356,6 +379,14 @@ class SuiteCluster {
cmd += ['--extra-hosts']
cmd += options.extraHosts
}
+ def envs = new ArrayList<String>(options.environments)
+ if (options.enableStorageVault) {
+ envs.add('ENABLE_STORAGE_VAULT=1')
+ }
+ if (!envs.isEmpty()) {
+ cmd += ['--env']
+ cmd += envs
+ }
if (!options.cloudStoreConfigs.isEmpty()) {
cmd += ['--cloud-config']
cmd += options.cloudStoreConfigs
@@ -391,6 +422,23 @@ class SuiteCluster {
cmd += options.tdeSk
}
+ if (options.externalMsCluster != null && options.externalMsCluster !=
"") {
+ cmd += ['--external-ms', options.externalMsCluster]
+ }
+
+ if (options.instanceId != null && options.instanceId != "") {
+ cmd += ['--instance-id', options.instanceId]
+ }
+
+ if (options.clusterSnapshot != null && options.clusterSnapshot != "") {
+ // Remove newlines and extra whitespace to make it a compact JSON
string
+ def compactJson = options.clusterSnapshot
+ .replaceAll(/\s+/, ' ') // Replace all whitespace sequences
with single space
+ .trim() // Remove leading/trailing spaces
+ // No need to escape when using list-based execution
+ cmd += ['--cluster-snapshot', compactJson]
+ }
+
cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)]
sqlModeNodeMgr = options.sqlModeNodeMgr
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org