[ 
https://issues.apache.org/jira/browse/FLINK-38265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-38265:
----------------------------------
    Description: 
When I read Postgres cdc's WAL log, there is no data suddenly.  The log show 
that the stream split is finished when some connection exception occurs:
{code:java}
2025-08-16 08:15:18,939 ERROR io.debezium.pipeline.ErrorHandler [] - Producer 
failureorg.postg
resql.util.PSQLException: FATAL: terminating connection due to administrator 
command at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:709) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.probeConnectionIfNeeded(PostgresStreamingChangeEventSource.java:416)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:353)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_372] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]2025-08-16 
08:15:18,953 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully 
closed2025-08-16 08:15:18,963 ERROR 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - 
Unexpected error while closing Postgres 
connectionorg.postgresql.util.PSQLException: Unable to close connection 
properly at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:961) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]Caused by: 
java.net.SocketException: Broken pipe (Write failed) at 
java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:1.8.0_372] at 
java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) 
~[?:1.8.0_372] at 
java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_372] 
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
~[?:1.8.0_372] at 
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 
~[?:1.8.0_372] at org.postgresql.core.PGStream.flush(PGStream.java:724) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:89)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:219) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:210) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgConnection.close(PgConnection.java:867) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] ... 5 more


2025-08-16 08:15:35,931 INFO 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [stream-split]
2025-08-16 08:15:35,932 INFO 
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [stream-split]
 {code}
 The reason is as follows:

 
 # When PostgresStreamingChangeEventSource met connection exception, it will 
put into event queue, rather than throw it outside.  
!image-2025-08-19-14-39-55-986.png|width=422,height=77!!image-2025-08-19-14-40-24-446.png|width=331,height=156!
 # IncrementalSourceStreamFetcher will seen the 
streamFetchTask as finished, and will stop it.
!image-2025-08-19-14-41-40-638.png|width=226,height=206!
 # When polling data, though there are still data and exception in event queue, 
we will ignore them and return null.

!image-2025-08-19-14-43-19-792.png|width=453,height=321!

 

Thus, we should not stop the task until receiving End Watermark.

 

  was:
When I read Postgres cdc's WAL log, there is no data suddenly.  The log show 
that the stream split is finished when some connection exception occurs:

 
 # 5-08-16 08:15:18,939 ERROR io.debezium.pipeline.ErrorHandler [] - Producer 
failureorg.postg
 # resql.util.PSQLException: FATAL: terminating connection due to administrator 
command at 
org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:709) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.probeConnectionIfNeeded(PostgresStreamingChangeEventSource.java:416)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:353)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_372] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]2025-08-16 
08:15:18,953 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully 
closed2025-08-16 08:15:18,963 ERROR 
io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] - 
Unexpected error while closing Postgres 
connectionorg.postgresql.util.PSQLException: Unable to close connection 
properly at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:961) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_372] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]Caused by: 
java.net.SocketException: Broken pipe (Write failed) at 
java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:1.8.0_372] at 
java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) 
~[?:1.8.0_372] at 
java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_372] 
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
~[?:1.8.0_372] at 
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 
~[?:1.8.0_372] at org.postgresql.core.PGStream.flush(PGStream.java:724) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:89)
 ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:219) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:210) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
org.postgresql.jdbc.PgConnection.close(PgConnection.java:867) 
~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] ... 5 
more2025-08-16 08:15:35,931 INFO 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [stream-split]2025-08-16 08:15:35,932 INFO 
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [stream-split]

 The reason is as follows:

 
 # When PostgresStreamingChangeEventSource met connection exception, it will 
put into event queue, rather than throw it outside.  
!image-2025-08-19-14-39-55-986.png|width=422,height=77!!image-2025-08-19-14-40-24-446.png|width=331,height=156!
 # IncrementalSourceStreamFetcher will seen the 
streamFetchTask as finished, and will stop it.
!image-2025-08-19-14-41-40-638.png|width=226,height=206!
 # When polling data, though there are still data and exception in event queue, 
