On Fri, 2 May 2025 at 09:23, vignesh C <vignes...@gmail.com> wrote:
>
> On Fri, 2 May 2025 at 06:30, Tom Lane <t...@sss.pgh.pa.us> wrote:
> >
> > vignesh C <vignes...@gmail.com> writes:
> > > I agree with your analysis. I was able to reproduce the issue by
> > > delaying the invalidation of the subscription until the walsender
> > > finished decoding the INSERT operation following the ALTER
> > > SUBSCRIPTION through a debugger and using the lsn from the pg_waldump
> > > of the INSERT after the ALTER SUBSCRIPTION.
> >
> > Can you be a little more specific about how you reproduced this?
> > I tried inserting sleep() calls in various likely-looking spots
> > and could not get a failure that way.
>
> Test Steps:
> 1) Set up logical replication:
>    Create a publication on the publisher
>    Create a subscription on the subscriber
> 2) Create the following table on the publisher:
>    CREATE TABLE tab_3 (a int);
> 3) Create the same table on the subscriber:
>    CREATE TABLE tab_3 (a int);
> 4) On the subscriber, alter the subscription to refer to a
>    non-existent publication:
>    ALTER SUBSCRIPTION sub1 SET PUBLICATION tap_pub_3;
> 5) Insert data on the publisher:
>    INSERT INTO tab_3 VALUES (1);
>
> As expected, the publisher logs the following warning in the normal case:
> 2025-05-02 08:56:45.350 IST [516197] WARNING: skipped loading publication: tap_pub_3
> 2025-05-02 08:56:45.350 IST [516197] DETAIL: The publication does not exist at this point in the WAL.
> 2025-05-02 08:56:45.350 IST [516197] HINT: Create the publication if it does not exist.
>
> To simulate a delay in subscription invalidation, I modified the
> maybe_reread_subscription() function as follows:
> diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
> index 4151a4b2a96..0831784aca3 100644
> --- a/src/backend/replication/logical/worker.c
> +++ b/src/backend/replication/logical/worker.c
> @@ -3970,6 +3970,10 @@ maybe_reread_subscription(void)
>  	MemoryContext oldctx;
>  	Subscription *newsub;
>  	bool		started_tx = false;
> +	bool		test = true;
> +
> +	if (test)
> +		return;
>
> This change delays the subscription invalidation logic, preventing the
> apply worker from detecting the subscription change immediately.
>
> With the patch applied, repeat steps 1–5.
> Using pg_waldump, identify the LSN of the insert:
> rmgr: Heap len (rec/tot): 59/ 59, tx: 756, lsn: 0/01711848, prev 0/01711810, desc: INSERT+INIT off: 1
> rmgr: Transaction len (rec/tot): 46/ 46, tx: 756, lsn: 0/01711888, prev 0/01711848, desc: COMMIT 2025-05-02 09:06:09.400926 IST
>
> Check the confirmed flush LSN in the walsender via gdb by attaching
> to the walsender process:
> (gdb) p *MyReplicationSlot
> ...
> confirmed_flush = 24241928
> (gdb) p /x 24241928
> $4 = 0x171e708
>
> Now attach to the apply worker, set a breakpoint at
> maybe_reread_subscription, and continue execution. Once control
> reaches the function, set test = false. The apply worker then
> detects that the subscription has been invalidated and restarts.
>
> Because the walsender has already advanced confirmed_flush past the
> insert, the newly started apply worker misses the inserted row
> entirely. This leads to the CI failure. This issue can arise when
> the walsender advances more quickly than the apply worker is able to
> detect and react to the subscription change.
>
> I could not find a simpler way to reproduce this.
A simpler way to reproduce the issue is to add a 1-second sleep in LogicalRepApplyLoop(), just before the call to WaitLatchOrSocket(). With that change the test fails consistently for me, and the failure reason is the same as in [1]. The attached diff adds the sleep and also raises the feedback and keepalive messages to LOG level, so the sequence can be followed in the server logs.

[1] - https://www.postgresql.org/message-id/CALDaNm2Q_pfwiCkaV920iXEbh4D%3D5MmD_tNQm_GRGX6-MsLxoQ%40mail.gmail.com

Regards,
Vignesh
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4151a4b2a96..d0056f5655c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3702,6 +3702,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					if (last_received < end_lsn)
 						last_received = end_lsn;
 
+					elog(LOG, "Send feedback from 1");
 					send_feedback(last_received, reply_requested, false);
 					UpdateWorkerStats(last_received, timestamp, true);
 				}
@@ -3714,6 +3715,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			}
 		}
 
+		elog(LOG, "Send feedback from 2");
 		/* confirm all writes so far */
 		send_feedback(last_received, false, false);
 
@@ -3739,6 +3741,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		if (endofstream)
 			break;
 
+		sleep(1);
+
 		/*
 		 * Wait for more data or latch. If we have unflushed transactions,
 		 * wake up after WalWriterDelay to see if they've been flushed yet (in
@@ -3812,6 +3816,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			}
 		}
 
+		elog(LOG, "Send feedback from 3");
 		send_feedback(last_received, requestReply, requestReply);
 
 		/*
@@ -3910,7 +3915,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	pq_sendint64(reply_message, now);	/* sendTime */
 	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
 
-	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+	elog(LOG, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
 		 force,
 		 LSN_FORMAT_ARGS(recvpos),
 		 LSN_FORMAT_ARGS(writepos),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..9896a8d74d5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -4062,7 +4062,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 static void
 WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
 {
-	elog(DEBUG2, "sending replication keepalive");
+	elog(LOG, "sending replication keepalive - writePtr %X/%X",
+		 LSN_FORMAT_ARGS(writePtr));
 
 	/* construct the message... */
 	resetStringInfo(&output_message);