On Fri, Feb 16, 2024 at 5:53 PM vignesh C <vignes...@gmail.com> wrote:
>
> After the insert operation is replicated to the subscriber, the
> subscriber will set the lsn value sent by the publisher in the
> replication origin (in my case it was 0/1510978). publisher will then
> send keepalive messages with the current WAL position in the publisher
> (in my case it was 0/15109B0), but subscriber will simply send this
> position as the flush_lsn to the publisher as there are no ongoing
> transactions. Then since the publisher is started, it will identify
> that publication does not exist and stop the walsender/apply worker
> process. When the apply worker is restarted, we will get the
> remote_lsn(in my case it was 0/1510978) of the origin and set it to
> origin_startpos. We will start the apply worker with this
> origin_startpos (origin's remote_lsn). This position will be sent as
> feedback to the walsender process from the below stack:
> run_apply_worker->start_apply->LogicalRepApplyLoop->send_feedback.
> It will use the following send_feedback function call of
> LogicalRepApplyLoop function as in below code here as nothing is
> received from walsender:
> LogicalRepApplyLoop function
> .......
> len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
> if (len != 0)
> {
>    /* Loop to process all available data (without blocking). */
>    for (;;)
>    {
>       CHECK_FOR_INTERRUPTS();
>       ...
>    }
> }
>
> /* confirm all writes so far */
> send_feedback(last_received, false, false);
> .......
>
> In send_feedback, we will set flushpos to replication origin's
> remote_lsn and send it to the walsender process. Walsender process
> will receive this information and set confirmed_flush in:
> ProcessStandbyReplyMessage->LogicalConfirmReceivedLocation
>
> Then immediately we are trying to stop the publisher instance,
> shutdown checkpoint process will be triggered.
> In this case:
> confirmed_flush = 0/1510978 will be less than
> last_saved_confirmed_flush = 0/15109B0 which will result in Assertion
> failure.
>
> This issue is happening because we allow setting the confirmed_flush
> to a backward position.
>

I see your point.

> There are a couple of ways to fix this:
> a) One way is not to update the confirmed_flush if the lsn sent is an
> older value like in Confirm_flush_dont_allow_backward.patch
>
> @@ -1839,7 +1839,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
>
>                 SpinLockAcquire(&MyReplicationSlot->mutex);
> -               MyReplicationSlot->data.confirmed_flush = lsn;
> +               if (lsn > MyReplicationSlot->data.confirmed_flush)
> +                       MyReplicationSlot->data.confirmed_flush = lsn;
>
>                 /* if we're past the location required for bumping xmin, do so */
>                 if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
> @@ -1904,7 +1905,8 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
>         else
>         {
>                 SpinLockAcquire(&MyReplicationSlot->mutex);
> -               MyReplicationSlot->data.confirmed_flush = lsn;
> +               if (lsn > MyReplicationSlot->data.confirmed_flush)
> +                       MyReplicationSlot->data.confirmed_flush = lsn;
>

BTW, from which code path does it update the prior value of
confirmed_flush? If it is through the else branch, then can we see if
it may change the confirmed_flush to the prior position via the first
code path? I am asking because in the first code path, we can even
flush the retreated value of the confirmed_flush LSN.

-- 
With Regards,
Amit Kapila.