This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4337c56e98c [fix](streamingjob) Cap debezium ChangeEventQueue with a heap-adaptive byte limit to avoid OOM (#64511)
4337c56e98c is described below

commit 4337c56e98c18702162995f940ed00538482a84c
Author: wudi <[email protected]>
AuthorDate: Mon Jun 22 10:17:56 2026 +0800

    [fix](streamingjob) Cap debezium ChangeEventQueue with a heap-adaptive byte limit to avoid OOM (#64511)
    
    ### What problem does this PR solve?
    
    The cdc_client builds debezium's `ChangeEventQueue` with only a
    count-based bound (`max.queue.size=8192`) while the byte bound
    (`max.queue.size.in.bytes`) defaults to `0` (disabled). With wide rows
    (e.g. ~2MB each), the in-memory queue can grow to `2MB * 8192 ≈ 16GB`
    and OOM the process. Both PostgreSQL and MySQL paths build the queue
    from `getMaxQueueSizeInBytes()`, so a single property covers both, and
    it applies to both the snapshot and streaming phases.
    
    ### What this PR does
    
    Set a heap-adaptive byte cap on the queue buffer in
    `ConfigUtil.getDefaultDebeziumProps()`, which is shared by the Postgres
    and MySQL source readers:
    
    - Default cap is `clamp(heap/16, 64MB, 256MB)`: heap 1G -> 64MB, 2G ->
    128MB, >= 4G -> 256MB.
    - The cap is intentionally conservative because a single cdc_client JVM
    can run many queues concurrently (one per split, across multiple jobs),
    and the real batching/backpressure happens downstream in the sink rather
    than in this queue.
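    - Escape hatch: `-Dcdc.max.queue.size.in.bytes=<bytes>` overrides the
    adaptive value (absolute bytes; `<= 0` disables the byte bound).
    - The byte-sized queue relies on debezium's `ObjectSizeCalculator`
    reflection, which needs `--add-opens` on JDK 17. The opens for `java.lang`,
    `java.util`, `java.math` and `java.nio` are added to the JVM that BE forks
    for cdc_client (`cdc_client_mgr.cpp`) and to the surefire/failsafe test
    JVMs (`pom.xml`).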
    
    Narrow tables are unaffected: 8192 rows stay well under 64MB (the smallest
    cap still allows ~8KB per row), so the count bound is reached first and
    behavior is unchanged.
---
 be/src/runtime/cdc_client_mgr.cpp                  |  5 ++
 fs_brokers/cdc_client/pom.xml                      | 14 +++++
 .../apache/doris/cdcclient/utils/ConfigUtil.java   | 27 ++++++++++
 .../doris/cdcclient/utils/ConfigUtilTest.java      | 59 ++++++++++++++++++++++
 4 files changed, 105 insertions(+)

diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp
index b37cadc980c..b60c2c60bb1 100644
--- a/be/src/runtime/cdc_client_mgr.cpp
+++ b/be/src/runtime/cdc_client_mgr.cpp
@@ -214,6 +214,11 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
     argv_storage.emplace_back(java_opts);
     // OOM safety net (last-wins, user opts cannot disable).
     argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
+    // JDK17 opens for debezium ObjectSizeCalculator reflection.
+    argv_storage.emplace_back("--add-opens=java.base/java.lang=ALL-UNNAMED");
+    argv_storage.emplace_back("--add-opens=java.base/java.util=ALL-UNNAMED");
+    argv_storage.emplace_back("--add-opens=java.base/java.math=ALL-UNNAMED");
+    argv_storage.emplace_back("--add-opens=java.base/java.nio=ALL-UNNAMED");
     argv_storage.emplace_back("-jar");
     argv_storage.emplace_back(cdc_jar_path);
     argv_storage.emplace_back(cdc_jar_port);
diff --git a/fs_brokers/cdc_client/pom.xml b/fs_brokers/cdc_client/pom.xml
index a88bddad683..0a0c7355318 100644
--- a/fs_brokers/cdc_client/pom.xml
+++ b/fs_brokers/cdc_client/pom.xml
@@ -76,6 +76,11 @@ under the License.
         <assertj.version>3.27.7</assertj.version>
         <awaitility.version>4.2.1</awaitility.version>
         <maven-failsafe-plugin.version>3.2.5</maven-failsafe-plugin.version>
+        <maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version>
+        <!-- JDK17 opens for debezium ObjectSizeCalculator reflection (byte-sized queue).
+             Mirrors the set CdcClientMgr passes when BE forks cdc-client.jar; needed because
+             surefire/failsafe run the reader path directly without going through that fork. -->
+        <test.add.opens>--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED</test.add.opens>
     </properties>
 
     <dependencies>
@@ -258,6 +263,14 @@ under the License.
                     <target>17</target>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven-surefire-plugin.version}</version>
+                <configuration>
+                    <argLine>${test.add.opens}</argLine>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
@@ -268,6 +281,7 @@ under the License.
                     </includes>
                     <forkCount>1</forkCount>
                     <reuseForks>true</reuseForks>
+                    <argLine>${test.add.opens}</argLine>
                 </configuration>
                 <executions>
                     <execution>
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index a999f532ea9..cc2fa9da68b 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.mysql.cj.conf.ConnectionUrl;
+import io.debezium.config.CommonConnectorConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,9 +124,35 @@ public class ConfigUtil {
         return ZoneId.systemDefault();
     }
 
+    public static final String MAX_QUEUE_BYTES_SYS_PROP = "cdc.max.queue.size.in.bytes";
+
+    // Heap-adaptive byte cap for the debezium ChangeEventQueue buffer.
+    // heap 1G->64MB, 2G->128MB, >=4G->256MB. -D<MAX_QUEUE_BYTES_SYS_PROP> overrides
+    // (<=0 disables); a malformed override is logged and ignored, falling back to the cap.
+    private static long resolveMaxQueueSizeInBytes() {
+        String override = System.getProperty(MAX_QUEUE_BYTES_SYS_PROP);
+        if (override != null) {
+            try {
+                long bytes = Long.parseLong(override.trim());
+                return bytes <= 0 ? 0 : bytes;
+            } catch (NumberFormatException e) {
+                LOG.warn(
+                        "Ignoring invalid -D{}={}, expected an integer byte count; "
+                                + "falling back to the adaptive cap",
+                        MAX_QUEUE_BYTES_SYS_PROP,
+                        override);
+            }
+        }
+        long target = Runtime.getRuntime().maxMemory() / 16;
+        return Math.max(64L * 1024 * 1024, Math.min(target, 256L * 1024 * 1024));
+    }
+
     /** Optimized debezium parameters */
     public static Properties getDefaultDebeziumProps() {
         Properties properties = new Properties();
+        properties.put(
+                CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name(),
+                String.valueOf(resolveMaxQueueSizeInBytes()));
         return properties;
     }
 
diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
index 980f83bdc8d..cf70b6b075c 100644
--- a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
+++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -19,11 +19,13 @@ package org.apache.doris.cdcclient.utils;
 
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 
+import io.debezium.config.CommonConnectorConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
 
 import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -128,6 +130,63 @@ class ConfigUtilTest {
         assertEquals(0, result.length);
     }
 
+    // ─── getDefaultDebeziumProps: queue byte cap ──────────────────────────────
+
+    private static long queueBytes(Properties props) {
+        return Long.parseLong(
+                props.getProperty(CommonConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES.name()));
+    }
+
+    @Test
+    void defaultQueueBytesWithinClamp() {
+        long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
+        assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024,
+                "expected clamp to [64MB, 256MB] but got " + bytes);
+    }
+
+    @Test
+    void sysPropOverridesAdaptiveValue() {
+        String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+        try {
+            System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "1048576");
+            assertEquals(1048576L, queueBytes(ConfigUtil.getDefaultDebeziumProps()));
+        } finally {
+            restore(prev);
+        }
+    }
+
+    @Test
+    void negativeSysPropDisablesByteBound() {
+        String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+        try {
+            System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "-1");
+            assertEquals(0L, queueBytes(ConfigUtil.getDefaultDebeziumProps()));
+        } finally {
+            restore(prev);
+        }
+    }
+
+    @Test
+    void malformedSysPropFallsBackToClamp() {
+        String prev = System.getProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+        try {
+            System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, "32MB");
+            long bytes = queueBytes(ConfigUtil.getDefaultDebeziumProps());
+            assertTrue(bytes >= 64L * 1024 * 1024 && bytes <= 256L * 1024 * 1024,
+                    "malformed override should fall back to [64MB, 256MB] but got " + bytes);
+        } finally {
+            restore(prev);
+        }
+    }
+
+    private static void restore(String prev) {
+        if (prev == null) {
+            System.clearProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP);
+        } else {
+            System.setProperty(ConfigUtil.MAX_QUEUE_BYTES_SYS_PROP, prev);
+        }
+    }
+
     // ─── server timezone parsing ──────────────────────────────────────────────
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to