Hi,

The following assertion failure was seen while testing one scenario for another patch:

TRAP: failed Assert("s->data.confirmed_flush >= s->last_saved_confirmed_flush"), File: "slot.c", Line: 1760, PID: 545314
postgres: checkpointer performing shutdown checkpoint(ExceptionalCondition+0xbb)[0x564ee6870c58]
postgres: checkpointer performing shutdown checkpoint(CheckPointReplicationSlots+0x18e)[0x564ee65e9c71]
postgres: checkpointer performing shutdown checkpoint(+0x1e1403)[0x564ee61be403]
postgres: checkpointer performing shutdown checkpoint(CreateCheckPoint+0x78a)[0x564ee61bdace]
postgres: checkpointer performing shutdown checkpoint(ShutdownXLOG+0x150)[0x564ee61bc735]
postgres: checkpointer performing shutdown checkpoint(+0x5ae28c)[0x564ee658b28c]
postgres: checkpointer performing shutdown checkpoint(CheckpointerMain+0x31e)[0x564ee658ad55]
postgres: checkpointer performing shutdown checkpoint(AuxiliaryProcessMain+0x1d1)[0x564ee65888d9]
postgres: checkpointer performing shutdown checkpoint(+0x5b7200)[0x564ee6594200]
postgres: checkpointer performing shutdown checkpoint(PostmasterMain+0x14da)[0x564ee658f12f]
postgres: checkpointer performing shutdown checkpoint(+0x464fc6)[0x564ee6441fc6]
/lib/x86_64-linux-gnu/libc.so.6(+0x29d90)[0x7ff6afa29d90]
/lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0x80)[0x7ff6afa29e40]
postgres: checkpointer performing shutdown checkpoint(_start+0x25)[0x564ee60b8e05]

I was able to reproduce this issue with the following steps:

-- Setup
-- Publisher node:
create table t1(c1 int);
create table t2(c1 int);
create publication pub1 for table t1;
create publication pub2 for table t2;

-- Subscriber node:
create table t1(c1 int);
create table t2(c1 int);
create subscription test1 connection 'dbname=postgres host=localhost port=5432' publication pub1, pub2;
select * from pg_subscription;

-- Actual test
insert into t1 values(10);
insert into t2 values(20);
select pg_sleep(10);
drop publication pub2;
insert into t1 values(10);
insert into t2 values(20);

Stop the publisher to see the assertion.

For me the issue reproduces about two times out of five using the attached assert_failure.sh script.

After the insert operation is replicated to the subscriber, the subscriber sets the LSN sent by the publisher in the replication origin (in my case it was 0/1510978). The publisher then sends keepalive messages with its current WAL position (in my case it was 0/15109B0), and since there are no ongoing transactions, the subscriber simply sends this position back to the publisher as the flush_lsn.

The publisher then identifies that the publication does not exist (pub2 was dropped), and the walsender/apply worker process is stopped. When the apply worker is restarted, it gets the remote_lsn of the origin (in my case it was 0/1510978) and sets it as origin_startpos. The apply worker starts with this origin_startpos (the origin's remote_lsn), and this position is sent as feedback to the walsender process through the following call stack:

run_apply_worker->start_apply->LogicalRepApplyLoop->send_feedback

Because nothing has been received from the walsender yet, it is the send_feedback call at the end of the following code in LogicalRepApplyLoop that is used:

LogicalRepApplyLoop function
.......
    len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

    if (len != 0)
    {
        /* Loop to process all available data (without blocking). */
        for (;;)
        {
            CHECK_FOR_INTERRUPTS();
            ...
        }
    }

    /* confirm all writes so far */
    send_feedback(last_received, false, false);
.......

In send_feedback, flushpos is set to the replication origin's remote_lsn and sent to the walsender process. The walsender process receives this information and sets confirmed_flush in:

ProcessStandbyReplyMessage->LogicalConfirmReceivedLocation

If we then immediately stop the publisher instance, the shutdown checkpoint is triggered. At that point confirmed_flush = 0/1510978 is less than last_saved_confirmed_flush = 0/15109B0, which results in the assertion failure.

This issue happens because we allow confirmed_flush to be set to a backward position. There are a couple of ways to fix this:

a) One way is not to update confirmed_flush if the LSN sent is an older value, as in Confirm_flush_dont_allow_backward.patch.

b) Another way is to remove the assertion in CheckPointReplicationSlots and mark the slot as dirty only if confirmed_flush is greater than last_saved_confirmed_flush, as in Assert_confirmed_flush_will_always_not_be_less_than_last_saved_confirmed_flush.patch.

I preferred the first approach.

Thoughts?

Regards,
Vignesh

assert_failure.sh
Description: application/shellscript

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a53815f2ed..98aa629ced 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1839,7 +1839,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (lsn > MyReplicationSlot->data.confirmed_flush)
+			MyReplicationSlot->data.confirmed_flush = lsn;
 
 		/* if we're past the location required for bumping xmin, do so */
 		if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -1904,7 +1905,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	else
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (lsn > MyReplicationSlot->data.confirmed_flush)
+			MyReplicationSlot->data.confirmed_flush = lsn;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 	}
 }

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2180a38063..890dd33892 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1757,10 +1757,8 @@ CheckPointReplicationSlots(bool is_shutdown)
 		{
 			SpinLockAcquire(&s->mutex);
 
-			Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);
-
 			if (s->data.invalidated == RS_INVAL_NONE &&
-				s->data.confirmed_flush != s->last_saved_confirmed_flush)
+				s->data.confirmed_flush > s->last_saved_confirmed_flush)
 			{
 				s->just_dirtied = true;
 				s->dirty = true;
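
For anyone who wants to reason about the ordering without a running cluster, the sequence from the report can be modelled in a few lines of standalone C. This is only a sketch, not PostgreSQL code: ModelSlot, make_lsn, save_slot, confirm_unconditional, confirm_monotonic and replay_scenario are hypothetical names made up for illustration, the LSN is a plain 64-bit integer standing in for XLogRecPtr, and it assumes the slot gets saved at some point between the keepalive reply and the restarted worker's feedback. Only the two LSN values are taken from the report.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for XLogRecPtr: a 64-bit WAL position. */
typedef uint64_t Lsn;

/* Build an LSN from the "hi/lo" notation used in the report, e.g. 0/1510978. */
static Lsn make_lsn(uint32_t hi, uint32_t lo)
{
    return ((Lsn) hi << 32) | lo;
}

/* Hypothetical trimmed-down slot: only the two fields the assertion compares. */
typedef struct
{
    Lsn confirmed_flush;
    Lsn last_saved_confirmed_flush;
} ModelSlot;

/* Behaviour described in the report: accept whatever position is reported. */
static void confirm_unconditional(ModelSlot *s, Lsn lsn)
{
    s->confirmed_flush = lsn;
}

/* Approach (a): ignore positions older than what is already confirmed. */
static void confirm_monotonic(ModelSlot *s, Lsn lsn)
{
    Lsn before = s->confirmed_flush;

    if (lsn > s->confirmed_flush)
        s->confirmed_flush = lsn;

    /* Postcondition: the confirmed position never moves backward. */
    assert(s->confirmed_flush >= before);
}

/* Assumed model of saving the slot: remember the value that was written. */
static void save_slot(ModelSlot *s)
{
    s->last_saved_confirmed_flush = s->confirmed_flush;
}

/*
 * Replay the sequence from the report and return whether the condition
 * checked at shutdown (confirmed_flush >= last_saved_confirmed_flush) holds.
 */
static bool replay_scenario(bool monotonic)
{
    ModelSlot s = {0, 0};
    void (*confirm)(ModelSlot *, Lsn) =
        monotonic ? confirm_monotonic : confirm_unconditional;

    confirm(&s, make_lsn(0, 0x15109B0)); /* flush_lsn echoed from keepalive */
    save_slot(&s);                       /* assumed: slot saved at this point */
    confirm(&s, make_lsn(0, 0x1510978)); /* restarted worker sends remote_lsn */

    return s.confirmed_flush >= s.last_saved_confirmed_flush;
}
```

Calling replay_scenario(false) models the unpatched update and ends with the condition violated, while replay_scenario(true) models approach (a) and keeps it satisfied. The model says nothing about locking or about when the real slot is actually written out.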