we will ignore them and return null.

!image-2025-08-19-14-43-19-792.png|width=453,height=321!

 

Thus, we should not stop the task until receiving End Watermark.

 


> Stream Split shouldn't finish when exception occors
> ---------------------------------------------------
>
>                 Key: FLINK-38265
>                 URL: https://issues.apache.org/jira/browse/FLINK-38265
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>            Reporter: Hongshun Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.5.0
>
>         Attachments: image-2025-08-19-14-37-36-179.png, 
> image-2025-08-19-14-39-55-986.png, image-2025-08-19-14-40-24-446.png, 
> image-2025-08-19-14-41-40-638.png, image-2025-08-19-14-43-19-792.png
>
>
> When I read Postgres cdc's WAL log, there is no data suddenly.  The log show 
> that the stream split is finished when some connection exception occurs:
> {code:java}
> 2025-08-16 08:15:18,939 ERROR io.debezium.pipeline.ErrorHandler [] - Producer 
> failureorg.postg
> resql.util.PSQLException: FATAL: terminating connection due to administrator 
> command at 
> org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:709) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.probeConnectionIfNeeded(PostgresStreamingChangeEventSource.java:416)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:353)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_372] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [?:1.8.0_372] at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_372] at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) 
> [?:1.8.0_372]2025-08-16 08:15:18,953 INFO io.debezium.jdbc.JdbcConnection [] 
> - Connection gracefully closed2025-08-16 08:15:18,963 ERROR 
> io.debezium.connector.postgresql.connection.PostgresReplicationConnection [] 
> - Unexpected error while closing Postgres 
> connectionorg.postgresql.util.PSQLException: Unable to close connection 
> properly at org.postgresql.jdbc.PgConnection.close(PgConnection.java:870) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> io.debezium.jdbc.JdbcConnection.lambda$doClose$4(JdbcConnection.java:961) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372] at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_372] at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_372] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]Caused 
> by: java.net.SocketException: Broken pipe (Write failed) at 
> java.net.SocketOutputStream.socketWrite0(Native Method) ~[?:1.8.0_372] at 
> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) 
> ~[?:1.8.0_372] at 
> java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_372] 
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
> ~[?:1.8.0_372] at 
> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) 
> ~[?:1.8.0_372] at org.postgresql.core.PGStream.flush(PGStream.java:724) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.core.QueryExecutorCloseAction.close(QueryExecutorCloseAction.java:73)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.jdbc.PgConnectionCleaningAction.onClean(PgConnectionCleaningAction.java:89)
>  ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.util.LazyCleaner$Node.onClean(LazyCleaner.java:219) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.util.LazyCleaner$Node.clean(LazyCleaner.java:210) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] at 
> org.postgresql.jdbc.PgConnection.close(PgConnection.java:867) 
> ~[flink-sql-connector-postgres-cdc-3.4-SNAPSHOT21.jar:3.4-SNAPSHOT] ... 5 more
> 2025-08-16 08:15:35,931 INFO 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished reading from splits [stream-split]
> 2025-08-16 08:15:35,932 INFO 
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
> reading split(s) [stream-split]
>  {code}
>  The reason is as follows:
>  
>  # When PostgresStreamingChangeEventSource met connection exception, it will 
> put into event queue, rather than throw it outside.  
> !image-2025-08-19-14-39-55-986.png|width=422,height=77!!image-2025-08-19-14-40-24-446.png|width=331,height=156!
>  # IncrementalSourceStreamFetcher will seen the 
> streamFetchTask as finished, and will stop it.
> !image-2025-08-19-14-41-40-638.png|width=226,height=206!
>  # When polling data, though there are still data and exception in event 
> queue, we will ignore them and return null.
> !image-2025-08-19-14-43-19-792.png|width=453,height=321!
>  
> Thus, we should not stop the task until receiving End Watermark.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to