Hello,
We recently ran into an interesting situation on a standby with cascading replication and a large number (~100) of configured walsenders. CPU consumption on that system was very high, and the most time-consuming operation was signal delivery from the startup (recovery) process to the walsenders via WalSndWakeup invocations from ApplyWalRecord in xlogrecovery.c.
The startup process notifies the walsenders serving downstream systems with ConditionVariableBroadcast, so only processes currently waiting on that condition variable (CV) need to be signaled. Under high load, however, this still appears to become a bottleneck. The current implementation tries to send a notification after every WAL record (that is, on each invocation of ApplyWalRecord), which means a very high rate of WalSndWakeup calls. At the same time, each walsender receives only a very small chunk of data to process, so almost every walsender is back on the CV wait list by the next iteration. As a result, the wait list is nearly always full, which further slows down WAL replay on the standby.
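To make the cost described above concrete, here is a minimal toy model of it. It is only a sketch, not PostgreSQL code: ToyWaitList and the toy_* functions are hypothetical names, and the model assumes the worst case where every walsender has finished its chunk and is waiting again before the next record is applied.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAX_WALSENDERS 128

/* Toy wait list: waiting[i] is true while walsender i sleeps on the CV. */
typedef struct ToyWaitList
{
	bool		waiting[TOY_MAX_WALSENDERS];
	int			nsenders;
} ToyWaitList;

/* Wake every sleeping walsender and return the number of signals sent. */
static int
toy_broadcast(ToyWaitList *wl)
{
	int			signals = 0;

	for (int i = 0; i < wl->nsenders; i++)
	{
		if (wl->waiting[i])
		{
			wl->waiting[i] = false;
			signals++;
		}
	}
	return signals;
}

/* Every walsender drains its small chunk and goes back to sleep. */
static void
toy_senders_requeue(ToyWaitList *wl)
{
	for (int i = 0; i < wl->nsenders; i++)
		wl->waiting[i] = true;
}

/* Total signals sent while replaying nrecords with one wakeup per record. */
long
toy_signals_per_record_wakeup(int nsenders, long nrecords)
{
	ToyWaitList wl = {0};
	long		total = 0;

	assert(nsenders > 0 && nsenders <= TOY_MAX_WALSENDERS);
	wl.nsenders = nsenders;
	toy_senders_requeue(&wl);

	for (long r = 0; r < nrecords; r++)
	{
		total += toy_broadcast(&wl);	/* wakeup after each applied record */
		toy_senders_requeue(&wl);		/* all senders are waiting again */
	}
	return total;
}
```

In this model, replaying 1000 records with 100 walsenders delivers 100000 signals, i.e. the signal count grows with the product of the two numbers.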
To reproduce this behavior, a simple environment with three servers is enough: a primary, an attached physical standby, and a downstream server with a large number of logical replication subscriptions. Attached is a synthetic test case (test_scenario.zip): the ‘test_prepare.sh’ script creates the required environment with test data, and the ‘test_execute.sh’ script runs pgbench with simple updates against the primary to trigger replication to the other servers. On my machine, about 6 clients were enough to completely saturate all CPUs within 30 seconds of the test. Please check the environment properties at the top of these scripts before running them; they need to be updated to specify the location of the installed PG build, the target location for the database instances, and the ports to use.
Thinking about possible ways to improve this case, some form of batching looks like a workable option. We could postpone the notification until recovery has applied some number of records. This reduces the rate of CV notifications and also gives the receivers more data to process, so they may not need to enter the CV wait state as often. Counting applied records is not difficult; the tricky part is to ensure that notifications are not postponed for too long under low load.
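The counting part can be sketched as follows. This is an illustration under stated assumptions rather than the patch itself: ToyBatcher, max_batch_size and the toy_* helpers are hypothetical names, and a "wakeup" here only increments a counter.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical batching state kept by the replay loop. */
typedef struct ToyBatcher
{
	int			max_batch_size; /* records to collect before one wakeup */
	int			pending;		/* applied but not yet announced records */
	long		wakeups;		/* notifications sent so far */
} ToyBatcher;

/* Called after each applied record; returns true if a wakeup was sent. */
static bool
toy_record_applied(ToyBatcher *b)
{
	if (++b->pending < b->max_batch_size)
		return false;			/* keep collecting */

	b->pending = 0;
	b->wakeups++;
	return true;
}

/* Replay nrecords with the given batch size and return the final state. */
static ToyBatcher
toy_replay(int max_batch_size, long nrecords)
{
	ToyBatcher	b = {max_batch_size, 0, 0};

	assert(max_batch_size > 0);
	for (long r = 0; r < nrecords; r++)
		(void) toy_record_applied(&b);
	return b;
}

/* Number of wakeups sent for nrecords. */
long
toy_wakeups(int max_batch_size, long nrecords)
{
	return toy_replay(max_batch_size, nrecords).wakeups;
}

/* Number of applied records that were never announced. */
int
toy_unannounced(int max_batch_size, long nrecords)
{
	return toy_replay(max_batch_size, nrecords).pending;
}
```

With a batch size of 500, replaying 1001 records sends 2 wakeups and leaves 1 record unannounced. That leftover record is exactly the low-load problem: without some additional trigger it stays invisible to walsenders until more records arrive.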
The first possible approach to limiting this delay is to ask ReadRecord to return control on the first iteration if there are currently no more incoming records. From the standby recovery perspective, we are either reading and applying records from the upstream instance or waiting for a new record to become available. We could add a field to XLogPageReadPrivate that tells XLogPageRead, on the first iteration of ReadRecord, to perform a non-blocking read and return a null record if no data is currently available. In that case we can notify the walsenders immediately and then switch back to regular blocking reads.
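A minimal sketch of this control flow, assuming the incoming WAL can be modeled as bursts of records separated by idle periods. ToyStream, toy_read_record and the nonblocking flag are hypothetical stand-ins for the reader state and the proposed XLogPageReadPrivate field; no real PostgreSQL API is used.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy input: bursts[i] records arrive together, then the stream goes idle. */
typedef struct ToyStream
{
	const int  *bursts;			/* non-negative burst sizes */
	int			nbursts;
	int			next_burst;
	int			queued;			/* records available right now */
} ToyStream;

/*
 * Read one record. A non-blocking read returns false when nothing is queued.
 * A blocking read "waits" for the next burst and returns false only at the
 * end of the stream.
 */
static bool
toy_read_record(ToyStream *s, bool nonblocking)
{
	if (s->queued == 0)
	{
		if (nonblocking)
			return false;
		while (s->queued == 0 && s->next_burst < s->nbursts)
			s->queued = s->bursts[s->next_burst++];
		if (s->queued == 0)
			return false;
	}
	s->queued--;
	return true;
}

/* Count wakeups when pending records are flushed as soon as input runs dry. */
long
toy_wakeups_with_nonblocking_probe(const int *bursts, int nbursts,
								   int max_batch_size)
{
	ToyStream	s = {bursts, nbursts, 0, 0};
	bool		nonblocking = false;
	int			pending = 0;
	long		wakeups = 0;

	assert(max_batch_size > 0);
	for (;;)
	{
		if (!toy_read_record(&s, nonblocking))
		{
			if (!nonblocking)
				break;			/* end of stream */

			/* Nothing queued: announce the partial batch, then block. */
			if (pending > 0)
			{
				pending = 0;
				wakeups++;
			}
			nonblocking = false;
			continue;
		}

		/* The record is "applied" here. */
		if (++pending >= max_batch_size)
		{
			pending = 0;
			wakeups++;
		}
		nonblocking = true;		/* probe for more input without blocking */
	}
	return wakeups;
}
```

For bursts of 3, 1 and 5 records with a batch size of 4, this model sends 4 wakeups: one per drained burst plus one when the third burst fills a batch. No record is left unannounced while the replay loop is idle.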
However, with this approach we cannot control the time required to apply a single record, and thus cannot effectively bound the resulting delay. We may hit a record that takes a long time to apply (e.g. ‘drop database’), which holds back all records already in the batch from being announced to walsenders. Another option is to use a timer that schedules a notification regardless of whether we are waiting for records or applying them. In this case WalSndWakeup is invoked either after a certain number of records has been applied or when the timeout since the last notification expires.
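The "size or timeout, whichever comes first" rule can be sketched with a simulated clock. This is a simplified model and not the patch: the function name and parameters are hypothetical, apply-completion times are given in milliseconds in non-decreasing order, and the periodic timer is assumed to be armed at time 0 and restarted whenever a full batch is flushed.

```c
#include <assert.h>

/*
 * Count wakeups for records whose apply finished at applied_at_ms[i].
 * A wakeup is sent when max_batch_size records are pending, or when the
 * periodic timer (period max_delay_ms) expires with records pending.
 */
long
toy_wakeups_count_or_timeout(const long *applied_at_ms, int nrecords,
							 int max_batch_size, long max_delay_ms)
{
	long		next_fire_ms = max_delay_ms;	/* timer armed at time 0 */
	long		wakeups = 0;
	int			pending = 0;

	assert(max_batch_size > 0 && max_delay_ms > 0);

	for (int i = 0; i < nrecords; i++)
	{
		long		now = applied_at_ms[i];

		/* Timer expirations that happened while this record was applied. */
		if (next_fire_ms <= now)
		{
			if (pending > 0)
			{
				pending = 0;
				wakeups++;
			}
			/* Periodic timer: skip ahead to the first expiration after now. */
			next_fire_ms += ((now - next_fire_ms) / max_delay_ms + 1) * max_delay_ms;
		}

		if (++pending >= max_batch_size)
		{
			pending = 0;
			wakeups++;
			next_fire_ms = now + max_delay_ms;	/* restart timer on size flush */
		}
	}

	/* The timer eventually announces whatever is still pending. */
	if (pending > 0)
		wakeups++;
	return wakeups;
}
```

For apply times of 10, 20 and 5000 ms with a batch size of 100 and a delay of 500 ms, the first two records are announced by the timer while the slow third record is still being applied, and the third one is announced by a later expiration, giving 2 wakeups in total.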
We tried to create a proof of concept for this approach and made another finding: it seems that the current implementation of ConditionVariable notification does not work correctly if a process is waiting on one CV while also sending a broadcast to another CV from an interrupt handler. The comments in the code imply that this case is supported, but in reality it can leave the CV in an inconsistent state and cause problems on subsequent CV broadcasts. ConditionVariableBroadcast removes the current process from the wakeup list of the CV referenced by ‘cv_sleep_target’ and then clears that variable. Once ConditionVariableTimedSleep wakes up, it adds the process back to the wakeup list, but then returns control to the caller without restoring the ‘cv_sleep_target’ value. A subsequent ConditionVariableCancelSleep then cannot locate the correct CV and complete the wait cycle (the current process remains on the list), so the next ConditionVariableBroadcast for this CV may fail an assertion (we do not expect to find ourselves on the list of processes that need to receive our signal). I think the correct approach is to return control to the caller right after the wait if ‘cv_sleep_target’ has changed. We do not need to add ourselves to the wakeup list in that case, because the caller will check its exit condition in the loop and then invoke either ConditionVariableCancelSleep or ConditionVariableTimedSleep again. Both functions update the CV state correctly, so we will not miss any notifications or leave the CV in an improper state.
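The sequence described above can be replayed in a single-process toy model. This is a sketch of the state transitions only, following the description in this message: ToyCV, toy_sleep_target and the toy_* functions are hypothetical names, and each CV tracks just one bit, namely whether the current process is on its wait list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy CV: remembers only whether "this process" is on its wait list. */
typedef struct ToyCV
{
	bool		self_on_list;
} ToyCV;

static ToyCV *toy_sleep_target = NULL;	/* models cv_sleep_target */

/* Leave the wait list of the current sleep target, if any. */
static void
toy_cancel_sleep(void)
{
	if (toy_sleep_target == NULL)
		return;
	toy_sleep_target->self_on_list = false;
	toy_sleep_target = NULL;
}

/* Register on the wait list of cv before sleeping. */
static void
toy_prepare_to_sleep(ToyCV *cv)
{
	assert(cv != NULL);
	toy_cancel_sleep();
	toy_sleep_target = cv;
	cv->self_on_list = true;
}

/* Broadcast on cv; returns false where the real code would hit an assertion. */
static bool
toy_broadcast(ToyCV *cv)
{
	toy_cancel_sleep();			/* a broadcaster first cancels its own sleep */
	return !cv->self_on_list;	/* we must not be on the list we signal */
}

/* Wake-up path of the sleep, with a handler broadcasting on handler_cv. */
static void
toy_wake_from_sleep(ToyCV *cv, ToyCV *handler_cv, bool with_fix)
{
	if (handler_cv != NULL)
		(void) toy_broadcast(handler_cv);	/* interrupt handler ran */

	if (with_fix && cv != toy_sleep_target)
		return;					/* proposed check: return spuriously */

	if (!cv->self_on_list)
		cv->self_on_list = true;	/* looks "signaled": re-add to the list */
}

/* Returns true if a later broadcast on the waited CV is still safe. */
bool
toy_scenario_is_consistent(bool with_fix)
{
	ToyCV		waited = {false};
	ToyCV		other = {false};

	toy_sleep_target = NULL;
	toy_prepare_to_sleep(&waited);
	toy_wake_from_sleep(&waited, &other, with_fix);
	toy_cancel_sleep();			/* caller's exit condition is satisfied */
	return toy_broadcast(&waited);
}
```

Without the extra check, the process ends up on the wait list of the waited CV while the sleep target is already cleared, so the final broadcast reports an inconsistency. With the check, the wait list and the sleep target stay aligned.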
We’ve prepared two test patches on top of current master to address both issues:
* 0001-CV-correctly-handle-cv_sleep_target-change.patch: adjusts the CV logic to correctly handle a change of ‘cv_sleep_target’ during sleep.
* 0002-Implement-batching-for-cascade-replication.patch: a test patch implementing a possible batching approach with a timer in xlogrecovery.c. It currently uses GUC variables so that different batch sizes and timeout values can be tested.
With both patches applied, we observed a significant reduction in CPU consumption when running the synthetic test described above. If only the second patch is applied, the CV problem can be reproduced by building the server with cassert enabled and running the TAP tests in the ‘src/test/recovery’ directory.
It would be great to hear any thoughts on these observations and the proposed fixes, as well as on possible pitfalls of the changes.
Thanks, Alexey
From e5cfabfa7fd8f05f1a7c8421e60087d672fb59e4 Mon Sep 17 00:00:00 2001
From: Alexey Makhmutov <a.makhmu...@postgrespro.ru>
Date: Fri, 28 Mar 2025 16:16:36 +0300
Subject: [PATCH 1/2] Ensure that content of the wait list on CV is aligned
 with cv_sleep_target value

Wait on the ConditionVariable could be interleaved with interruption
handlers, such as timers. If such handler uses CV calls (i.e.
ConditionVariableBroadcast), then value of the cv_sleep_target could be
null or point to a different CV after wakeup. In this case we should not
try to add ourselves to the wakeup wait list, as subsequent call to
ConditionVariableCancelSleep won't be able to find our CV to clear its
list. We should instead return control to the client, so the exit
condition for the CV external wait loop could be checked. If wait is
still required, then next invocation of ConditionVariableTimedSleep will
perform the required setup procedures for CV. Behavior before this change
results in the mismatch between cv_sleep_target value and content of the
wakeup list, so the next invocation of ConditionVariableBroadcast method
may fail due to presence of current process in the list.
---
 src/backend/storage/lmgr/condition_variable.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 228303e77f7..3b5d9764403 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -165,6 +165,17 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
 		/* Reset latch before examining the state of the wait list. */
 		ResetLatch(MyLatch);
 
+		/*
+		 * First, check that we are still expected to wait on this variable.
+		 * If sleep target has changed, then we need to return spuriously to
+		 * allow caller to check the exit condition and invoke the wait
+		 * function again to properly prepare for the next sleep.
+		 * Note, that we don't need to change wait list in this case, as we
+		 * should be deleted from the list in case of cv_sleep_target change.
+		 */
+		if (cv != cv_sleep_target)
+			return false;
+
 		/*
 		 * If this process has been taken out of the wait list, then we know
 		 * that it has been signaled by ConditionVariableSignal (or
--
2.49.0
From 75d3e41f0833845b7abc9dff6dc9d4857c1c340c Mon Sep 17 00:00:00 2001
From: Rustam Khamidullin <r.khamidul...@postgrespro.ru>
Date: Fri, 14 Mar 2025 18:18:34 +0700
Subject: [PATCH 2/2] Implement batching for WAL records notification during
 cascade replication

Currently standby server notifies walsenders after applying of each WAL
record during cascade replication. This creates a bottleneck in case of
large number of sender processes during WalSndWakeup invocation. This
change introduces batching for such notifications, which are now sent
either after certain number of applied records or specified time interval
(whichever comes first).

Co-authored-by: Alexey Makhmutov <a.makhmu...@postgrespro.ru>
---
 src/backend/access/transam/xlogrecovery.c | 68 ++++++++++++++++++++++-
 src/backend/postmaster/startup.c          |  1 +
 src/backend/utils/misc/guc_tables.c       | 22 ++++++++
 src/include/access/xlogrecovery.h         |  4 ++
 src/include/utils/timeout.h               |  1 +
 5 files changed, 94 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 0aa3ab59085..84a6c176593 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -64,6 +64,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/pg_rusage.h"
+#include "utils/timeout.h"
 
 /* Unsupported old recovery command file names (relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE "recovery.conf"
@@ -147,6 +148,8 @@ bool InArchiveRecovery = false;
 static bool StandbyModeRequested = false;
 bool StandbyMode = false;
 
+#define StandbyWithCascadeReplication() (AmStartupProcess() && StandbyMode && AllowCascadeReplication())
+
 /* was a signal file present at startup? */
 static bool standby_signal_file_found = false;
 static bool recovery_signal_file_found = false;
@@ -298,6 +301,14 @@ bool reachedConsistency = false;
 static char *replay_image_masked = NULL;
 static char *primary_image_masked = NULL;
 
+/* Maximum number of applied records in batch before notifying walsender during cascade replication */
+int cascadeReplicationMaxBatchSize;
+
+/* Maximum batching delay before notifying walsender during cascade replication */
+int cascadeReplicationMaxBatchDelay;
+
+/* Counter for collected records during cascade replication */
+static volatile sig_atomic_t appliedRecords = 0;
 
 /*
  * Shared-memory state for WAL recovery.
@@ -1747,6 +1758,14 @@ PerformWalRecovery(void)
 		if (!StandbyMode)
 			begin_startup_progress_phase();
 
+		/* Enable batch mode for cascade replication */
+		if (StandbyWithCascadeReplication() && cascadeReplicationMaxBatchSize > 1 && cascadeReplicationMaxBatchDelay > 0)
+		{
+			enable_timeout_every(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+					TimestampTzPlusMilliseconds(GetCurrentTimestamp(), cascadeReplicationMaxBatchDelay),
+					cascadeReplicationMaxBatchDelay);
+		}
+
 		/*
 		 * main redo apply loop
 		 */
@@ -1839,6 +1858,16 @@ PerformWalRecovery(void)
 		 * end of main redo apply loop
 		 */
 
+		if (StandbyWithCascadeReplication() && cascadeReplicationMaxBatchSize > 1 && cascadeReplicationMaxBatchDelay > 0)
+		{
+			disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			if (appliedRecords > 0)
+			{
+				appliedRecords = 0;
+				WalSndWakeup(false, true);
+			}
+		}
+
 		if (reachedRecoveryTarget)
 		{
 			if (!reachedConsistency)
@@ -2037,8 +2066,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * be created otherwise)
 	 * ------
 	 */
-	if (AllowCascadeReplication())
-		WalSndWakeup(switchedTLI, true);
+
+	if (StandbyWithCascadeReplication())
+	{
+		if (cascadeReplicationMaxBatchSize <= 1)
+			WalSndWakeup(switchedTLI, true);
+		else
+		{
+			bool batchFlushRequired = ++appliedRecords >= cascadeReplicationMaxBatchSize;
+			if (batchFlushRequired)
+			{
+				appliedRecords = 0;
+				if (cascadeReplicationMaxBatchDelay > 0)
+					disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			}
+
+			WalSndWakeup(switchedTLI, batchFlushRequired);
+
+			/* Reset batch notifications timer from current moment */
+			if (batchFlushRequired && cascadeReplicationMaxBatchDelay > 0)
+				enable_timeout_every(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+						TimestampTzPlusMilliseconds(GetCurrentTimestamp(), cascadeReplicationMaxBatchDelay),
+						cascadeReplicationMaxBatchDelay);
+		}
+	}
 
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5064,3 +5115,16 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Timer handler for batch notifications in cascade replication
+ */
+void
+StandbyWalSendTimeoutHandler(void)
+{
+	if (appliedRecords > 0)
+	{
+		appliedRecords = 0;
+		WalSndWakeup(false, true);
+	}
+}
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 27e86cf393f..185a58f94fd 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -246,6 +246,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
 	RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
 	RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
 	RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
+	RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
 
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 76c7c6bb4b1..9b69015f375 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2306,6 +2306,28 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"cascade_replication_batch_size", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum number of applied WAL records before cascade walsenders are notified on standby."),
+			gettext_noop("0 or 1 disables record batching in cascade replication"),
+			GUC_NOT_IN_SAMPLE
+		},
+		&cascadeReplicationMaxBatchSize,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cascade_replication_batch_delay", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum time before cascade walsenders are notified on standby about applied records."),
+			gettext_noop("0 disables timed notifications"),
+			GUC_NOT_IN_SAMPLE | GUC_UNIT_MS
+		},
+		&cascadeReplicationMaxBatchDelay,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 91446303024..efb97213a07 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -57,6 +57,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -155,4 +157,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern void StandbyWalSendTimeoutHandler(void);
+
 #endif							/* XLOGRECOVERY_H */
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..0062cb562b9 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
 	IDLE_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	STANDBY_CASCADE_WAL_SEND_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */
--
2.49.0
<<attachment: test_scenario.zip>>