This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a159972fb12 [fix](streaming-job) cdc client PostgreSQL snapshot honors scan.snapshot.fetch.size to avoid wide-table OOM (#64938)
a159972fb12 is described below

commit a159972fb12923fcb42fc41ffe66cf7255d7fa0e
Author: wudi <[email protected]>
AuthorDate: Thu Jul 2 10:40:35 2026 +0800

    [fix](streaming-job) cdc client PostgreSQL snapshot honors scan.snapshot.fetch.size to avoid wide-table OOM (#64938)
    
    ## Proposed changes
    
    This PR fixes three PostgreSQL streaming job issues:
    
    - Honor `scan.snapshot.fetch.size` during snapshot reads so that a
      large snapshot chunk is not loaded into memory all at once.
    - When the binlog split's max interval is reached at a heartbeat,
      drain the rest of the current CDC batch before stopping instead of
      breaking out immediately. Records after the heartbeat have already
      been dequeued and could otherwise be lost when the reader is reused.
    - Use a dedicated, non-pooled connection for publication and
      replication-slot cleanup, so background cleanup still works when the
      reader pool is exhausted.
---
 .../cdcclient/service/PipelineCoordinator.java     |  8 +--
 .../reader/postgres/PostgresSourceReader.java      | 26 ++++++----
 .../source/fetch/PostgresScanFetchTask.java        | 10 ++--
 .../reader/postgres/PostgresSourceReaderTest.java  | 57 ++++++++++++++++++++++
 4 files changed, 84 insertions(+), 17 deletions(-)

diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 56d64546c69..4a88dfcc02b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -589,13 +589,13 @@ public class PipelineCoordinator {
                                         && maxIntervalMillis > 0
                                         && elapsedTime >= maxIntervalMillis;
 
-                        if (!isSnapshotSplit && timeoutReached) {
+                        if (!isSnapshotSplit && timeoutReached && !shouldStop) 
{
                             LOG.info(
-                                    "Binlog split max interval reached and 
heartbeat received, stopping data reading");
+                                    "Binlog split max interval reached; 
draining current batch before stopping");
                             shouldStop = true;
-                            break;
                         }
-                        // Skip heartbeat messages during normal processing
+                        // Drain the rest of this batch instead of breaking: 
records after the
+                        // heartbeat are already dequeued and the reused 
reader won't re-read them.
                         continue;
                     }
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 20b5694fd76..2574b53a196 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -78,6 +78,7 @@ import io.debezium.connector.postgresql.connection.Lsn;
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
 import io.debezium.connector.postgresql.spi.SlotState;
+import io.debezium.jdbc.JdbcConfiguration;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.TableId;
@@ -690,11 +691,12 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
                     jobId);
             return true;
         }
-        PostgresDialect dialect = new 
PostgresDialect(getSourceConfig(jobConfig));
+        JdbcConfiguration jdbcConfig =
+                
getSourceConfig(jobConfig).getDbzConnectorConfig().getJdbcConfig();
         boolean cleaned = true;
         if (dropPub) {
             LOG.info("Dropping auto-created publication {} for job {}", 
pubName, jobId);
-            try (PostgresConnection connection = dialect.openJdbcConnection()) 
{
+            try (PostgresConnection connection = 
createCleanupConnection(jdbcConfig)) {
                 connection.execute("DROP PUBLICATION IF EXISTS " + pubName);
             } catch (Exception ex) {
                 LOG.warn(
@@ -703,7 +705,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
                         jobId,
                         ex.getMessage());
             }
-            if (publicationExists(dialect, pubName)) {
+            if (publicationExists(jdbcConfig, pubName)) {
                 LOG.warn(
                         "Publication {} for job {} still present after drop, 
will retry",
                         pubName,
@@ -713,8 +715,8 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         }
         if (dropSlot) {
             LOG.info("Dropping auto-created replication slot {} for job {}", 
slotName, jobId);
-            try {
-                dialect.removeSlot(slotName);
+            try (PostgresConnection connection = 
createCleanupConnection(jdbcConfig)) {
+                connection.dropReplicationSlot(slotName);
             } catch (Exception ex) {
                 LOG.warn(
                         "Drop of replication slot {} for job {} failed: {}",
@@ -722,7 +724,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
                         jobId,
                         ex.getMessage());
             }
-            if (slotExists(dialect, slotName)) {
+            if (slotExists(jdbcConfig, slotName)) {
                 LOG.warn(
                         "Replication slot {} for job {} still present after 
drop, will retry",
                         slotName,
@@ -733,8 +735,12 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         return cleaned;
     }
 
-    private boolean slotExists(PostgresDialect dialect, String slotName) {
-        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+    static PostgresConnection createCleanupConnection(JdbcConfiguration 
jdbcConfig) {
+        return new PostgresConnection(jdbcConfig, 
PostgresConnection.CONNECTION_GENERAL);
+    }
+
+    private boolean slotExists(JdbcConfiguration jdbcConfig, String slotName) {
+        try (PostgresConnection connection = 
createCleanupConnection(jdbcConfig)) {
             return connection.queryAndMap(
                     "SELECT 1 FROM pg_replication_slots WHERE slot_name = '" + 
slotName + "'",
                     rs -> rs.next());
@@ -747,8 +753,8 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         }
     }
 
-    private boolean publicationExists(PostgresDialect dialect, String pubName) 
{
-        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+    private boolean publicationExists(JdbcConfiguration jdbcConfig, String 
pubName) {
+        try (PostgresConnection connection = 
createCleanupConnection(jdbcConfig)) {
             return connection.queryAndMap(
                     "SELECT 1 FROM pg_publication WHERE pubname = '" + pubName 
+ "'",
                     rs -> rs.next());
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
index 6c26cf4e74a..0462cb9d1ed 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
@@ -65,6 +65,8 @@ import static 
io.debezium.connector.postgresql.Utils.refreshSchema;
  * Copied from Flink Cdc 3.6.0
  *
  * <p>Line 333~336: modified createDataEventsForTable to fix FLINK-39748.
+ *
+ * <p>Line 326: use sourceConfig.getFetchSize() for the snapshot fetch size to 
fix FLINK-40007.
  */
 public class PostgresScanFetchTask extends AbstractScanFetchTask {
 
@@ -103,6 +105,7 @@ public class PostgresScanFetchTask extends 
AbstractScanFetchTask {
 
         PostgresSnapshotSplitReadTask snapshotSplitReadTask =
                 new PostgresSnapshotSplitReadTask(
+                        (PostgresSourceConfig) ctx.getSourceConfig(),
                         ctx.getConnection(),
                         ctx.getDbzConnectorConfig(),
                         ctx.getDatabaseSchema(),
@@ -219,7 +222,7 @@ public class PostgresScanFetchTask extends 
AbstractScanFetchTask {
                 LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class);
 
         private final PostgresConnection jdbcConnection;
-        private final PostgresConnectorConfig connectorConfig;
+        private final PostgresSourceConfig sourceConfig;
         private final PostgresEventDispatcher<TableId> eventDispatcher;
         private final SnapshotSplit snapshotSplit;
         private final PostgresOffsetContext offsetContext;
@@ -228,6 +231,7 @@ public class PostgresScanFetchTask extends 
AbstractScanFetchTask {
         private final Clock clock;
 
         public PostgresSnapshotSplitReadTask(
+                PostgresSourceConfig sourceConfig,
                 PostgresConnection jdbcConnection,
                 PostgresConnectorConfig connectorConfig,
                 PostgresSchema databaseSchema,
@@ -237,7 +241,7 @@ public class PostgresScanFetchTask extends 
AbstractScanFetchTask {
                 SnapshotSplit snapshotSplit) {
             super(connectorConfig, snapshotProgressListener);
             this.jdbcConnection = jdbcConnection;
-            this.connectorConfig = connectorConfig;
+            this.sourceConfig = sourceConfig;
             this.snapshotProgressListener = snapshotProgressListener;
             this.databaseSchema = databaseSchema;
             this.eventDispatcher = eventDispatcher;
@@ -319,7 +323,7 @@ public class PostgresScanFetchTask extends 
AbstractScanFetchTask {
                                     snapshotSplit.getSplitStart(),
                                     snapshotSplit.getSplitEnd(),
                                     
snapshotSplit.getSplitKeyType().getFieldCount(),
-                                    connectorConfig.getSnapshotFetchSize());
+                                    sourceConfig.getFetchSize());
                     ResultSet rs = selectStatement.executeQuery()) {
 
                 ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, 
table);
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReaderTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReaderTest.java
new file mode 100644
index 00000000000..617013ce4d7
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReaderTest.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.source.reader.postgres;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+
+class PostgresSourceReaderTest {
+
+    @Test
+    void cleanupConnectionUsesDebeziumDirectConnectionFactory() throws 
Exception {
+        JdbcConfiguration config =
+                JdbcConfiguration.adapt(
+                        Configuration.create()
+                                .with(JdbcConfiguration.HOSTNAME, "localhost")
+                                .with(JdbcConfiguration.PORT, 5432)
+                                .with(JdbcConfiguration.DATABASE, "postgres")
+                                .with(JdbcConfiguration.USER, "user")
+                                .with(JdbcConfiguration.PASSWORD, "password")
+                                .build());
+
+        try (PostgresConnection expected =
+                        new PostgresConnection(config, 
PostgresConnection.CONNECTION_GENERAL);
+                PostgresConnection actual =
+                        PostgresSourceReader.createCleanupConnection(config)) {
+            assertSame(connectionFactory(expected), connectionFactory(actual));
+        }
+    }
+
+    private static Object connectionFactory(JdbcConnection connection) throws 
Exception {
+        Field field = JdbcConnection.class.getDeclaredField("factory");
+        field.setAccessible(true);
+        return field.get(connection);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to