[ https://issues.apache.org/jira/browse/FLINK-38265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruan Hang reassigned FLINK-38265:
---------------------------------

    Assignee: Hongshun Wang

> Stream Split shouldn't finish when exception occurs
> ---------------------------------------------------
>
>                 Key: FLINK-38265
>                 URL: https://issues.apache.org/jira/browse/FLINK-38265
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: Hongshun Wang
>            Assignee: Hongshun Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.5.0
>
>         Attachments: image-2025-08-19-14-37-36-179.png,
> image-2025-08-19-14-39-55-986.png, image-2025-08-19-14-40-24-446.png,
> image-2025-08-19-14-41-40-638.png, image-2025-08-19-14-43-19-792.png,
> image-2025-08-19-15-22-13-109.png
>
>
> While reading the Postgres WAL through Postgres CDC, data suddenly stopped
> arriving. The log shows that the stream split is marked as finished when a
> connection exception occurs:
> {code:java}
> 2025-08-16 08:15:18,939 ERROR io.debezium.pipeline.ErrorHandler [] - Producer failure
> org.postgresql.util.PSQLException: FATAL: terminating connection due to administrator command
>     at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:709) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.probeConnectionIfNeeded(PostgresStreamingChangeEventSource.java:416) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:353) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
>     at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
> 2025-08-16 08:15:18,953 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed
> 2025-08-16 08:15:18,963 ERROR io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - Unexpected error while closing Postgres connection
> org.postgresql.util.PSQLException: Unable to close connection properly
>     at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:961) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
>     at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
> Caused by: java.net.SocketException: Broken pipe (Write failed)
>     at java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:1.8.0_372]
>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) ~[?:1.8.0_372]
>     at java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_372]
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_372]
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_372]
>     at org.postgresql.core.PGStream.flush(PGStream.java:724) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:89) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:219) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:210) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     at org.postgresql.jdbc.PgConnection.close(PgConnection.java:867) ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT]
>     ... 5 more
> 2025-08-16 08:15:35,931 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [stream-split]
> 2025-08-16 08:15:35,932 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [stream-split]
> {code}
> The reason is as follows:
>
> # When PostgresStreamingChangeEventSource hits a connection exception, it
> puts the exception into the event queue rather than throwing it to the caller.
> !image-2025-08-19-14-39-55-986.png|width=422,height=77!!image-2025-08-19-14-40-24-446.png|width=331,height=156!
> # IncrementalSourceStreamFetcher then treats the streamFetchTask as finished
> and stops it.
> !image-2025-08-19-14-41-40-638.png|width=226,height=206!
> # When polling, even though data and the exception are still in the event
> queue, they are ignored and null is returned.
> !image-2025-08-19-14-43-19-792.png|width=453,height=321!
> # The stream split is then treated as finished.
> !image-2025-08-19-15-22-13-109.png|width=400,height=228!
> Thus, the task should not be stopped until the End Watermark is received.
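
The four steps in the description can be modelled with a small toy fetcher. This is only a sketch under stated assumptions, not the Flink CDC or Debezium code: StreamSplitSketch, Event, onProducerFailure, pollIgnoringQueue and pollUntilEndWatermark are hypothetical names invented for illustration. It contrasts a poll that returns null as soon as the task has stopped with one that keeps draining the queue until an end watermark and rethrows any queued failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the reported behaviour; every name here is hypothetical. */
final class StreamSplitSketch {

    /** One queue entry: a data record, a producer failure, or the end watermark. */
    static final class Event {
        final String record;        // non-null for a data record
        final Throwable failure;    // non-null when the producer failed
        final boolean endWatermark; // true for the end-of-split marker

        private Event(String record, Throwable failure, boolean endWatermark) {
            this.record = record;
            this.failure = failure;
            this.endWatermark = endWatermark;
        }

        static Event record(String r) { return new Event(r, null, false); }
        static Event failure(Throwable t) { return new Event(null, t, false); }
        static Event end() { return new Event(null, null, true); }
    }

    final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private boolean taskRunning = true;
    private boolean reachedEnd = false;

    /** Step 1 and 2: the failure is enqueued, and the task is then seen as stopped. */
    void onProducerFailure(Throwable t) {
        queue.add(Event.failure(t));
        taskRunning = false;
    }

    /** Step 3 and 4: once the task has stopped, the queue content is ignored. */
    List<String> pollIgnoringQueue() {
        if (!taskRunning) {
            return null; // the caller reads null as "split finished"
        }
        return drain();
    }

    /** Proposed direction: only the end watermark finishes the split. */
    List<String> pollUntilEndWatermark() {
        if (reachedEnd) {
            return null;
        }
        return drain();
    }

    /** Drains queued events, rethrowing a queued failure instead of dropping it. */
    private List<String> drain() {
        List<String> out = new ArrayList<>();
        Event e;
        while ((e = queue.poll()) != null) {
            if (e.failure != null) {
                throw new IllegalStateException("stream split failed", e.failure);
            }
            if (e.endWatermark) {
                reachedEnd = true;
                break;
            }
            out.add(e.record);
        }
        return out;
    }

    public static void main(String[] args) {
        StreamSplitSketch sketch = new StreamSplitSketch();
        sketch.queue.add(Event.record("row-1"));
        sketch.onProducerFailure(new RuntimeException("terminating connection"));
        // The failure is silently lost: the split simply looks finished.
        System.out.println("ignoring queue: " + sketch.pollIgnoringQueue());
        try {
            sketch.pollUntilEndWatermark();
        } catch (IllegalStateException ex) {
            // The failure surfaces, so the job can fail over instead of going quiet.
            System.out.println("surfaced: " + ex.getCause().getMessage());
        }
    }
}
```

In this toy model the first poll hides the connection error behind a null return, while the second one surfaces it to the caller, which is the behaviour the issue asks for. How the real fix is structured is not shown in this message.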