I have a Flink stream using Postgres-CDC as a source.  It had been operating
mostly fine, but I recently had to stop and then restart the stream.
The stream is now never able to start: replication never completes
and Flink enters a restart loop.

Upon starting, the CDC reader task issues a "START REPLICATION" call on the
Postgres primary; this call then spent 1-1.5 hours transferring data, with
the operator at 100% busy the whole time.
I'm not sure why the connector would not resume from the most recent
snapshot, since it is configured with 'latest-offset'.  Here are the
connector options:

     "slot.name": "flink_cleaned_v2",
     "heartbeat.interval.ms": "15000",
     "scan.snapshot.fetch.size": "8192",
     "debezium.max.queue.size": "2048",
     "debezium.max.batch.size": "1024",
     "scan.incremental.snapshot.enabled": "true",
     "scan.incremental.snapshot.chunk.size": "80960",
     "debezium.slot.drop.on.stop": "false",
     "debezium.slot.max.retries": "15",
     "debezium.slot.retry.delay.ms": "67000",
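
For reference, this is the query I've been running on the RDS primary to see
where the slot will resume from and how much WAL it has to replay.  My
understanding (which may be wrong) is that 'latest-offset' only applies to the
very first start, and that on restart streaming resumes from the slot's
confirmed position, so a long stop means replaying everything retained since:

```sql
-- Run on the RDS primary; slot_name matches the "slot.name" option above.
-- confirmed_flush_lsn is where the connector will resume; the distance to
-- pg_current_wal_lsn() is the WAL backlog it has to chew through.
SELECT slot_name,
       active,
       restart_lsn,
       confirmed_flush_lsn,
       pg_size_pretty(
         pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
       ) AS replication_lag
FROM pg_replication_slots
WHERE slot_name = 'flink_cleaned_v2';
```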

The logs on RDS suggest that the CDC client is disconnecting, while the
logs on Flink seem to suggest that RDS is disconnecting.  I'm very confused
by this, as my wal_sender_timeout is 120s.  Are there other settings I
should be adjusting? How can I figure out who is disconnecting from whom?
It really feels like a socket/keepalive timeout of some kind is being
missed.
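
For what it's worth, the only keepalive-related knobs I've found so far are
the two below, passed through to Debezium.  Both are still at their defaults
in my job, and these names are just my reading of the Debezium Postgres
connector docs, not something I've verified fixes anything:

```
     "debezium.database.tcpKeepAlive": "true",
     "debezium.status.update.interval.ms": "10000",
```

The first should enable TCP keepalive on the pgJDBC connection (Debezium
documents it as defaulting to true), and the second controls how often the
client sends replication status updates back to the server (default 10s,
which should be well inside my 120s wal_sender_timeout).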

Flink 1.18
CDC 3.1.1

The RDS logs:

2024-07-02 21:02:22 UTC:10.32.1.45(7501):postgres_rw@dendrometer:[24901]:LOG: could not receive data from client: Connection reset by peer
2024-07-02 21:02:22 UTC:10.32.0.67(10827):postgres_rw@dendrometer:[26483]:LOG: could not receive data from client: Connection reset by peer

Flink throwable:

java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:173)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.io.IOException: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:214)
    at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:216)
    at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:97)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
    ... 5 more
Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1144)
    at org.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:30)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:195)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:186)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:128)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:588)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:257)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212)
    ... 8 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
    at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
    at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
    at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1305)
    at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
    at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
    at org.postgresql.core.PGStream.flush(PGStream.java:709)
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1142)
