Hello hackers,

Introduction

Using a large number of synchronous standbys creates excessive load on the primary node. To solve this problem, cascading synchronous replication can be used.

Overview of Changes

This patch adds synchronous cascading replication to PostgreSQL. With it, standby servers honor the configuration parameters related to synchronous replication. A standby reads the LSN positions of its walsenders from the walsender data structures, computes the synchronous write, flush, and apply positions among them using the synchronous replication algorithm, and then takes the minimum of each computed value and the standby's own corresponding position. To avoid synchronization problems and unnecessary overhead, these calculations are performed by the walreceiver process. The resulting positions are transmitted in the standby reply message instead of the server's own positions. This happens only if the SyncRepRequested condition is met and at least one synchronous standby is specified in synchronous_standby_names.

If the synchronous LSN values cannot be calculated (for example, because there are not enough synchronous standbys), the server sends DefaultSendingLSN. This value lies between InvalidXLogRecPtr and FirstNormalUnloggedLSN. Sending InvalidXLogRecPtr is not allowed, because the pg_stat_replication view would then display the standby as asynchronous although it is not. The value 2 was chosen for DefaultSendingLSN since 1 is used by one of the access methods.

When the server receives DefaultSendingLSN from a synchronous standby, it treats it as a regular LSN. This allows transaction execution to continue if the configuration permits it. If not, transaction execution stops until the cluster failure is resolved.

Overview of Individual Patch Parts

The first part adds the SyncRepGetSendingSyncRecPtr function, which is written similarly to SyncRepGetSyncRecPtr and is responsible for calculating the LSN positions to be sent. The two functions shared a large section of code, which was moved to the SyncRepGetSyncRecPtrBySyncRepMethod function.

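For readers who want the arithmetic spelled out, here is a minimal standalone sketch of the calculation for one of the three positions (write, flush, or apply). It is not the patch code: LsnSketch, SketchMethod, sketch_sending_lsn and the SKETCH_* names are invented for illustration, and the sketch assumes the set of candidate synchronous standbys has already been selected.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins, not PostgreSQL definitions. */
typedef uint64_t LsnSketch;                             /* plays the role of XLogRecPtr */
#define SKETCH_DEFAULT_SENDING_LSN ((LsnSketch) 2)      /* mirrors DefaultSendingLSN */

typedef enum { SKETCH_PRIORITY, SKETCH_QUORUM } SketchMethod;

/* qsort comparator: descending order of LSN. */
static int
sketch_cmp_desc(const void *a, const void *b)
{
    LsnSketch x = *(const LsnSketch *) a;
    LsnSketch y = *(const LsnSketch *) b;

    return (x < y) - (x > y);
}

/*
 * Position a standby would report upstream, given its own position and the
 * positions reported by its n candidate synchronous standbys.
 */
static LsnSketch
sketch_sending_lsn(LsnSketch own, const LsnSketch *downstream, int n,
                   int num_sync, SketchMethod method)
{
    LsnSketch synced;

    /* No synchronous standbys requested: report our own position. */
    if (num_sync <= 0)
        return own;

    /* Not enough synchronous standbys connected: send the fallback value. */
    if (n < num_sync)
        return SKETCH_DEFAULT_SENDING_LSN;

    if (method == SKETCH_PRIORITY)
    {
        /* Priority-based: the oldest position among the candidates. */
        synced = downstream[0];
        for (int i = 1; i < n; i++)
            if (downstream[i] < synced)
                synced = downstream[i];
    }
    else
    {
        /* Quorum-based: the num_sync-th latest position. */
        LsnSketch *sorted = malloc(sizeof(LsnSketch) * (size_t) n);

        assert(sorted != NULL);
        memcpy(sorted, downstream, sizeof(LsnSketch) * (size_t) n);
        qsort(sorted, (size_t) n, sizeof(LsnSketch), sketch_cmp_desc);
        synced = sorted[num_sync - 1];
        free(sorted);
    }

    /* Never report more than this standby itself has reached. */
    return own < synced ? own : synced;
}
```

With downstream positions {300, 100, 200}, a quorum of 2 and an own position of 250, the sketch reports 200: the second-latest downstream position is 200, and it is lower than the standby's own.
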
For optimization purposes, the walsender process serving a synchronous standby can also call the WalRcvForceReply function.

The second part of the patch reorganizes the code in syncrep.c into sections. This is necessary to preserve the semantics of the sections used in this file, since some functions are now used by the walreceiver process, while others are used by both walreceiver and walsender.

The third part adds a special notation in pg_stat_replication for standbys sending DefaultSendingLSN. If such a standby is synchronous, it is marked with a "?" symbol. In the author's opinion, this notation can make it easier to locate problems in the cluster, but it does not claim to be a serious solution for failure detection.

The fourth part of the patch contains fixes for recovery tests 009 and 012. These tests created circular dependencies between servers. This was not a problem as long as standbys ignored synchronous replication parameters, but with this patch the tests broke. Tests for the new mechanics were also added to test 007, which covers synchronous replication.

Possible Topologies

The patch allows both asynchronous and synchronous standbys to connect to a synchronous standby. However, positions sent by asynchronous standbys are not considered, since the synchronous replication algorithm is used. For the same reason, connecting a synchronous standby to an asynchronous one is theoretically possible but meaningless.

Additional Information

The patch contains no platform-dependent elements, compiles with the -Wall flag, and passes the tests. Performance optimization is a separate task and, in the author's opinion, deserves a separate patch. Nevertheless, local testing using Docker containers showed insignificant performance degradation when using synchronous cascading chains. This patch is intended primarily for discussion.
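The pg_stat_replication notation described for the third part can be sketched as a small standalone function. This is only an illustration of the labeling rule as I read it from the third patch, not the code in pg_stat_get_wal_senders; LsnSketch, SKETCH_DEFAULT_SENDING_LSN and sketch_sync_state are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins, not PostgreSQL definitions. */
typedef uint64_t LsnSketch;                             /* plays the role of XLogRecPtr */
#define SKETCH_DEFAULT_SENDING_LSN ((LsnSketch) 2)      /* mirrors DefaultSendingLSN */

/*
 * Label shown for a standby, given its priority, whether it is currently
 * counted as synchronous, the replication method, and its reported flush LSN.
 * A trailing "?" means the standby reported the fallback LSN.
 */
static const char *
sketch_sync_state(int priority, bool is_sync_standby, bool priority_method,
                  LsnSketch flush)
{
    bool        unknown = (flush == SKETCH_DEFAULT_SENDING_LSN);

    assert(priority >= 0);

    /* Priority zero: not listed in synchronous_standby_names. */
    if (priority == 0)
        return "async";

    if (is_sync_standby)
    {
        if (priority_method)
            return unknown ? "sync?" : "sync";
        return unknown ? "quorum?" : "quorum";
    }

    return unknown ? "potential?" : "potential";
}
```

An asynchronous standby is never marked, even if it reports the fallback value, because its positions do not take part in the synchronous calculation.
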
The patch was developed against the master branch, commit b227b0bb4e032e19b3679bedac820eba3ac0d1cf.

Best wishes,
Grigoriy Novikov

From 387e5d15f89cdff505c28d61f4d89b6dd5f4dbcb Mon Sep 17 00:00:00 2001 From: reVInotip <[email protected]> Date: Sat, 18 Oct 2025 10:45:08 +0700 Subject: [PATCH 1/4] Cascade sync rep Implemented cascaded synchronous replication support. The new SyncRepGetSendingSyncRecPtr function in syncrep.c computes replication status values by aggregating positions from synchronous standbys with local server positions. Furthermore, walsender processes can now prompt walreceiver processes for immediate feedback delivery to upstream replication sources. --- src/backend/replication/syncrep.c | 160 +++++++++++++++++++++++----- src/backend/replication/walreceiver.c | 20 +++- src/backend/replication/walsender.c | 2 + src/include/access/xlogdefs.h | 7 ++ src/include/replication/syncrep.h | 4 + src/include/replication/walsender_private.h | 3 +- 6 files changed, 166 insertions(+), 30 deletions(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 24019c96ff2..0dbf338529f 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -105,6 +105,11 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync); +static bool SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys); static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, @@ -624,31 +624,13 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, return false; } - /* - * In a priority-based sync replication, the synced positions are the - * oldest ones among sync standbys. In a quorum-based, they are the Nth - * latest ones. - * - * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest - * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation - * because it's a bit more efficient. 
- * - * XXX If the numbers of current and requested sync standbys are the same, - * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced - * positions even in a quorum-based sync replication. - */ - if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) - { - SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, num_standbys); - } - else - { - SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, num_standbys, - SyncRepConfig->num_sync); - } + /* Try calculate sync Write, Flush and Apply positions */ + if (!SyncRepGetSyncRecPtrBySyncRepMethod(writePtr, flushPtr, applyPtr, sync_standbys, num_standbys)) + { + pfree(sync_standbys); + return false; + } pfree(sync_standbys); return true; } @@ -867,8 +867,6 @@ SyncRepGetStandbyPriority(void) * Since synchronous cascade replication is not allowed, we always set the * priority of cascading walsender to zero. */ - if (am_cascading_walsender) - return 0; if (!SyncStandbysDefined() || SyncRepConfig == NULL) return 0; @@ -1048,6 +1048,133 @@ SyncRepQueueIsOrderedByLSN(int mode) } #endif +/* + * =========================================================== + * Synchronous Replication functions for wal receiver processes + * =========================================================== + */ + +/* + * Calculate sync Write, Flush and Apply positions for sending to replication source. + * If synchronous replication not configured return myWritePtr, myFlushPtr and myApplyPtr. + */ +void +SyncRepGetSendingSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, + XLogRecPtr myWritePtr, XLogRecPtr myFlushPtr, XLogRecPtr myApplyPtr) +{ + SyncRepStandbyData *sync_standbys; + int num_standbys; + + /* + * Initialize default results. 
+ * We use InvalidXLogRecPtr instead of DefaultSendingLSN to ensure the correct operation of + * the algorithm under priority-based sync replication + */ + *writePtr = InvalidXLogRecPtr; + *flushPtr = InvalidXLogRecPtr; + *applyPtr = InvalidXLogRecPtr; + + /* + * Returns the standby's Write, Flush, and Apply positions even if it is not + * configured as a synchronous standby, or if there are no synchronous replicas configured. + */ + if (SyncRepConfig == NULL || SyncRepConfig->num_sync == 0) + { + *writePtr = myWritePtr; + *flushPtr = myFlushPtr; + *applyPtr = myApplyPtr; + return; + } + + /* Get standbys that are considered as synchronous at this moment */ + num_standbys = SyncRepGetCandidateStandbys(&sync_standbys); + + /* + * Nothing more to do if there are not enough synchronous standbys. + */ + if (num_standbys < SyncRepConfig->num_sync) + { + *writePtr = DefaultSendingLSN; + *flushPtr = DefaultSendingLSN; + *applyPtr = DefaultSendingLSN; + + elog(DEBUG3, "waiting %d standbys, but only %d is connecting", SyncRepConfig->num_sync, num_standbys); + pfree(sync_standbys); + return; + } + + /* + * Try calculate sync Write, Flush and Apply positions. + * If return false writePtr et al not be changed. 
+ */ + if (!SyncRepGetSyncRecPtrBySyncRepMethod(writePtr, flushPtr, applyPtr, sync_standbys, num_standbys)) + { + *writePtr = DefaultSendingLSN; + *flushPtr = DefaultSendingLSN; + *applyPtr = DefaultSendingLSN; + + elog(DEBUG3, "something went wrong then trying calculate sync write, flush and apply positions"); + pfree(sync_standbys); + return; + } + + *writePtr = Min(myWritePtr, *writePtr); + *flushPtr = Min(myFlushPtr, *flushPtr); + *applyPtr = Min(myApplyPtr, *applyPtr); + + pfree(sync_standbys); +} + +/* + * =========================================================== + * Synchronous Replication functions for wal receiver and wal sender processes + * =========================================================== + */ + +/* + * Calculates the Write, Flush, and Apply positions for synchronous + * standbys using the synchronous replication method. Returns true if the calculation is successful, + * otherwise false. + */ +static bool +SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) +{ + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + + /* + * In a priority-based sync replication, the synced positions are the + * oldest ones among sync standbys. In a quorum-based, they are the Nth + * latest ones. + * + * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest + * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation + * because it's a bit more efficient. + * + * XXX If the numbers of current and requested sync standbys are the same, + * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced + * positions even in a quorum-based sync replication. 
+ */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, num_standbys); + } + else + { + SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, num_standbys, + SyncRepConfig->num_sync); + } + + return true; +} + /* * =========================================================== * Synchronous Replication functions executed by any process diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7361ffc9dcf..38af9c9b5dc 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -66,6 +66,7 @@ #include "postmaster/auxprocess.h" #include "postmaster/interrupt.h" #include "replication/walreceiver.h" +#include "replication/syncrep.h" #include "replication/walsender.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -529,7 +562,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) if (walrcv->force_reply) { /* - * The recovery process has asked us to send apply + * The recovery or one of walsender processes has asked us to send apply * feedback now. Make sure the flag is really set to * false in shared memory before sending the reply, so * we don't miss a new request for a reply. @@ -1077,7 +1110,10 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli) /* * Send reply message to primary, indicating our current WAL locations, oldest - * xmin and the current time. + * xmin and the current time. When synchronous replication is enabled, transmit + * the oldest write, flush, and apply positions from the current node and its + * standbys to the primary. If position calculation fails, fall back to + * DefaultSendingLSN. * * If 'force' is not set, the message is only sent if enough time has * passed since last status update to reach wal_receiver_status_interval. 
@@ -1125,9 +1161,15 @@ XLogWalRcvSendReply(bool force, bool requestReply) WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now); /* Construct a new message */ - writePtr = LogstreamResult.Write; - flushPtr = LogstreamResult.Flush; - applyPtr = GetXLogReplayRecPtr(NULL); + if (SyncRepRequested()) + SyncRepGetSendingSyncRecPtr(&writePtr, &flushPtr, &applyPtr, + LogstreamResult.Write, LogstreamResult.Flush, GetXLogReplayRecPtr(NULL)); + else + { + writePtr = LogstreamResult.Write; + flushPtr = LogstreamResult.Flush; + applyPtr = GetXLogReplayRecPtr(NULL); + } resetStringInfo(&reply_message); pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0855bae3535..2c92ef6d530 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2500,6 +2500,8 @@ ProcessStandbyReplyMessage(void) if (!am_cascading_walsender) SyncRepReleaseWaiters(); + else if (SyncRepRequested() && MyWalSnd->sync_standby_priority > 0) + WalRcvForceReply(); /* * Advance our local xmin horizon when the client confirmed a flush. diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 514f03df0b6..ae5bfc6881e 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -28,6 +28,13 @@ typedef uint64 XLogRecPtr; #define InvalidXLogRecPtr 0 #define XLogRecPtrIsInvalid(r) ((r) == InvalidXLogRecPtr) +/* + * The default value for sending from a cascade synchronous standby to another cascade + * synchronous standby or to the primary, used when the sending LSN cannot be calculated + * (for example, when the number of synchronous standbys is less than required). + */ +#define DefaultSendingLSN ((XLogRecPtr) 2) + /* * First LSN to use for "fake" LSNs. 
* diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 675669a79f7..8b6cec6b76d 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -92,6 +92,10 @@ extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); +/* called by walreciever */ +extern void SyncRepGetSendingSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, + XLogRecPtr myWritePtr, XLogRecPtr myFlushPtr, XLogRecPtr myApplyPtr); + /* * Internal functions for parsing synchronous_standby_names grammar, * in syncrep_gram.y and syncrep_scanner.l diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index e98701038f5..3305bba82e4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -50,7 +50,8 @@ typedef struct WalSnd /* * The xlog locations that have been written, flushed, and applied by * standby-side. These may be invalid if the standby-side has not offered - * values yet. + * values yet. If walsender is synchronous this locations will be contains + * minimum (oldest) of the synchronous standbys among the entire subtree. */ XLogRecPtr write; XLogRecPtr flush;
From 387e5d15f89cdff505c28d61f4d89b6dd5f4dbcb Mon Sep 17 00:00:00 2001 From: reVInotip <[email protected]> Date: Sat, 18 Oct 2025 10:45:08 +0700 Subject: [PATCH 2/4] Refactor syncrep The code has been refactored to preserve the semantic sections in syncrep.c. Several functions have been reorganized and moved between different code sections to better align with their usage patterns and improve logical grouping. --- src/backend/replication/syncrep.c | 492 +++++++++++++++++++------------------- 1 file changed, 243 insertions(+), 249 deletions(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 1be667c637d..4390fb7a998 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -640,6 +640,249 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, return true; } +/* + * Check if we are in the list of sync standbys, and if so, determine + * priority sequence. Return priority if set, or zero to indicate that + * we are not a potential sync standby. + * + * Compare the parameter SyncRepStandbyNames against the application_name + * for this WALSender, or allow any name if we find a wildcard "*". + */ +static int +SyncRepGetStandbyPriority(void) +{ + const char *standby_name; + int priority; + bool found = false; + + if (!SyncStandbysDefined() || SyncRepConfig == NULL) + return 0; + + standby_name = SyncRepConfig->member_names; + for (priority = 1; priority <= SyncRepConfig->nmembers; priority++) + { + if (pg_strcasecmp(standby_name, application_name) == 0 || + strcmp(standby_name, "*") == 0) + { + found = true; + break; + } + standby_name += strlen(standby_name) + 1; + } + + if (!found) + return 0; + + /* + * In quorum-based sync replication, all the standbys in the list have the + * same priority, one. + */ + return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; +} + +/* + * Walk the specified queue from head. 
Set the state of any backends that + * need to be woken, remove them from the queue, and then wake them. + * Pass all = true to wake whole queue; otherwise, just wake up to + * the walsender's LSN. + * + * The caller must hold SyncRepLock in exclusive mode. + */ +static int +SyncRepWakeQueue(bool all, int mode) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + int numprocs = 0; + dlist_mutable_iter iter; + + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); + Assert(SyncRepQueueIsOrderedByLSN(mode)); + + dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode]) + { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + + /* + * Assume the queue is ordered by LSN + */ + if (!all && walsndctl->lsn[mode] < proc->waitLSN) + return numprocs; + + /* + * Remove from queue. + */ + dlist_delete_thoroughly(&proc->syncRepLinks); + + /* + * SyncRepWaitForLSN() reads syncRepState without holding the lock, so + * make sure that it sees the queue link being removed before the + * syncRepState change. + */ + pg_write_barrier(); + + /* + * Set state to complete; see SyncRepWaitForLSN() for discussion of + * the various states. + */ + proc->syncRepState = SYNC_REP_WAIT_COMPLETE; + + /* + * Wake only when we have set state and removed from queue. + */ + SetLatch(&(proc->procLatch)); + + numprocs++; + } + + return numprocs; +} + +/* + * The checkpointer calls this as needed to update the shared + * sync_standbys_status flag, so that backends don't remain permanently wedged + * if synchronous_standby_names is unset. It's safe to check the current value + * without the lock, because it's only ever updated by one process. But we + * must take the lock to change it. 
+ */ +void +SyncRepUpdateSyncStandbysDefined(void) +{ + bool sync_standbys_defined = SyncStandbysDefined(); + + if (sync_standbys_defined != + ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0)) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + /* + * If synchronous_standby_names has been reset to empty, it's futile + * for backends to continue waiting. Since the user no longer wants + * synchronous replication, we'd better wake them up. + */ + if (!sync_standbys_defined) + { + int i; + + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) + SyncRepWakeQueue(true, i); + } + + /* + * Only allow people to join the queue when there are synchronous + * standbys defined. Without this interlock, there's a race + * condition: we might wake up all the current waiters; then, some + * backend that hasn't yet reloaded its config might go to sleep on + * the queue (and never wake up). This prevents that. + */ + WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT | + (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0); + + LWLockRelease(SyncRepLock); + } + else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + /* + * Note that there is no need to wake up the queues here. We would + * reach this path only if SyncStandbysDefined() returns false, or it + * would mean that some backends are waiting with the GUC set. See + * SyncRepWaitForLSN(). + */ + Assert(!SyncStandbysDefined()); + + /* + * Even if there is no sync standby defined, let the readers of this + * information know that the sync standby data has been initialized. + * This can just be done once, hence the previous check on + * SYNC_STANDBY_INIT to avoid useless work. 
+ */ + WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT; + + LWLockRelease(SyncRepLock); + } +} + +#ifdef USE_ASSERT_CHECKING +static bool +SyncRepQueueIsOrderedByLSN(int mode) +{ + XLogRecPtr lastLSN; + dlist_iter iter; + + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + + lastLSN = 0; + + dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode]) + { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + + /* + * Check the queue is ordered by LSN and that multiple procs don't + * have matching LSNs + */ + if (proc->waitLSN <= lastLSN) + return false; + + lastLSN = proc->waitLSN; + } + + return true; +} +#endif + +/* + * =========================================================== + * Synchronous Replication functions for wal receiver and wal sender processes + * =========================================================== + */ + +/* + * Calculates the Write, Flush, and Apply positions for synchronous + * standbys using the synchronous replication method. Returns true if the calculation is successful, + * otherwise false. + */ +static bool +SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) +{ + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + + /* + * In a priority-based sync replication, the synced positions are the + * oldest ones among sync standbys. In a quorum-based, they are the Nth + * latest ones. + * + * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest + * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation + * because it's a bit more efficient. + * + * XXX If the numbers of current and requested sync standbys are the same, + * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced + * positions even in a quorum-based sync replication. 
+ */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, num_standbys); + } + else + { + SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, num_standbys, + SyncRepConfig->num_sync); + } + + return true; +} + /* * Calculate the oldest Write, Flush and Apply positions among sync standbys. */ @@ -834,205 +1077,6 @@ standby_priority_comparator(const void *a, const void *b) return sa->walsnd_index - sb->walsnd_index; } - -/* - * Check if we are in the list of sync standbys, and if so, determine - * priority sequence. Return priority if set, or zero to indicate that - * we are not a potential sync standby. - * - * Compare the parameter SyncRepStandbyNames against the application_name - * for this WALSender, or allow any name if we find a wildcard "*". - */ -static int -SyncRepGetStandbyPriority(void) -{ - const char *standby_name; - int priority; - bool found = false; - - /* - * Since synchronous cascade replication is not allowed, we always set the - * priority of cascading walsender to zero. - */ - - if (!SyncStandbysDefined() || SyncRepConfig == NULL) - return 0; - - standby_name = SyncRepConfig->member_names; - for (priority = 1; priority <= SyncRepConfig->nmembers; priority++) - { - if (pg_strcasecmp(standby_name, application_name) == 0 || - strcmp(standby_name, "*") == 0) - { - found = true; - break; - } - standby_name += strlen(standby_name) + 1; - } - - if (!found) - return 0; - - /* - * In quorum-based sync replication, all the standbys in the list have the - * same priority, one. - */ - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; -} - -/* - * Walk the specified queue from head. Set the state of any backends that - * need to be woken, remove them from the queue, and then wake them. - * Pass all = true to wake whole queue; otherwise, just wake up to - * the walsender's LSN. 
- * - * The caller must hold SyncRepLock in exclusive mode. - */ -static int -SyncRepWakeQueue(bool all, int mode) -{ - volatile WalSndCtlData *walsndctl = WalSndCtl; - int numprocs = 0; - dlist_mutable_iter iter; - - Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); - Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); - Assert(SyncRepQueueIsOrderedByLSN(mode)); - - dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode]) - { - PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); - - /* - * Assume the queue is ordered by LSN - */ - if (!all && walsndctl->lsn[mode] < proc->waitLSN) - return numprocs; - - /* - * Remove from queue. - */ - dlist_delete_thoroughly(&proc->syncRepLinks); - - /* - * SyncRepWaitForLSN() reads syncRepState without holding the lock, so - * make sure that it sees the queue link being removed before the - * syncRepState change. - */ - pg_write_barrier(); - - /* - * Set state to complete; see SyncRepWaitForLSN() for discussion of - * the various states. - */ - proc->syncRepState = SYNC_REP_WAIT_COMPLETE; - - /* - * Wake only when we have set state and removed from queue. - */ - SetLatch(&(proc->procLatch)); - - numprocs++; - } - - return numprocs; -} - -/* - * The checkpointer calls this as needed to update the shared - * sync_standbys_status flag, so that backends don't remain permanently wedged - * if synchronous_standby_names is unset. It's safe to check the current value - * without the lock, because it's only ever updated by one process. But we - * must take the lock to change it. - */ -void -SyncRepUpdateSyncStandbysDefined(void) -{ - bool sync_standbys_defined = SyncStandbysDefined(); - - if (sync_standbys_defined != - ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0)) - { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - - /* - * If synchronous_standby_names has been reset to empty, it's futile - * for backends to continue waiting. 
Since the user no longer wants - * synchronous replication, we'd better wake them up. - */ - if (!sync_standbys_defined) - { - int i; - - for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); - } - - /* - * Only allow people to join the queue when there are synchronous - * standbys defined. Without this interlock, there's a race - * condition: we might wake up all the current waiters; then, some - * backend that hasn't yet reloaded its config might go to sleep on - * the queue (and never wake up). This prevents that. - */ - WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT | - (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0); - - LWLockRelease(SyncRepLock); - } - else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0) - { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - - /* - * Note that there is no need to wake up the queues here. We would - * reach this path only if SyncStandbysDefined() returns false, or it - * would mean that some backends are waiting with the GUC set. See - * SyncRepWaitForLSN(). - */ - Assert(!SyncStandbysDefined()); - - /* - * Even if there is no sync standby defined, let the readers of this - * information know that the sync standby data has been initialized. - * This can just be done once, hence the previous check on - * SYNC_STANDBY_INIT to avoid useless work. 
- */ - WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT; - - LWLockRelease(SyncRepLock); - } -} - -#ifdef USE_ASSERT_CHECKING -static bool -SyncRepQueueIsOrderedByLSN(int mode) -{ - XLogRecPtr lastLSN; - dlist_iter iter; - - Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); - - lastLSN = 0; - - dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode]) - { - PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); - - /* - * Check the queue is ordered by LSN and that multiple procs don't - * have matching LSNs - */ - if (proc->waitLSN <= lastLSN) - return false; - - lastLSN = proc->waitLSN; - } - - return true; -} -#endif - /* * =========================================================== * Synchronous Replication functions for wal receiver processes @@ -1110,56 +1154,6 @@ SyncRepGetSendingSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecP pfree(sync_standbys); } -/* - * =========================================================== - * Synchronous Replication functions for wal receiver and wal sender processes - * =========================================================== - */ - -/* - * Calculates the Write, Flush, and Apply positions for synchronous - * standbys using the synchronous replication method. Returns true if the calculation is successful, - * otherwise false. - */ -static bool -SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr, - XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - SyncRepStandbyData *sync_standbys, - int num_standbys) -{ - /* Quick out if not even configured to be synchronous */ - if (SyncRepConfig == NULL) - return false; - - /* - * In a priority-based sync replication, the synced positions are the - * oldest ones among sync standbys. In a quorum-based, they are the Nth - * latest ones. - * - * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest - * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation - * because it's a bit more efficient. 
- * - * XXX If the numbers of current and requested sync standbys are the same, - * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced - * positions even in a quorum-based sync replication. - */ - if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) - { - SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, num_standbys); - } - else - { - SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, num_standbys, - SyncRepConfig->num_sync); - } - - return true; -} - /* * =========================================================== * Synchronous Replication functions executed by any process
From f4f56dafc0c925b3d8909b985cc8245988f74b5d Mon Sep 17 00:00:00 2001 From: reVInotip <[email protected]> Date: Sat, 18 Oct 2025 13:00:17 +0700 Subject: [PATCH 3/4] Pg stat replication Add new states to pg_stat_replication. They were added for the synchronous node that sends invalid values. These are indicated by a question mark and are needed to speed up problem localization. --- src/backend/replication/walsender.c | 18 +++++++++++++++--- src/include/access/xlogdefs.h | 1 + 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 90b12edf58a..2c92ef6d530 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -4099,14 +4099,26 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * be switched to "potential" ones at the next moment. So, it's * basically useless to report "sync" or "potential" as their sync * states. We report just "quorum" for them. + * + * A question mark at the end of the role name indicates that the sending + * LSN cannot be calculated on this synchronous standby or on its standbys. */ if (priority == 0) values[10] = CStringGetTextDatum("async"); else if (is_sync_standby) - values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? - CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); + { + if (XLogRecPtrIsDefaultSending(flush)) + values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? + CStringGetTextDatum("sync?") : CStringGetTextDatum("quorum?"); + else + values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? 
+ CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); + } else - values[10] = CStringGetTextDatum("potential"); + if (XLogRecPtrIsDefaultSending(flush)) + values[10] = CStringGetTextDatum("potential?"); + else + values[10] = CStringGetTextDatum("potential"); if (replyTime == 0) nulls[11] = true; diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index c870c51d13d..ae5bfc6881e 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -34,6 +34,7 @@ typedef uint64 XLogRecPtr; * (for example, when the number of synchronous standbys is less than required). */ #define DefaultSendingLSN ((XLogRecPtr) 2) +#define XLogRecPtrIsDefaultSending(r) ((r) == DefaultSendingLSN) /* * First LSN to use for "fake" LSNs.
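Note for reviewers: the sync_state selection in this patch can be read as a small pure function. The block below is a standalone sketch, not the patch code. `sync_state_label`, the `SyncRepMethod` enum and the `uint64_t` typedef are hypothetical stand-ins for the walsender locals and for `SyncRepConfig->syncrep_method`; only `DefaultSendingLSN` and `XLogRecPtrIsDefaultSending` are taken from the diff above.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for the PostgreSQL typedef (uint64 in xlogdefs.h). */
typedef uint64_t XLogRecPtr;

/* Taken from the patch. */
#define DefaultSendingLSN ((XLogRecPtr) 2)
#define XLogRecPtrIsDefaultSending(r) ((r) == DefaultSendingLSN)

/* Hypothetical stand-in for SyncRepConfig->syncrep_method. */
typedef enum
{
	SYNC_REP_PRIORITY,
	SYNC_REP_QUORUM
} SyncRepMethod;

/*
 * Hypothetical helper mirroring the branch structure in
 * pg_stat_get_wal_senders(): a trailing '?' is added when the standby
 * reported DefaultSendingLSN as its flush position.
 */
static const char *
sync_state_label(int priority, bool is_sync_standby,
				 SyncRepMethod method, XLogRecPtr flush)
{
	bool		unknown = XLogRecPtrIsDefaultSending(flush);

	if (priority == 0)
		return "async";			/* never gets a question mark */

	if (is_sync_standby)
	{
		if (method == SYNC_REP_PRIORITY)
			return unknown ? "sync?" : "sync";
		return unknown ? "quorum?" : "quorum";
	}

	return unknown ? "potential?" : "potential";
}
```

For example, under these assumptions `sync_state_label(1, true, SYNC_REP_PRIORITY, DefaultSendingLSN)` yields "sync?", which is the state the TAP test in the next patch expects on the primary while the leaf standby is stopped.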
From 84c9f286681138c9a1995539d23907990a76f21a Mon Sep 17 00:00:00 2001 From: reVInotip <[email protected]> Date: Sat, 18 Oct 2025 10:54:27 +0700 Subject: [PATCH 4/4] Add tap tests for cascade sync replication This patch contains rewritten tap tests for synchronous replication. These tests now take cascading into account. Also the test infrastructure has been updated to address configuration requirements for cascaded synchronous replication. Server configurations that were incompatible with cascaded synchronous replication have been identified and corrected across the test suite. --- src/test/perl/PostgreSQL/Test/Cluster.pm | 30 ++ src/test/recovery/t/007_sync_rep.pl | 571 ++++++++++++++++++++++------- src/test/recovery/t/009_twophase.pl | 18 +- src/test/recovery/t/012_subtransactions.pl | 5 + 4 files changed, 487 insertions(+), 137 deletions(-) diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index 35413f14019..9c14f9fdad0 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -1409,6 +1409,36 @@ primary_conninfo='$root_connstr' return; } +# Internal routine to enable waiting synchronous standbys +# on primary node +sub enable_waiting_standbys +{ + my ($self) = @_; + my $name = $self->name; + + print "### Enabling waiting standbys for node \"$name\"\n"; + $self->append_conf( + 'postgresql.conf', qq( + synchronous_commit=on + )); + return; +} + +# Internal routine to ignore synchronous_standby_names +# on standby node +sub disable_waiting_standbys +{ + my ($self) = @_; + my $name = $self->name; + + print "### Now ignore synchronous_standby_names for node \"$name\"\n"; + $self->append_conf( + 'postgresql.conf', qq( + synchronous_commit=local + )); + return; +} + # Internal routine to enable archive recovery command on a standby node sub enable_restoring { diff --git a/src/test/recovery/t/007_sync_rep.pl b/src/test/recovery/t/007_sync_rep.pl index 602652a7638..ad463287b4d 
100644 --- a/src/test/recovery/t/007_sync_rep.pl +++ b/src/test/recovery/t/007_sync_rep.pl @@ -50,6 +50,194 @@ sub start_standby_and_wait return; } +sub check_replication_state +{ + my ($source_node, $node_standby_1, $node_standby_2, + $node_standby_3, $node_standby_4, $backup_name) = @_; + + my $stdby1_name = $node_standby_1->name; + my $stdby2_name = $node_standby_2->name; + my $stdby3_name = $node_standby_3->name; + my $stdby4_name = $node_standby_4->name; + + # Check that sync_state is determined correctly when + # synchronous_standby_names is specified in old syntax. + test_sync_state( + $source_node, +"$stdby1_name|1|sync +$stdby2_name|2|potential +$stdby3_name|0|async", + 'old syntax of synchronous_standby_names', + "$stdby1_name,$stdby2_name"); + + # Check that all the standbys are considered as either sync or + # potential when * is specified in synchronous_standby_names. + # Note that standby1 is chosen as sync standby because + # it's stored in the head of WalSnd array which manages + # all the standbys though they have the same priority. + test_sync_state( + $source_node, +"$stdby1_name|1|sync +$stdby2_name|1|potential +$stdby3_name|1|potential", + 'asterisk in synchronous_standby_names', + '*'); + + # Stop and start standbys to rearrange the order of standbys + # in WalSnd array. Now, if standbys have the same priority, + # standby2 is selected preferentially and standby3 is next. + $node_standby_1->stop; + $node_standby_2->stop; + $node_standby_3->stop; + + # Make sure that each standby reports back to the primary in the wanted + # order. + start_standby_and_wait($source_node, $node_standby_2); + start_standby_and_wait($source_node, $node_standby_3); + + # Specify 2 as the number of sync standbys. + # Check that two standbys are in 'sync' state. 
+ test_sync_state( + $source_node, +"$stdby2_name|2|sync +$stdby3_name|3|sync", + '2 synchronous standbys', + "2($stdby1_name,$stdby2_name,$stdby3_name)"); + + # Start standby1 + start_standby_and_wait($source_node, $node_standby_1); + + $node_standby_4->init_from_backup($source_node, $backup_name, + has_streaming => 1); + $node_standby_4->start; + + # Check that standby1 and standby2 whose names appear earlier in + # synchronous_standby_names are considered as sync. Also check that + # standby3 appearing later represents potential, and standby4 is + # in 'async' state because it's not in the list. + test_sync_state( + $source_node, +"$stdby1_name|1|sync +$stdby2_name|2|sync +$stdby3_name|3|potential +$stdby4_name|0|async", + '2 sync, 1 potential, and 1 async'); + + # Check that sync_state of each standby is determined correctly + # when num_sync exceeds the number of names of potential sync standbys + # specified in synchronous_standby_names. + test_sync_state( + $source_node, +"$stdby1_name|0|async +$stdby2_name|4|sync +$stdby3_name|3|sync +$stdby4_name|1|sync", + 'num_sync exceeds the num of potential sync standbys', + "6($stdby4_name,standby,$stdby3_name,$stdby2_name)"); + + # The setting that * comes before another standby name is acceptable + # but does not make sense in most cases. Check that sync_state is + # chosen properly even in case of that setting. standby1 is selected + # as synchronous as it has the highest priority, and is followed by a + # second standby listed first in the WAL sender array, which is + # standby2 in this case. + test_sync_state( + $source_node, +"$stdby1_name|1|sync +$stdby2_name|2|sync +$stdby3_name|2|potential +$stdby4_name|2|potential", + 'asterisk before another standby name', + "2($stdby1_name,*,$stdby2_name)"); + + # Check that the setting of '2(*)' chooses standby2 and standby3 that are stored + # earlier in WalSnd array as sync standbys. 
+ test_sync_state( + $source_node, +"$stdby1_name|1|potential +$stdby2_name|1|sync +$stdby3_name|1|sync +$stdby4_name|1|potential", + 'multiple standbys having the same priority are chosen as sync', + '2(*)'); + + # Stop Standby3 which is considered in 'sync' state. + $node_standby_3->stop; + + # Check that the state of standby1 stored earlier in WalSnd array than + # standby4 is transited from potential to sync. + test_sync_state( + $source_node, +"$stdby1_name|1|sync +$stdby2_name|1|sync +$stdby4_name|1|potential", + 'potential standby found earlier in array is promoted to sync'); + + # Check that standby1 and standby2 are chosen as sync standbys + # based on their priorities. + test_sync_state( + $source_node, +"$stdby1_name|1|sync +$stdby2_name|2|sync +$stdby4_name|0|async", + 'priority-based sync replication specified by FIRST keyword', + "FIRST 2($stdby1_name, $stdby2_name)"); + + # Check that all the listed standbys are considered as candidates + # for sync standbys in a quorum-based sync replication. + test_sync_state( + $source_node, +"$stdby1_name|1|quorum +$stdby2_name|1|quorum +$stdby4_name|0|async", + '2 quorum and 1 async', + "ANY 2($stdby1_name, $stdby2_name)"); + + # Start Standby3 which will be considered in 'quorum' state. + $node_standby_3->start; + + # Check that the setting of 'ANY 2(*)' chooses all standbys as + # candidates for quorum sync standbys. 
+ test_sync_state( + $source_node, +"$stdby1_name|1|quorum +$stdby2_name|1|quorum +$stdby3_name|1|quorum +$stdby4_name|1|quorum", + 'all standbys are considered as candidates for quorum sync standbys', + 'ANY 2(*)'); + + return; +} + +# Cluster topology (the arrow shows the direction of replication) +# +# |-> cascade_standby10 |-> standby +# | +# |-> cascade_standby11 +# |-> cascade_standby00 -> | +# | |-> cascade_standby12 +# | | +# | |-> cascade_standby13 +# | +# | |-> standby00 +# | | +# | |-> standby01 +# |-> cascade_standby01 -> | +# | |-> standby02 +# | | +# primary -> | |-> standby03 +# | +# | |-> standby10 +# | | +# | |-> standby11 +# |-> cascade_standby02 -> | +# | |-> standby12 +# | | +# | |-> standby13 +# | +# |-> cascade_standby03 + # Initialize primary node my $node_primary = PostgreSQL::Test::Cluster->new('primary'); $node_primary->init(allows_streaming => 1); @@ -62,160 +250,275 @@ $node_primary->backup($backup_name); # Create all the standbys. Their status on the primary is checked to ensure # the ordering of each one of them in the WAL sender array of the primary. 
-# Create standby1 linking to primary -my $node_standby_1 = PostgreSQL::Test::Cluster->new('standby1'); -$node_standby_1->init_from_backup($node_primary, $backup_name, - has_streaming => 1); -start_standby_and_wait($node_primary, $node_standby_1); +my @cascade_standbys0; +my @cascade_standby0_names; -# Create standby2 linking to primary -my $node_standby_2 = PostgreSQL::Test::Cluster->new('standby2'); -$node_standby_2->init_from_backup($node_primary, $backup_name, - has_streaming => 1); -start_standby_and_wait($node_primary, $node_standby_2); +my $node; +my $i = 0; +for (; $i < 3; $i++) { + push @cascade_standby0_names, "cascade_standby0$i"; + + $node = PostgreSQL::Test::Cluster->new($cascade_standby0_names[$i]); + $node->init_from_backup($node_primary, $backup_name, + has_streaming => 1); + start_standby_and_wait($node_primary, $node); + + push @cascade_standbys0, $node; +} + +# This cascade standby will be initted in check_replication_state function +push @cascade_standby0_names, "cascade_standby0$i"; + +$node = PostgreSQL::Test::Cluster->new($cascade_standby0_names[$i]); +push @cascade_standbys0, $node; +$i = 0; + +# Create cascade standby and its last standbys.
+$backup_name = 'cascade_standby00_backup'; + +# Take backup of cascade_standby00 +$cascade_standbys0[0]->backup($backup_name); + +my @cascade_standbys1; +my @cascade_standby1_names; +for (; $i < 3; $i++) { + push @cascade_standby1_names, "cascade_standby1$i"; + + $node = PostgreSQL::Test::Cluster->new($cascade_standby1_names[$i]); + $node->init_from_backup($cascade_standbys0[0], $backup_name, + has_streaming => 1); + start_standby_and_wait($cascade_standbys0[0], $node); -# Create standby3 linking to primary -my $node_standby_3 = PostgreSQL::Test::Cluster->new('standby3'); -$node_standby_3->init_from_backup($node_primary, $backup_name, + push @cascade_standbys1, $node; +} + +# This cascade standby will be initted in check_replication_state function +push @cascade_standby1_names, "cascade_standby1$i"; + +$node = PostgreSQL::Test::Cluster->new($cascade_standby1_names[$i]); +push @cascade_standbys1, $node; +$i = 0; + +# Take backup +$backup_name = 'cascade_standby10_backup'; +$cascade_standbys1[0]->backup($backup_name); + +# Create standby5 linking to cascade standby +my $standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup($cascade_standbys1[0], $backup_name, has_streaming => 1); -start_standby_and_wait($node_primary, $node_standby_3); +start_standby_and_wait($cascade_standbys1[0], $standby); + +my $standby_name = $standby->name; + +# Take backup of cascade_standby01 +$backup_name = 'cascade_standby01_backup'; +$cascade_standbys0[1]->backup($backup_name); + +my @standbys0; +for (; $i < 3; $i++) { + $node = PostgreSQL::Test::Cluster->new("standby0$i"); + $node->init_from_backup($cascade_standbys0[1], $backup_name, + has_streaming => 1); + start_standby_and_wait($cascade_standbys0[1], $node); + + push @standbys0, $node; +} + +# This cascade_standby will be initted in check_replication_state function +$node = PostgreSQL::Test::Cluster->new("standby0$i"); +push @standbys0, $node; +$i = 0; + +# Take backup of cascade_standby02 +$backup_name = 
'cascade_standby02_backup'; +$cascade_standbys0[2]->backup($backup_name); + +my @standbys1; +for ($i = 0; $i < 3; $i++) { + $node = PostgreSQL::Test::Cluster->new("standby1$i"); + $node->init_from_backup($cascade_standbys0[2], $backup_name, + has_streaming => 1); + start_standby_and_wait($cascade_standbys0[2], $node); + + push @standbys1, $node; +} + +# This cascade standby will be initted in check_replication_state function +$node = PostgreSQL::Test::Cluster->new("standby1$i"); +push @standbys1, $node; + +# Set up initial topology -# Check that sync_state is determined correctly when -# synchronous_standby_names is specified in old syntax. test_sync_state( - $node_primary, qq(standby1|1|sync -standby2|2|potential -standby3|0|async), - 'old syntax of synchronous_standby_names', - 'standby1,standby2'); - -# Check that all the standbys are considered as either sync or -# potential when * is specified in synchronous_standby_names. -# Note that standby1 is chosen as sync standby because -# it's stored in the head of WalSnd array which manages -# all the standbys though they have the same priority. + $node_primary, +"${cascade_standby0_names[0]}|1|sync +${cascade_standby0_names[1]}|2|sync +${cascade_standby0_names[2]}|3|sync", + 'set up initial topology for primary server and its standbys', + "FIRST 3 (${cascade_standby0_names[0]}, ${cascade_standby0_names[1]}, ${cascade_standby0_names[2]})"); + test_sync_state( - $node_primary, qq(standby1|1|sync -standby2|1|potential -standby3|1|potential), - 'asterisk in synchronous_standby_names', - '*'); - -# Stop and start standbys to rearrange the order of standbys -# in WalSnd array. Now, if standbys have the same priority, -# standby2 is selected preferentially and standby3 is next. -$node_standby_1->stop; -$node_standby_2->stop; -$node_standby_3->stop; - -# Make sure that each standby reports back to the primary in the wanted -# order.
-start_standby_and_wait($node_primary, $node_standby_2); -start_standby_and_wait($node_primary, $node_standby_3); - -# Specify 2 as the number of sync standbys. -# Check that two standbys are in 'sync' state. + $cascade_standbys0[0], +"${cascade_standby1_names[0]}|1|quorum +${cascade_standby1_names[1]}|1|quorum +${cascade_standby1_names[2]}|0|async", + 'set up initial topology for cascade standby server and its standbys', + "ANY 2 (${cascade_standby1_names[0]}, ${cascade_standby1_names[1]})"); + test_sync_state( - $node_primary, qq(standby2|2|sync -standby3|3|sync), - '2 synchronous standbys', - '2(standby1,standby2,standby3)'); + $cascade_standbys1[0], +"$standby_name|1|sync", + 'make sync standby 5', + "$standby_name"); -# Start standby1 -start_standby_and_wait($node_primary, $node_standby_1); +my ($stdout, $stderr, $timed_out); -# Create standby4 linking to primary -my $node_standby_4 = PostgreSQL::Test::Cluster->new('standby4'); -$node_standby_4->init_from_backup($node_primary, $backup_name, - has_streaming => 1); -$node_standby_4->start; +# Check that queries work +my $cmdret = $node_primary->psql('postgres', 'CREATE TABLE test (id integer, data text);', + stdout => \$stdout, stderr => \$stderr, + timeout => $PostgreSQL::Test::Utils::timeout_default, + timed_out => \$timed_out, + on_error_die => 1, + extra_params => ['--single-transaction']); + +ok($cmdret == 0 && $timed_out == 0, "Query works"); + +# Stop standby 5 +$standby->stop; + +# This query is needed only so that the status of the sync replicas changes +$cmdret = $node_primary->psql('postgres', 'INSERT INTO test VALUES (1, \'first insertion\')', + stdout => \$stdout, stderr => \$stderr, + timeout => $PostgreSQL::Test::Utils::timeout_default, + timed_out => \$timed_out, + on_error_die => 1, + extra_params => ['--single-transaction']); + +ok($timed_out == 1, "Query fails with timeout"); -# Check that standby1 and standby2 whose names appear earlier in -# synchronous_standby_names are considered as
sync. Also check that -# standby3 appearing later represents potential, and standby4 is -# in 'async' state because it's not in the list. test_sync_state( - $node_primary, qq(standby1|1|sync -standby2|2|sync -standby3|3|potential -standby4|0|async), - '2 sync, 1 potential, and 1 async'); - -# Check that sync_state of each standby is determined correctly -# when num_sync exceeds the number of names of potential sync standbys -# specified in synchronous_standby_names. + $node_primary, +"${cascade_standby0_names[0]}|1|sync? +${cascade_standby0_names[1]}|2|sync +${cascade_standby0_names[2]}|3|sync", + "check that ${cascade_standby0_names[0]} in primary marks as invalid"); + +test_sync_state( + $cascade_standbys0[0], +"${cascade_standby1_names[0]}|1|quorum? +${cascade_standby1_names[1]}|1|quorum +${cascade_standby1_names[2]}|0|async", + "check that ${cascade_standby1_names[0]} in cascade standby marks as invalid"); + +$standby->start; + +$cmdret = $node_primary->psql('postgres', 'INSERT INTO test VALUES (2, \'second insertion\');', + stdout => \$stdout, stderr => \$stderr, + timeout => $PostgreSQL::Test::Utils::timeout_default, + timed_out => \$timed_out, + on_error_die => 1, + extra_params => ['--single-transaction']); + +ok($cmdret == 0 && $timed_out == 0, 'Query works again'); + test_sync_state( - $node_primary, qq(standby1|0|async -standby2|4|sync -standby3|3|sync -standby4|1|sync), - 'num_sync exceeds the num of potential sync standbys', - '6(standby4,standby0,standby3,standby2)'); - -# The setting that * comes before another standby name is acceptable -# but does not make sense in most cases. Check that sync_state is -# chosen properly even in case of that setting. standby1 is selected -# as synchronous as it has the highest priority, and is followed by a -# second standby listed first in the WAL sender array, which is -# standby2 in this case. 
+ $node_primary, +"${cascade_standby0_names[0]}|1|sync +${cascade_standby0_names[1]}|2|sync +${cascade_standby0_names[2]}|3|sync", + 'check that primary state backs to normal'); + test_sync_state( - $node_primary, qq(standby1|1|sync -standby2|2|sync -standby3|2|potential -standby4|2|potential), - 'asterisk before another standby name', - '2(standby1,*,standby2)'); - -# Check that the setting of '2(*)' chooses standby2 and standby3 that are stored -# earlier in WalSnd array as sync standbys. + $cascade_standbys0[0], +"${cascade_standby1_names[0]}|1|quorum +${cascade_standby1_names[1]}|1|quorum +${cascade_standby1_names[2]}|0|async", + 'check that cascade standby state backs to normal'); + test_sync_state( - $node_primary, qq(standby1|1|potential -standby2|1|sync -standby3|1|sync -standby4|1|potential), - 'multiple standbys having the same priority are chosen as sync', - '2(*)'); - -# Stop Standby3 which is considered in 'sync' state. -$node_standby_3->stop; - -# Check that the state of standby1 stored earlier in WalSnd array than -# standby4 is transited from potential to sync. 
+ $cascade_standbys1[0], +"$standby_name|1|sync", + 'check that standby 5 is sync'); + +check_replication_state($node_primary, $cascade_standbys0[0], $cascade_standbys0[1], $cascade_standbys0[2], $cascade_standbys0[3], 'primary_backup'); + test_sync_state( - $node_primary, qq(standby1|1|sync -standby2|1|sync -standby4|1|potential), - 'potential standby found earlier in array is promoted to sync'); + $node_primary, +"${cascade_standby0_names[0]}|1|sync +${cascade_standby0_names[1]}|2|sync +${cascade_standby0_names[2]}|3|potential +${cascade_standby0_names[3]}|0|async", + 'check that primary state backs to normal', + "FIRST 2 (${cascade_standby0_names[0]}, ${cascade_standby0_names[1]}, ${cascade_standby0_names[2]})"); + +# Reorder standbys in WalSnd array after testing + +$cascade_standbys1[0]->stop; +$cascade_standbys1[1]->stop; +$cascade_standbys1[2]->stop; + +$cascade_standbys0[0]->stop; + +start_standby_and_wait($node_primary, $cascade_standbys0[0]); +start_standby_and_wait($cascade_standbys0[0], $cascade_standbys1[0]); +start_standby_and_wait($cascade_standbys0[0], $cascade_standbys1[1]); +start_standby_and_wait($cascade_standbys0[0], $cascade_standbys1[2]); + +check_replication_state($cascade_standbys0[0], $cascade_standbys1[0], $cascade_standbys1[1], $cascade_standbys1[2], $cascade_standbys1[3], 'cascade_standby00_backup'); -# Check that standby1 and standby2 are chosen as sync standbys -# based on their priorities. test_sync_state( - $node_primary, qq(standby1|1|sync -standby2|2|sync -standby4|0|async), - 'priority-based sync replication specified by FIRST keyword', - 'FIRST 2(standby1, standby2)'); - -# Check that all the listed standbys are considered as candidates -# for sync standbys in a quorum-based sync replication. 
+ $node_primary, +"${cascade_standby0_names[0]}|1|sync +${cascade_standby0_names[1]}|2|sync +${cascade_standby0_names[2]}|3|potential +${cascade_standby0_names[3]}|0|async", + 'check that primary state does not change after testing cascade sync replication on standby 1'); + +# Reorder standbys in WalSnd array after testing + +$standbys0[0]->stop; +$standbys0[1]->stop; +$standbys0[2]->stop; + +$cascade_standbys0[1]->stop; + +start_standby_and_wait($node_primary, $cascade_standbys0[1]); +start_standby_and_wait($cascade_standbys0[1], $standbys0[0]); +start_standby_and_wait($cascade_standbys0[1], $standbys0[1]); +start_standby_and_wait($cascade_standbys0[1], $standbys0[2]); + +check_replication_state($cascade_standbys0[1], $standbys0[0], $standbys0[1], $standbys0[2], $standbys0[3], 'cascade_standby01_backup'); + test_sync_state( - $node_primary, qq(standby1|1|quorum -standby2|1|quorum -standby4|0|async), - '2 quorum and 1 async', - 'ANY 2(standby1, standby2)'); + $node_primary, +"${cascade_standby0_names[0]}|1|sync +${cascade_standby0_names[1]}|2|sync +${cascade_standby0_names[2]}|3|potential +${cascade_standby0_names[3]}|0|async", + 'check that primary state does not change after testing cascade sync replication on standby 2'); + +# Reorder standbys in WalSnd array after testing + +$standbys1[0]->stop; +$standbys1[1]->stop; +$standbys1[2]->stop; + +$cascade_standbys0[2]->stop; + +start_standby_and_wait($node_primary, $cascade_standbys0[2]); +start_standby_and_wait($cascade_standbys0[2], $standbys1[0]); +start_standby_and_wait($cascade_standbys0[2], $standbys1[1]); +start_standby_and_wait($cascade_standbys0[2], $standbys1[2]); -# Start Standby3 which will be considered in 'quorum' state. -$node_standby_3->start; +check_replication_state($cascade_standbys0[2], $standbys1[0], $standbys1[1], $standbys1[2], $standbys1[3], 'cascade_standby02_backup'); -# Check that the setting of 'ANY 2(*)' chooses all standbys as -# candidates for quorum sync standbys. 
test_sync_state( - $node_primary, qq(standby1|1|quorum -standby2|1|quorum -standby3|1|quorum -standby4|1|quorum), - 'all standbys are considered as candidates for quorum sync standbys', - 'ANY 2(*)'); + $node_primary, +"${cascade_standby0_names[0]}|1|sync +${cascade_standby0_names[1]}|2|sync +${cascade_standby0_names[2]}|3|potential +${cascade_standby0_names[3]}|0|async", + 'check that primary state does not change after testing cascade sync replication on standby 3'); done_testing(); diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl index 1a662ebe499..40648a0a016 100644 --- a/src/test/recovery/t/009_twophase.pl +++ b/src/test/recovery/t/009_twophase.pl @@ -45,10 +45,12 @@ $node_london->backup('london_backup'); my $node_paris = PostgreSQL::Test::Cluster->new('paris'); $node_paris->init_from_backup($node_london, 'london_backup', has_streaming => 1); +# Configure the paris node not to wait, because it will initially be a synchronous standby $node_paris->append_conf( 'postgresql.conf', qq( subtransaction_buffers = 32 )); +$node_paris->disable_waiting_standbys(); $node_paris->start; # Switch to synchronous replication in both directions @@ -230,14 +232,15 @@ note "Now paris is primary and london is standby"; ($cur_primary, $cur_standby) = ($node_paris, $node_london); $cur_primary_name = $cur_primary->name; -# because london is not running at this point, we can't use syncrep commit -# on this command +# paris is already configured not to wait, so even though the london node +# isn't running at this point we can commit the prepared transaction $psql_rc = $cur_primary->psql('postgres', - "SET synchronous_commit = off; COMMIT PREPARED 'xact_009_10'"); + "COMMIT PREPARED 'xact_009_10'"); is($psql_rc, '0', "Restore of prepared transaction on promoted standby"); # restart old primary as new standby $cur_standby->enable_streaming($cur_primary); +$cur_standby->disable_waiting_standbys(); $cur_standby->start;
############################################################################### @@ -247,14 +250,20 @@ $cur_standby->start; # consistent. ############################################################################### +# here we need to enable syncrep commit because the current primary has been configured +# not to wait $cur_primary->psql( 'postgres', " BEGIN; + SET synchronous_commit = on; INSERT INTO t_009_tbl VALUES (23, 'issued to ${cur_primary_name}'); SAVEPOINT s1; INSERT INTO t_009_tbl VALUES (24, 'issued to ${cur_primary_name}'); PREPARE TRANSACTION 'xact_009_11';"); $cur_primary->stop; + +# configure cur standby to be primary +$cur_standby->enable_waiting_standbys(); $cur_standby->restart; $cur_standby->promote; @@ -297,6 +306,8 @@ $cur_standby->promote; # change roles note "Now paris is primary and london is standby"; ($cur_primary, $cur_standby) = ($node_paris, $node_london); +$cur_primary->enable_waiting_standbys(); +$cur_primary->restart; $cur_primary_name = $cur_primary->name; $cur_primary->psql( @@ -308,6 +319,7 @@ is($psql_out, '1', # restart old primary as new standby $cur_standby->enable_streaming($cur_primary); +$cur_standby->disable_waiting_standbys(); $cur_standby->start; $cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_12'"); diff --git a/src/test/recovery/t/012_subtransactions.pl b/src/test/recovery/t/012_subtransactions.pl index 25df73463b7..74323a7c8b4 100644 --- a/src/test/recovery/t/012_subtransactions.pl +++ b/src/test/recovery/t/012_subtransactions.pl @@ -121,6 +121,7 @@ is($psql_out, '8128', "Visible"); # restore state ($node_primary, $node_standby) = ($node_standby, $node_primary); $node_standby->enable_streaming($node_primary); +$node_standby->disable_waiting_standbys(); $node_standby->start; $node_standby->psql( 'postgres', @@ -166,6 +167,9 @@ is($psql_out, '-1', "Not visible"); # restore state ($node_primary, $node_standby) = ($node_standby, $node_primary); +$node_primary->enable_waiting_standbys();
+$node_primary->restart(); + $node_standby->enable_streaming($node_primary); $node_standby->start; $psql_rc = $node_primary->psql('postgres', "COMMIT PREPARED 'xact_012_1'"); @@ -203,6 +207,7 @@ is($psql_out, '-1', "Not visible"); # restore state ($node_primary, $node_standby) = ($node_standby, $node_primary); $node_standby->enable_streaming($node_primary); +$node_standby->disable_waiting_standbys(); $node_standby->start; $psql_rc = $node_primary->psql('postgres', "ROLLBACK PREPARED 'xact_012_1'"); is($psql_rc, '0', -- 2.51.0
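
Note for reviewers: the "sync?" and "quorum?" expectations in 007_sync_rep.pl follow from the rule given in the cover letter. A cascading standby reports the minimum of its own position and the synchronous position computed from its walsenders, and falls back to DefaultSendingLSN when that position cannot be computed. The block below is a minimal standalone sketch of that rule for a single position (write, flush or apply). `choose_sending_lsn` and its parameters are hypothetical names for illustration and do not appear in the patches.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for the PostgreSQL typedef (uint64 in xlogdefs.h). */
typedef uint64_t XLogRecPtr;

/* Taken from the patch series. */
#define DefaultSendingLSN ((XLogRecPtr) 2)

/*
 * Hypothetical helper: pick the position a cascading standby would put
 * into its reply message.
 *
 * have_synced - whether a synchronous position could be computed from
 *               the downstream standbys (false, for example, when too
 *               few synchronous standbys are connected)
 * synced      - that computed position, ignored if have_synced is false
 * own         - the standby's own position
 */
static XLogRecPtr
choose_sending_lsn(bool have_synced, XLogRecPtr synced, XLogRecPtr own)
{
	if (!have_synced)
		return DefaultSendingLSN;

	/* Never report more progress than the slowest required participant. */
	return (synced < own) ? synced : own;
}
```

In the test scenario, stopping the leaf standby corresponds to `have_synced == false` on cascade_standby10, so it reports DefaultSendingLSN upstream and the upstream nodes display the question-mark states until the leaf standby is started again.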
