loserwang1024 opened a new pull request, #4101:
URL: https://github.com/apache/flink-cdc/pull/4101

   
   
   When I read Postgres cdc's WAL log, there is no data suddenly.  The log show 
that the stream split is finished when some connection exception occurs:
   
   ```java
   5-08-16 08:15:18,939 ERROR io.debezium.pipeline.ErrorHandler [] - Producer 
failureorg.postg
   resql.util.PSQLException: FATAL: terminating connection due to administrator 
command at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postg
 resql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:709) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.probeConnectionIfNeeded(PostgresStreamingChangeEventSource.java:416)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:353)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(Postg
 resStreamFetchTask.java:216) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_372] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]2025-08-16 
08:15:18,953 INFO io.debezium.jdbc.JdbcConnection [] - Connection 
 gracefully closed2025-08-16 08:15:18,963 ERROR 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - 
Unexpected error while closing Postgres 
connectionorg.postgresql.util.PSQLException: Unable to close connection 
properly at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:961) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]Caused by: 
java.net.SocketException: Broken pipe (Write failed) at 
java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:1.8.0_372] at 
java.net.
 SocketOutputStream.socketWrite(SocketOutputStream.java:111) ~[?:1.8.0_372] at 
java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_372] 
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
~[?:1.8.0_372] at 
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 
~[?:1.8.0_372] at org.postgresql.core.PGStream.flush(PGStream.java:724) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:89)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:219) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:210) 
~[flink-sql-connect
 or-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgConnection.close(PgConnection.java:867) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] ... 5 more
   
   
   2025-08-16 08:15:35,931 INFO 
   org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [stream-split]
   2025-08-16 08:15:35,932 INFO 
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [stream-split]
   ```
   
    The reason is as follows:
   
   1. When PostgresStreamingChangeEventSource met connection exception, it will 
put into event queue, rather than throw it outside. 
   2. IncrementalSourceStreamFetcher will seen the  streamFetchTask as 
finished, and will stop it.
   3. When polling data, though there are still data and exception in event 
queue, we will ignore them and return null.
   
   
   Thus, we should not stop the task until receiving End Watermark.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to