On Fri, 2 May 2025 at 09:23, vignesh C <vignes...@gmail.com> wrote:
>
> On Fri, 2 May 2025 at 06:30, Tom Lane <t...@sss.pgh.pa.us> wrote:
> >
> > vignesh C <vignes...@gmail.com> writes:
> > > I agree with your analysis. I was able to reproduce the issue by
> > > delaying the invalidation of the subscription until the walsender
> > > finished decoding the INSERT operation following the ALTER
> > > SUBSCRIPTION through a debugger and using the lsn from the pg_waldump
> > > of the INSERT after the ALTER SUBSCRIPTION.
> >
> > Can you be a little more specific about how you reproduced this?
> > I tried inserting sleep() calls in various likely-looking spots
> > and could not get a failure that way.
>
> Test Steps:
> 1) Set up logical replication:
> Create a publication on the publisher
> Create a subscription on the subscriber
> 2) Create the following table on the publisher:
> CREATE TABLE tab_3 (a int);
> 3) Create the same table on the subscriber:
> CREATE TABLE tab_3 (a int);
> 4) On the subscriber, alter the subscription to refer to a
> non-existent publication:
> ALTER SUBSCRIPTION sub1 SET PUBLICATION tap_pub_3;
> 5) Insert data on the publisher:
> INSERT INTO tab_3 VALUES (1);
>
> As expected, the publisher logs the following warning in the normal case:
> 2025-05-02 08:56:45.350 IST [516197] WARNING:  skipped loading
> publication: tap_pub_3
> 2025-05-02 08:56:45.350 IST [516197] DETAIL:   The publication does
> not exist at this point in the WAL.
> 2025-05-02 08:56:45.350 IST [516197] HINT:     Create the publication
> if it does not exist.
>
> To simulate a delay in subscription invalidation, I modified the
> maybe_reread_subscription() function as follows:
> diff --git a/src/backend/replication/logical/worker.c
> b/src/backend/replication/logical/worker.c
> index 4151a4b2a96..0831784aca3 100644
> --- a/src/backend/replication/logical/worker.c
> +++ b/src/backend/replication/logical/worker.c
> @@ -3970,6 +3970,10 @@ maybe_reread_subscription(void)
>      MemoryContext oldctx;
>      Subscription *newsub;
>      bool            started_tx = false;
> +    bool            test = true;
> +
> +    if (test)
> +        return;
>
> This change delays the subscription invalidation logic, preventing the
> apply worker from detecting the subscription change immediately.
>
> With the patch applied, repeat steps 1–5.
> Using pg_waldump, identify the LSN of the insert:
> rmgr: Heap        len (rec/tot):     59/    59, tx: 756, lsn:
> 0/01711848, prev 0/01711810, desc: INSERT+INIT off: 1
> rmgr: Transaction len (rec/tot):     46/    46, tx: 756, lsn:
> 0/01711888, prev 0/01711848, desc: COMMIT 2025-05-02 09:06:09.400926
> IST
>
> Check the walsender's confirmed flush LSN by attaching gdb to the
> walsender process:
> (gdb) p *MyReplicationSlot
> ...
> confirmed_flush = 24241928
> (gdb) p /x 24241928
> $4 = 0x171e708
>
> Now attach to the apply worker, set a breakpoint at
> maybe_reread_subscription, and continue execution. Once control
> reaches the function, set test = false. The worker will then detect
> that the subscription has been invalidated and restart.
>
> Because the walsender's confirmed_flush position has already advanced
> past the insert, the newly started apply worker misses the inserted
> row entirely. This leads to the CI failure. The issue can arise when
> the walsender advances more quickly than the apply worker is able to
> detect and react to the subscription change.
>
> I could not find a simpler way to reproduce this.

A simpler way to reproduce the issue is to add a 1-second sleep in the
LogicalRepApplyLoop function, just before the call to
WaitLatchOrSocket. With that change the test fails consistently for
me, for the same reason as described in [1].

[1] - https://www.postgresql.org/message-id/CALDaNm2Q_pfwiCkaV920iXEbh4D%3D5MmD_tNQm_GRGX6-MsLxoQ%40mail.gmail.com

Regards,
Vignesh
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..d0056f5655c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3702,6 +3702,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 						if (last_received < end_lsn)
 							last_received = end_lsn;
 
+						elog(LOG, "Send feedback from 1");
 						send_feedback(last_received, reply_requested, false);
 						UpdateWorkerStats(last_received, timestamp, true);
 					}
@@ -3714,6 +3715,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			}
 		}
 
+		elog(LOG, "Send feedback from 2");
 		/* confirm all writes so far */
 		send_feedback(last_received, false, false);
 
@@ -3739,6 +3741,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (endofstream)
 			break;
 
+		sleep(1);
+
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
@@ -3812,6 +3816,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 			}
 
+			elog(LOG, "Send feedback from 3");
 			send_feedback(last_received, requestReply, requestReply);
 
 			/*
@@ -3910,7 +3915,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	pq_sendint64(reply_message, now);	/* sendTime */
 	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
 
-	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+	elog(LOG, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
 		 force,
 		 LSN_FORMAT_ARGS(recvpos),
 		 LSN_FORMAT_ARGS(writepos),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..9896a8d74d5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -4062,7 +4062,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
 {
-	elog(DEBUG2, "sending replication keepalive");
+	elog(LOG, "sending replication keepalive - writePtr %X/%X",
+		 LSN_FORMAT_ARGS(writePtr));
 
 	/* construct the message... */
 	resetStringInfo(&output_message);
