I have a Flink job that uses the Postgres CDC connector as a source. It has been running mostly fine, but I recently had to stop it and start it again. Since then the job never manages to start: replication never completes and Flink enters a restart loop.
On startup, the CDC reader task issues a START_REPLICATION command on the Postgres primary. That call then spends 1 to 1.5 hours transferring data, with the operator at 100% busy. I don't understand why the connector does not resume from the most recent snapshot, since it is configured with 'latest-offset'.

Here are the connector options:

    "slot.name": "flink_cleaned_v2",
    "heartbeat.interval.ms": "15000",
    "scan.snapshot.fetch.size": "8192",
    "debezium.max.queue.size": "2048",
    "debezium.max.batch.size": "1024",
    "scan.incremental.snapshot.enabled": "true",
    "scan.incremental.snapshot.chunk.size": "80960",
    "debezium.slot.drop.on.stop": "false",
    "debezium.slot.max.retries": "15",
    "debezium.slot.retry.delay.ms": "67000"

The RDS logs suggest that the CDC client is disconnecting, while the Flink logs suggest that RDS is disconnecting. This confuses me, because my wal_sender_timeout is 120s.

Are there other settings I should be adjusting? How can I figure out which side is dropping the connection? It really feels like a socket or keep-alive timeout of some kind is being missed.
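As a rough sanity check on the timing values in the connector options, the sketch below estimates how long Debezium could spend waiting between replication-slot retries (`debezium.slot.max.retries` × `debezium.slot.retry.delay.ms`) and checks whether the heartbeat interval is shorter than the 120 s `wal_sender_timeout`. It is back-of-the-envelope arithmetic only, assuming each retry waits the full delay and ignoring the time the connection attempts themselves take; the heartbeat comparison only compares the two numbers and does not by itself show which message keeps the replication connection alive. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper: back-of-the-envelope timing checks for the connector options in the question.
public class CdcTimingCheck {

    // Subset of the connector options from the question (values copied verbatim).
    static final Map<String, String> OPTIONS = Map.of(
            "heartbeat.interval.ms", "15000",
            "debezium.slot.max.retries", "15",
            "debezium.slot.retry.delay.ms", "67000");

    // wal_sender_timeout as configured on the RDS instance (120 s).
    static final Duration WAL_SENDER_TIMEOUT = Duration.ofSeconds(120);

    // Reads a millisecond-valued option as a long.
    static long millis(Map<String, String> opts, String key) {
        return Long.parseLong(opts.get(key));
    }

    // Upper bound on time spent waiting between slot retries, assuming each
    // retry waits the full delay (time spent on the attempts is ignored).
    static Duration slotRetryWindow(Map<String, String> opts) {
        long retries = Long.parseLong(opts.get("debezium.slot.max.retries"));
        long delayMs = millis(opts, "debezium.slot.retry.delay.ms");
        return Duration.ofMillis(retries * delayMs);
    }

    // True if the heartbeat interval is strictly shorter than the given timeout.
    static boolean heartbeatWithinTimeout(Map<String, String> opts, Duration timeout) {
        return Duration.ofMillis(millis(opts, "heartbeat.interval.ms")).compareTo(timeout) < 0;
    }

    public static void main(String[] args) {
        Duration window = slotRetryWindow(OPTIONS);
        System.out.println("Slot retry window: " + window.toSeconds() + " s");
        System.out.println("Heartbeat < wal_sender_timeout: "
                + heartbeatWithinTimeout(OPTIONS, WAL_SENDER_TIMEOUT));
    }
}
```

With the values above, 15 retries × 67 000 ms gives a window of 1005 s (about 16.75 minutes), and the 15 s heartbeat is well under the 120 s timeout.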
Versions: Flink 1.18, Flink CDC 3.1.1.

The RDS logs:

    2024-07-02 21:02:22 UTC:10.32.1.45(7501):postgres_rw@dendrometer:[24901]:LOG: could not receive data from client: Connection reset by peer
    2024-07-02 21:02:22 UTC:10.32.0.67(10827):postgres_rw@dendrometer:[26483]:LOG: could not receive data from client: Connection reset by peer

The Flink throwable:

    java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:173)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
    Caused by: java.io.IOException: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
        at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        ... 6 more
    Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
        at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
        at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:214)
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216)
        at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
        at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
        ... 5 more
    Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
        at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1144)
        at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:30)
        at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:195)
        at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:186)
        at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:128)
        at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
        at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:588)
        at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:257)
        at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
        ... 8 more
    Caused by: java.net.SocketException: Broken pipe (Write failed)
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
        at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1305)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
        at org.postgresql.core.PGStream.flush(PGStream.java:709)
        at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1142)
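One way to get a feel for the 1 to 1.5 hours of transfer after START_REPLICATION is to check how far the slot's `confirmed_flush_lsn` (from the `pg_replication_slots` view) lags `pg_current_wal_lsn()` on the primary; that gap gives a rough idea of how much WAL the server has to work through before the connector catches up, and the `active` / `active_pid` columns show whether a walsender is currently attached to the slot. PostgreSQL (10 or later) can compute the gap directly with `pg_wal_lsn_diff`. The Java sketch below only reproduces that arithmetic on the textual `X/Y` LSNs (high and low 32 bits in hex) so values copied from the query can be checked offline; the class name and the example LSN values are hypothetical.

```java
// Hypothetical helper: reproduces the pg_wal_lsn_diff arithmetic on textual LSNs.
public class LsnLag {

    // Query to run on the primary (PostgreSQL 10+); kept here for reference, not executed.
    static final String SLOT_LAG_QUERY =
            "SELECT slot_name, active, active_pid, restart_lsn, confirmed_flush_lsn, "
            + "pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes "
            + "FROM pg_replication_slots WHERE slot_name = 'flink_cleaned_v2'";

    // Parses an LSN in PostgreSQL's text form "X/Y" (high/low 32 bits, hex)
    // into a single 64-bit WAL position.
    static long parseLsn(String lsn) {
        String[] parts = lsn.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long hi = Long.parseLong(parts[0], 16);
        long lo = Long.parseLong(parts[1], 16);
        return (hi << 32) | lo;
    }

    // Bytes between two LSNs, the same value pg_wal_lsn_diff(current, confirmed) returns.
    static long lagBytes(String currentLsn, String confirmedLsn) {
        return parseLsn(currentLsn) - parseLsn(confirmedLsn);
    }

    public static void main(String[] args) {
        // Example values only; substitute the ones returned by SLOT_LAG_QUERY.
        String current = "1A/00000000";
        String confirmed = "19/80000000";
        long lag = lagBytes(current, confirmed);
        System.out.printf("Slot lag: %d bytes (%.1f GiB)%n", lag, lag / (1024.0 * 1024 * 1024));
    }
}
```

For the example values the lag is 2147483648 bytes (2.0 GiB); a large value on the real slot would be consistent with a long catch-up phase after restart.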