This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c4dfe9f289 [improve](streaming-job) support user-specified mysql server_id with per-reader assignment (#63490)
7c4dfe9f289 is described below

commit 7c4dfe9f2892284792c5df61994ea82e3e3add97
Author: wudi <[email protected]>
AuthorDate: Thu May 28 15:11:02 2026 +0800

    [improve](streaming-job) support user-specified mysql server_id with per-reader assignment (#63490)
    
    ## Summary
    
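    - Add an optional `server_id` source property for MySQL CDC streaming
    jobs. Accepts a single value (e.g. `5400`) or an inclusive range (e.g.
    `5400-5408`). When unset, a range of width `snapshot_parallelism` is
    derived from the jobId hash; with `snapshot_parallelism = 1` this is the
    same hash-derived server_id existing jobs already use (except that a
    hash of 0 is now bumped to 1, since server_id 0 is not usable).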
    - Fix a latent collision: when `snapshot_parallelism > 1` and
    source-side DML happens during snapshot, all parallel
    `SnapshotSplitReader` instances previously shared the same server_id and
    their backfill BinaryLogClient connections kicked each other out of
    MySQL's dump-thread slot, dropping binlog events between low and high
    watermark. Each subtask now gets a distinct server_id from the resolved
    range; the single binlog reader uses the range start.
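    - Cross-field check: reject a `server_id` range narrower than
    `snapshot_parallelism` in the FE validator, i.e. at CREATE JOB time and
    when the `cdc_stream` TVF is analyzed, with an error suggesting a
    wide-enough range or a lower parallelism. cdc_client re-checks the same
    condition at runtime as a safety net.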
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   2 +
 .../streaming/DataSourceConfigValidator.java       |  88 +++++++++++++-
 .../CdcStreamTableValuedFunction.java              |   6 +
 .../streaming/DataSourceConfigValidatorTest.java   | 122 +++++++++++++++++++
 .../source/reader/mysql/MySqlSourceReader.java     |  46 ++++---
 .../apache/doris/cdcclient/utils/ConfigUtil.java   |  29 ++++-
 .../doris/cdcclient/utils/ConfigUtilTest.java      |  61 ++++++++--
 .../cdc/test_streaming_mysql_job_server_id.groovy  | 133 +++++++++++++++++++++
 8 files changed, 456 insertions(+), 31 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 72322da2668..3708e8dc6a3 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -38,6 +38,8 @@ public class DataSourceConfigKeys {
     public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
     public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
     public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+    // MySQL CDC client identity. Single value "5400" or range "5400-5408".
+    public static final String SERVER_ID = "server_id";
     public static final String SSL_MODE = "ssl_mode";
     public static final String SSL_ROOTCERT = "ssl_rootcert";
     // PG-style spelling; MySQL normalizes to underscore form.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 1b633605d71..4ca1e605ef5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -53,7 +53,8 @@ public class DataSourceConfigValidator {
             DataSourceConfigKeys.SSL_MODE,
             DataSourceConfigKeys.SSL_ROOTCERT,
             DataSourceConfigKeys.SLOT_NAME,
-            DataSourceConfigKeys.PUBLICATION_NAME
+            DataSourceConfigKeys.PUBLICATION_NAME,
+            DataSourceConfigKeys.SERVER_ID
     );
 
     private static final Set<String> ALLOW_SSL_MODES = Sets.newHashSet(
@@ -122,6 +123,45 @@ public class DataSourceConfigValidator {
                     "ssl_mode '" + DataSourceConfigKeys.SSL_MODE_VERIFY_CA
                             + "' requires ssl_rootcert to be set");
         }
+
+        validateServerIdConfig(input);
+    }
+
+    // Shared by validateSource and the cdc_stream TVF entrypoint so both 
reject malformed
+    // server_id at SQL-analysis time, not as a cdc_client runtime error.
+    public static void validateServerIdConfig(Map<String, String> input)
+            throws IllegalArgumentException {
+        String serverIdValue = input.get(DataSourceConfigKeys.SERVER_ID);
+        if (serverIdValue == null) {
+            return;
+        }
+        int[] range = parseServerIdRange(serverIdValue);
+        if (range == null) {
+            throw new IllegalArgumentException(
+                    "Invalid value for key '" + DataSourceConfigKeys.SERVER_ID 
+ "': "
+                            + serverIdValue
+                            + ". Expected a single value (e.g. '5400') or 
range (e.g. '5400-5408')"
+                            + " with start >= 1 and start <= end.");
+        }
+        String parallelismValue = input.getOrDefault(
+                DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT);
+        Integer parallelism = parsePositiveInt(parallelismValue);
+        if (parallelism == null) {
+            throw new IllegalArgumentException(
+                    "Invalid value for key '" + 
DataSourceConfigKeys.SNAPSHOT_PARALLELISM
+                            + "': " + parallelismValue + ". Expected a 
positive integer.");
+        }
+        int width = range[1] - range[0] + 1;
+        // Range must cover every parallel SnapshotSplitReader; cdc_client 
throws otherwise.
+        if (width < parallelism) {
+            throw new IllegalArgumentException(
+                    "server_id range size " + width
+                            + " must be >= snapshot_parallelism " + parallelism
+                            + ". Widen the range (e.g. '" + range[0] + "-"
+                            + (range[0] + parallelism - 1)
+                            + "') or reduce parallelism.");
+        }
     }
 
     public static void validateTarget(Map<String, String> input) throws 
IllegalArgumentException {
@@ -168,6 +208,9 @@ public class DataSourceConfigValidator {
                 || key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
             return isPositiveInt(value);
         }
+        if (key.equals(DataSourceConfigKeys.SERVER_ID)) {
+            return parseServerIdRange(value) != null;
+        }
         return true;
     }
 
@@ -195,6 +238,49 @@ public class DataSourceConfigValidator {
         return ALLOW_SSL_MODES.contains(value);
     }
 
+    // Parse "5400" or "5400-5408" into {start, end} inclusive; null on any 
malformed input.
+    // Lower bound is 1 because MySQL server_id=0 disables replication.
+    static int[] parseServerIdRange(String value) {
+        if (value == null) {
+            return null;
+        }
+        String trimmed = value.trim();
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+        try {
+            int start;
+            int end;
+            int dash = trimmed.indexOf('-');
+            if (dash < 0) {
+                start = end = Integer.parseInt(trimmed);
+            } else {
+                String left = trimmed.substring(0, dash).trim();
+                String right = trimmed.substring(dash + 1).trim();
+                if (left.isEmpty() || right.isEmpty()) {
+                    return null;
+                }
+                start = Integer.parseInt(left);
+                end = Integer.parseInt(right);
+            }
+            if (start < 1 || start > end) {
+                return null;
+            }
+            return new int[] {start, end};
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    private static Integer parsePositiveInt(String value) {
+        try {
+            int n = Integer.parseInt(value.trim());
+            return n >= 1 ? n : null;
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
     /**
      * Check if the offset value is valid for the given data source type.
      * Supported: initial, snapshot, latest, JSON binlog/lsn position.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
index 885a3ec5a33..aa60924601c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -156,6 +156,12 @@ public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunctio
         }
         validatePositiveIntIfPresent(properties, 
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
         validatePositiveIntIfPresent(properties, 
DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
+        // TVF entrypoint shares server_id checks with the from-to path's 
validateSource.
+        try {
+            DataSourceConfigValidator.validateServerIdConfig(properties);
+        } catch (IllegalArgumentException e) {
+            throw new AnalysisException(e.getMessage());
+        }
     }
 
     private static void validatePositiveIntIfPresent(Map<String, String> 
properties, String key)
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
index 769517d9f65..84cc3bdf5e0 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
@@ -276,4 +276,126 @@ public class DataSourceConfigValidatorTest {
             assertReject(props);
         }
     }
+
+    // ─── server_id 
────────────────────────────────────────────────────────────
+
+    private static Map<String, String> serverIdInput(String value) {
+        Map<String, String> input = new HashMap<>();
+        input.put(DataSourceConfigKeys.SERVER_ID, value);
+        return input;
+    }
+
+    @Test
+    public void testServerIdAcceptsSingleValue() {
+        DataSourceConfigValidator.validateSource(serverIdInput("5400"), 
DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testServerIdAcceptsRange() {
+        DataSourceConfigValidator.validateSource(
+                serverIdInput("5400-5408"), DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testServerIdAcceptsRangeWithSpaces() {
+        DataSourceConfigValidator.validateSource(
+                serverIdInput("  5400 - 5408 "), DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testServerIdRejectsMalformed() {
+        // Each entry trips a different code path in parseServerIdRange.
+        String[] invalids = {
+                "abc",       // not numeric
+                "5400-",     // missing end
+                "-5408",     // missing start
+                "5400--",    // empty after second dash split
+                "5400-abc",  // non-numeric end
+                " ",         // blank
+                ""           // empty
+        };
+        for (String invalid : invalids) {
+            try {
+                DataSourceConfigValidator.validateSource(
+                        serverIdInput(invalid), DataSourceType.MYSQL.name());
+                Assert.fail("Expected IllegalArgumentException for 
server_id='" + invalid + "'");
+            } catch (IllegalArgumentException expected) {
+                Assert.assertTrue(
+                        "Error message should reference server_id, got: " + 
expected.getMessage(),
+                        expected.getMessage().contains("server_id"));
+            }
+        }
+    }
+
+    @Test
+    public void testServerIdRejectsZero() {
+        try {
+            DataSourceConfigValidator.validateSource(serverIdInput("0"), 
DataSourceType.MYSQL.name());
+            Assert.fail("Expected IllegalArgumentException for server_id='0'");
+        } catch (IllegalArgumentException expected) {
+            Assert.assertTrue(expected.getMessage().contains("server_id"));
+        }
+    }
+
+    @Test
+    public void testServerIdRejectsBackwardRange() {
+        try {
+            DataSourceConfigValidator.validateSource(
+                    serverIdInput("5408-5400"), DataSourceType.MYSQL.name());
+            Assert.fail("Expected IllegalArgumentException for 
server_id='5408-5400'");
+        } catch (IllegalArgumentException expected) {
+            Assert.assertTrue(expected.getMessage().contains("server_id"));
+        }
+    }
+
+    @Test
+    public void testServerIdRejectsNegative() {
+        try {
+            DataSourceConfigValidator.validateSource(serverIdInput("-5"), 
DataSourceType.MYSQL.name());
+            Assert.fail("Expected IllegalArgumentException for 
server_id='-5'");
+        } catch (IllegalArgumentException expected) {
+            Assert.assertTrue(expected.getMessage().contains("server_id"));
+        }
+    }
+
+    @Test
+    public void testServerIdCrossFieldWidthRejected() {
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.SERVER_ID, "5400-5402");
+        props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "8");
+        try {
+            DataSourceConfigValidator.validateSource(props, 
DataSourceType.MYSQL.name());
+            Assert.fail("Expected IllegalArgumentException for range size 3 < 
parallelism 8");
+        } catch (IllegalArgumentException expected) {
+            String msg = expected.getMessage();
+            Assert.assertTrue("Message should reference snapshot_parallelism: 
" + msg,
+                    msg.contains("snapshot_parallelism"));
+            Assert.assertTrue("Message should reference server_id: " + msg,
+                    msg.contains("server_id"));
+        }
+    }
+
+    @Test
+    public void testServerIdCrossFieldWidthSinglePassesWhenParallelismOne() {
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.SERVER_ID, "5400");
+        props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "1");
+        DataSourceConfigValidator.validateSource(props, 
DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testServerIdCrossFieldUsesDefaultParallelism() {
+        // No snapshot_parallelism set -> default ("1"). Single-value 
server_id passes.
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.SERVER_ID, "5400");
+        DataSourceConfigValidator.validateSource(props, 
DataSourceType.MYSQL.name());
+    }
+
+    @Test
+    public void testServerIdOptional() {
+        // server_id is optional; absence must not trip cross-field check.
+        Map<String, String> props = new HashMap<>();
+        props.put(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, "4");
+        DataSourceConfigValidator.validateSource(props, 
DataSourceType.MYSQL.name());
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 99ac1e0355b..e5115d1c51a 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter
 import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
@@ -143,7 +144,6 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     public void initialize(String jobId, DataSource dataSource, Map<String, 
String> config) {
         this.serializer.init(config);
 
-        // Initialize thread pool for parallel polling
         int parallelism =
                 Integer.parseInt(
                         config.getOrDefault(
@@ -815,39 +815,49 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     }
 
     private SnapshotSplitReader getSnapshotSplitReader(JobBaseConfig config, 
int subtaskId) {
-        MySqlSourceConfig sourceConfig = getSourceConfig(config);
+        MySqlSourceConfig sourceConfig = getSourceConfig(config, subtaskId);
+        LOG.info(
+                "MySQL CDC snapshot reader[{}] for job {} using server_id={}",
+                subtaskId,
+                config.getJobId(),
+                sourceConfig.getServerIdRange().getServerId(subtaskId));
         final MySqlConnection jdbcConnection = 
DebeziumUtils.createMySqlConnection(sourceConfig);
         final BinaryLogClient binaryLogClient =
                 
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
         final StatefulTaskContext statefulTaskContext =
                 new StatefulTaskContext(sourceConfig, binaryLogClient, 
jdbcConnection);
-        SnapshotSplitReader snapshotReader =
-                new SnapshotSplitReader(statefulTaskContext, subtaskId);
-        return snapshotReader;
+        return new SnapshotSplitReader(statefulTaskContext, subtaskId);
     }
 
     private BinlogSplitReader getBinlogSplitReader(JobBaseConfig config) {
-        MySqlSourceConfig sourceConfig = getSourceConfig(config);
+        MySqlSourceConfig sourceConfig = getSourceConfig(config, 0);
+        LOG.info(
+                "MySQL CDC binlog reader for job {} using server_id={}",
+                config.getJobId(),
+                sourceConfig.getServerIdRange().getStartServerId());
         final MySqlConnection jdbcConnection = 
DebeziumUtils.createMySqlConnection(sourceConfig);
         final BinaryLogClient binaryLogClient =
                 
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
         final StatefulTaskContext statefulTaskContext =
                 new StatefulTaskContext(sourceConfig, binaryLogClient, 
jdbcConnection);
-        BinlogSplitReader binlogReader = new 
BinlogSplitReader(statefulTaskContext, 0);
-        return binlogReader;
+        return new BinlogSplitReader(statefulTaskContext, 0);
     }
 
     private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
         return generateMySqlConfig(config);
     }
 
-    /** Generate MySQL source config from JobBaseConfig */
+    private MySqlSourceConfig getSourceConfig(JobBaseConfig config, int 
subtaskId) {
+        return generateMySqlConfig(config.getConfig(), config.getJobId(), 
subtaskId);
+    }
+
     private MySqlSourceConfig generateMySqlConfig(JobBaseConfig config) {
-        return generateMySqlConfig(config.getConfig(), 
ConfigUtil.getServerId(config.getJobId()));
+        return generateMySqlConfig(config.getConfig(), config.getJobId(), 0);
     }
 
-    /** Generate MySQL source config from Map config */
-    private MySqlSourceConfig generateMySqlConfig(Map<String, String> 
cdcConfig, String serverId) {
+    // Per-subtask config so each reader binds a distinct server_id from the 
resolved range.
+    private MySqlSourceConfig generateMySqlConfig(
+            Map<String, String> cdcConfig, String jobId, int subtaskId) {
         MySqlSourceConfigFactory configFactory = new 
MySqlSourceConfigFactory();
         ConnectionUrl cu =
                 ConnectionUrl.getConnectionUrlInstance(
@@ -858,7 +868,15 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
         String databaseName = cdcConfig.get(DataSourceConfigKeys.DATABASE);
         configFactory.databaseList(databaseName);
-        configFactory.serverId(serverId);
+        int parallelism =
+                Integer.parseInt(
+                        cdcConfig.getOrDefault(
+                                DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
+                                
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
+        ServerIdRange serverIdRange =
+                ConfigUtil.resolveServerIdRange(
+                        jobId, parallelism, 
cdcConfig.get(DataSourceConfigKeys.SERVER_ID));
+        configFactory.serverId(serverIdRange.toString());
         configFactory.serverTimeZone(
                 
ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString());
 
@@ -974,7 +992,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
                     objectPath, 
cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
         }
 
-        return configFactory.createConfig(0);
+        return configFactory.createConfig(subtaskId);
     }
 
     private BinlogOffset initializeEffectiveOffset(
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 5aa46753a26..a999f532ea9 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.utils;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
 
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -43,11 +44,29 @@ public class ConfigUtil {
     private static ObjectMapper objectMapper = new ObjectMapper();
     private static final Logger LOG = 
LoggerFactory.getLogger(ConfigUtil.class);
 
-    public static String getServerId(String jobId) {
-        // Use bitwise AND with Integer.MAX_VALUE to strip the sign bit,
-        // which avoids the edge case where Math.abs(Integer.MIN_VALUE) 
returns MIN_VALUE
-        // (negative).
-        return String.valueOf(jobId.hashCode() & Integer.MAX_VALUE);
+    // Resolve user-configured range, or derive from jobId hash with width = 
parallelism.
+    // Value validation lives in FE DataSourceConfigValidator; here we trust 
the input.
+    public static ServerIdRange resolveServerIdRange(
+            String jobId, int snapshotParallelism, String userInput) {
+        ServerIdRange userRange = userInput == null ? null : 
ServerIdRange.from(userInput.trim());
+        if (userRange != null) {
+            if (userRange.getNumberOfServerIds() < snapshotParallelism) {
+                throw new IllegalArgumentException(
+                        "server_id range size "
+                                + userRange.getNumberOfServerIds()
+                                + " must be >= snapshot_parallelism "
+                                + snapshotParallelism);
+            }
+            return userRange;
+        }
+        int hash = jobId.hashCode() & Integer.MAX_VALUE;
+        int safeMax = Integer.MAX_VALUE - snapshotParallelism + 1;
+        // Use `>` (not `>=`) so parallelism=1 preserves hash==MAX_VALUE for 
back-compat.
+        int base = hash > safeMax ? hash % safeMax : hash;
+        if (base == 0) {
+            base = 1;
+        }
+        return new ServerIdRange(base, base + snapshotParallelism - 1);
     }
 
     public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) {
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
index 66d2a76d7c2..9fd6a61cdce 100644
--- 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.cdcclient.utils;
 
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 
+import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
@@ -32,22 +33,60 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Unit tests for {@link ConfigUtil}. */
 class ConfigUtilTest {
 
-    // ─── getServerId 
──────────────────────────────────────────────────────────
+    // ─── resolveServerIdRange 
─────────────────────────────────────────────────
+
+    // Value/format validation lives in FE DataSourceConfigValidator; here we 
only verify the
+    // derivation algorithm and that valid input is passed through to 
flink-cdc's ServerIdRange.
+
+    @Test
+    void resolveDefaultDeriveSingle() {
+        ServerIdRange range = ConfigUtil.resolveServerIdRange("12345", 1, 
null);
+        assertEquals(1, range.getNumberOfServerIds());
+        assertTrue(range.getStartServerId() >= 1);
+    }
+
+    @Test
+    void resolveDefaultDeriveExpandsToParallelism() {
+        ServerIdRange range = ConfigUtil.resolveServerIdRange("12345", 4, 
null);
+        assertEquals(4, range.getNumberOfServerIds());
+        assertEquals(range.getStartServerId() + 3, range.getEndServerId());
+    }
+
+    @Test
+    void resolveDefaultDeriveHandlesMinHashCode() {
+        // "polygenelubricants" hashCode() == Integer.MIN_VALUE; & MAX_VALUE 
strips sign bit.
+        ServerIdRange range =
+                ConfigUtil.resolveServerIdRange("polygenelubricants", 4, null);
+        assertTrue(range.getStartServerId() >= 1);
+        assertTrue(range.getEndServerId() <= Integer.MAX_VALUE);
+    }
+
+    @Test
+    void resolveDefaultDeriveBumpsZeroHashToOne() {
+        // Empty string hashCode() == 0; bump to 1 because MySQL server_id=0 
disables replication.
+        ServerIdRange range = ConfigUtil.resolveServerIdRange("", 1, null);
+        assertEquals(1, range.getStartServerId());
+    }
+
+    @Test
+    void resolveUserSingleValue() {
+        ServerIdRange range = ConfigUtil.resolveServerIdRange("anyjob", 1, 
"5400");
+        assertEquals(5400L, range.getStartServerId());
+        assertEquals(5400L, range.getEndServerId());
+    }
 
     @Test
-    void serverIdIsNonNegative() {
-        // Any jobId hash should produce a non-negative result (bitwise AND 
strips sign bit).
-        String result = ConfigUtil.getServerId("12345");
-        assertTrue(Long.parseLong(result) >= 0, "serverId must be 
non-negative");
+    void resolveUserRange() {
+        ServerIdRange range = ConfigUtil.resolveServerIdRange("anyjob", 4, 
"5400-5408");
+        assertEquals(5400L, range.getStartServerId());
+        assertEquals(5408L, range.getEndServerId());
     }
 
     @Test
-    void serverIdHandlesMinHashCode() {
-        // Find a string whose hashCode() == Integer.MIN_VALUE to exercise the 
edge case
-        // where Math.abs(Integer.MIN_VALUE) would return a negative number.
-        // "polygenelubricants" is a well-known such string.
-        String result = ConfigUtil.getServerId("polygenelubricants");
-        assertTrue(Long.parseLong(result) >= 0, "serverId must be non-negative 
for MIN_VALUE hash");
+    void resolveRejectsWidthLessThanParallelism() {
+        IllegalArgumentException ex = 
assertThrows(IllegalArgumentException.class,
+                () -> ConfigUtil.resolveServerIdRange("anyjob", 4, "5400"));
+        assertTrue(ex.getMessage().contains("snapshot_parallelism"));
     }
 
     // ─── getTableList 
─────────────────────────────────────────────────────────
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_server_id.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_server_id.groovy
new file mode 100644
index 00000000000..d6fcbd21978
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_server_id.groovy
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_server_id", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        return
+    }
+
+    def mysql_port = context.config.otherConfigs.get("mysql_57_port")
+    def externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def s3_endpoint = getS3Endpoint()
+    def bucket = getS3BucketName()
+    def driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+    def currentDb = (sql "select database()")[0][0]
+    def mysqlDb = "test_server_id_db"
+    def srcTable = "user_server_id"
+
+    // Prepare source schema + initial data once
+    connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+        sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+        sql """DROP TABLE IF EXISTS ${mysqlDb}.${srcTable}"""
+        sql """CREATE TABLE ${mysqlDb}.${srcTable} (
+              `name` varchar(200) NOT NULL,
+              `age` int DEFAULT NULL,
+              PRIMARY KEY (`name`)
+            ) ENGINE=InnoDB"""
+        sql """INSERT INTO ${mysqlDb}.${srcTable} VALUES ('A', 1), ('B', 2)"""
+    }
+
+    def buildCreateJob = { String jobName, String extraProps ->
+        """CREATE JOB ${jobName}
+            ON STREAMING
+            FROM MYSQL (
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "include_tables" = "${srcTable}",
+                ${extraProps}
+            )
+            TO DATABASE ${currentDb} (
+              "table.create.properties.replication_num" = "1"
+            )
+        """
+    }
+
+    def assertCreateFails = { String jobName, String extraProps, String 
expectFragment ->
+        sql "DROP JOB IF EXISTS where jobname = '${jobName}'"
+        Exception thrown = null
+        try {
+            sql buildCreateJob(jobName, extraProps)
+        } catch (Exception ex) {
+            thrown = ex
+        }
+        assert thrown != null, "CREATE JOB ${jobName} should have failed"
+        def msg = String.valueOf(thrown.message).toLowerCase()
+        assert msg.contains(expectFragment.toLowerCase()),
+                "${jobName}: expected error containing '${expectFragment}', 
got: ${thrown.message}"
+    }
+
+    def runHappyPath = { String jobName, String extraProps, Closure 
binlogCheck = null ->
+        sql "DROP JOB IF EXISTS where jobname = '${jobName}'"
+        sql "DROP TABLE IF EXISTS ${currentDb}.${srcTable} FORCE"
+        try {
+            sql buildCreateJob(jobName, extraProps + ', "offset" = "initial"')
+            Awaitility.await().atMost(180, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """
+                    select SucceedTaskCount from jobs("type"="insert")
+                    where Name = '${jobName}' and ExecuteType='STREAMING'
+                """
+                cnt.size() == 1 && 
Integer.parseInt(cnt.get(0).get(0).toString()) >= 1
+            })
+            def rows = sql """SELECT name FROM ${currentDb}.${srcTable} ORDER 
BY name"""
+            assert rows.size() == 2, "${jobName}: expected 2 rows, got 
${rows.size()}"
+            if (binlogCheck != null) {
+                binlogCheck()
+            }
+        } finally {
+            sql "DROP JOB IF EXISTS where jobname = '${jobName}'"
+        }
+    }
+
+    // ─── Section 1: FE validator rejects bad server_id at CREATE 
─────────────
+    assertCreateFails("test_serverid_reject_format",
+            '"offset" = "initial", "server_id" = "abc"', "server_id")
+    assertCreateFails("test_serverid_reject_zero",
+            '"offset" = "initial", "server_id" = "0"', "server_id")
+    assertCreateFails("test_serverid_reject_backward",
+            '"offset" = "initial", "server_id" = "5408-5400"', "server_id")
+    assertCreateFails("test_serverid_reject_width",
+            '"offset" = "initial", "server_id" = "99500", 
"snapshot_parallelism" = "2"',
+            "snapshot_parallelism")
+
+    // ─── Section 2: happy path — job runs, data syncs 
────────────────────────
+    // single value + binlog increment covers BinlogSplitReader binding 
startServerId.
+    runHappyPath("test_serverid_single", '"server_id" = "99001"') {
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql "INSERT INTO ${mysqlDb}.${srcTable} VALUES ('C', 3)"
+            sql "UPDATE ${mysqlDb}.${srcTable} SET age = 99 WHERE name = 'A'"
+            sql "DELETE FROM ${mysqlDb}.${srcTable} WHERE name = 'B'"
+        }
+        Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+            def after = sql "SELECT name, age FROM ${currentDb}.${srcTable} 
ORDER BY name"
+            after.size() == 2 && after[0][0] == 'A' && after[0][1] == 99 && 
after[1][0] == 'C'
+        })
+    }
+    runHappyPath("test_serverid_range",
+            '"server_id" = "99100-99103", "snapshot_parallelism" = "4", 
"snapshot_split_size" = "1"')
+    runHappyPath("test_serverid_default", '"snapshot_parallelism" = "2"')
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to