At Thu, 16 Apr 2020 16:48:28 +0900, Masahiko Sawada <masahiko.saw...@2ndquadrant.com> wrote in > This is just a notice; I'm reading your latest patch but it seems to > include unrelated changes: > > $ git diff --stat > src/backend/replication/syncrep.c | 475 > +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------------------------------------------------------------------------------------------------- > src/backend/replication/walsender.c | 40 ++++++++++++++----- > src/bin/pg_dump/compress_io.c | 12 ++++++ > src/bin/pg_dump/pg_backup_directory.c | 48 ++++++++++++++++++----- > src/include/replication/syncrep.h | 20 +++++++++- > src/include/replication/walsender_private.h | 16 ++++---- > 6 files changed, 274 insertions(+), 337 deletions(-)
Ugg. I failed to clean up working directory.. I didn't noticed as I made the file by git diff. Thanks for noticing me of that. regards. -- Kyotaro Horiguchi NTT Open Source Software Center
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31eb2..99a7bbbc86 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -103,19 +103,21 @@ static int SyncRepWakeQueue(bool all, int mode); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - bool *am_sync); + XLogRecPtr *applyPtr); static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth); static int SyncRepGetStandbyPriority(void); -static List *SyncRepGetSyncStandbysPriority(bool *am_sync); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static int standby_priority_comparator(const void *a, const void *b); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -406,9 +408,10 @@ SyncRepInitConfig(void) priority = SyncRepGetStandbyPriority(); if (MyWalSnd->sync_standby_priority != priority) { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sync_standby_priority = priority; - LWLockRelease(SyncRepLock); + SpinLockRelease(&MyWalSnd->mutex); + ereport(DEBUG1, (errmsg("standby \"%s\" now has synchronous standby priority %u", application_name, priority))); @@ -429,8 +432,7 @@ SyncRepReleaseWaiters(void) XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; - bool got_recptr; - bool am_sync; + bool release_waiters; int numwrite = 0; int numflush = 0; int numapply = 0; @@ -458,16 +460,24 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* - * Check whether we are a sync standby or not, and calculate the synced - * positions among all sync standbys. + * Check whether we may release any of waiter processes, and calculate the + * synced positions. */ - got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync); + release_waiters = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr); + + /* Return if nothing to do. */ + if (!release_waiters) + { + LWLockRelease(SyncRepLock); + announce_next_takeover = true; + return; + } /* - * If we are managing a sync standby, though we weren't prior to this, - * then announce we are now a sync standby. + * If this walsender becomes to be able to release waiter processes, + * announce about that. */ - if (announce_next_takeover && am_sync) + if (announce_next_takeover) { announce_next_takeover = false; @@ -481,17 +491,6 @@ SyncRepReleaseWaiters(void) application_name))); } - /* - * If the number of sync standbys is less than requested or we aren't - * managing a sync standby then just leave. - */ - if (!got_recptr || !am_sync) - { - LWLockRelease(SyncRepLock); - announce_next_takeover = !am_sync; - return; - } - /* * Set the lsn first so that when we wake backends they will release up to * this location. @@ -523,43 +522,62 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * - * The caller must hold SyncRepLock. - * - * Return false if the number of sync standbys is less than - * synchronous_standby_names specifies. Otherwise return true and - * store the positions into *writePtr, *flushPtr and *applyPtr. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * Return false if this walsender cannot release any of waiteres. Otherwise + * return true and store the positions into *writePtr, *flushPtr and *applyPtr. */ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, bool *am_sync) + XLogRecPtr *applyPtr) { - List *sync_standbys; - - Assert(LWLockHeldByMe(SyncRepLock)); + SyncRepStandbyData *standbys; + int num_standbys; + int i; + bool sync_priority = + (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); + /* Initialize default results */ *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; - *am_sync = false; - /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + + /* Get standbys. They are in priority order. */ + num_standbys = SyncRepGetStandbys(&standbys); /* - * Quick exit if we are not managing a sync standby or there are not - * enough synchronous standbys. + * Nothing more to do if there are not enough synchronous standbys or + * candidates. */ - if (!(*am_sync) || - SyncRepConfig == NULL || - list_length(sync_standbys) < SyncRepConfig->num_sync) + if (num_standbys < SyncRepConfig->num_sync) { - list_free(sync_standbys); + pfree(standbys); return false; } + /* When priority mode, nothing to do if I an not a sync standby */ + if (sync_priority) + { + bool am_sync = false; + + for (i = 0; i < num_standbys; i++) + { + if (standbys[i].is_me) + { + am_sync = true; + break; + } + } + + if (!am_sync) + { + pfree(standbys); + return false; + } + } + /* * In a priority-based sync replication, the synced positions are the * oldest ones among sync standbys. In a quorum-based, they are the Nth @@ -573,46 +591,51 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced * positions even in a quorum-based sync replication. */ - if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + if (sync_priority) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + standbys, num_standbys, + SyncRepConfig->num_sync); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + standbys, num_standbys, + SyncRepConfig->num_sync); } - list_free(sync_standbys); + pfree(standbys); return true; } /* - * Calculate the oldest Write, Flush and Apply positions among sync standbys. + * Calculate the oldest Write among the first nsync standbys, Flush and Apply + * positions among sync standbys. */ static void -SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) +SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nsyncs) { - ListCell *cell; + int i; /* * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * and Apply positions. We assume *writePtr et al were initialized to + * InvalidXLogRecPtr. */ - foreach(cell, sync_standbys) + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; + XLogRecPtr write = sync_standbys[i].write; + XLogRecPtr flush = sync_standbys[i].flush; + XLogRecPtr apply = sync_standbys[i].apply; - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + /* Ignore candidates that aren't considered synchronous */ + if (i >= nsyncs) + break; if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) *writePtr = write; @@ -628,38 +651,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, * standbys. */ static void -SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) +SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys, + uint8 nth) { - ListCell *cell; XLogRecPtr *write_array; XLogRecPtr *flush_array; XLogRecPtr *apply_array; - int len; - int i = 0; + int i; + int n; - len = list_length(sync_standbys); - write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); - apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys); - foreach(cell, sync_standbys) + n = 0; + for (i = 0; i < num_standbys; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - - i++; + write_array[n] = sync_standbys[i].write; + flush_array[n] = sync_standbys[i].flush; + apply_array[n] = sync_standbys[i].apply; + n++; } + /* Should have enough, or somebody messed up */ + Assert(n >= nth); + /* Sort each array in descending order */ - qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); - qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn); /* Get Nth latest Write, Flush, Apply positions */ *writePtr = write_array[nth - 1]; @@ -689,67 +713,49 @@ cmp_lsn(const void *a, const void *b) } /* - * Return the list of sync standbys, or NIL if no sync standby is connected. + * Return data about active walsenders. * - * The caller must hold SyncRepLock. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. + * *standbys is set to a palloc'd array of structs of per-walsender data, and + * the number of valid entries is returned. The entries are in + * sync_standby_priority order. */ -List * -SyncRepGetSyncStandbys(bool *am_sync) +int +SyncRepGetStandbys(SyncRepStandbyData **standbys) { - Assert(LWLockHeldByMe(SyncRepLock)); + int i; + int n; - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; + /* Create result array */ + *standbys = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); /* Quick exit if sync replication is not requested */ if (SyncRepConfig == NULL) - return NIL; - - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? - SyncRepGetSyncStandbysPriority(am_sync) : - SyncRepGetSyncStandbysQuorum(am_sync); -} - -/* - * Return the list of all the candidates for quorum sync standbys, - * or NIL if no such standby is connected. - * - * The caller must hold SyncRepLock. This function must be called only in - * a quorum-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysQuorum(bool *am_sync) -{ - List *result = NIL; - int i; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM); + return false; + /* Collect raw data from shared memory */ + n = 0; for (i = 0; i < max_wal_senders; i++) { - XLogRecPtr flush; - WalSndState state; - int pid; + volatile WalSnd *walsnd; /* Use volatile pointer to prevent code + * rearrangement */ + SyncRepStandbyData *stby; + WalSndState state; /* not included in SyncRepStandbyData */ walsnd = &WalSndCtl->walsnds[i]; + stby = *standbys + n; SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; + stby->pid = walsnd->pid; state = walsnd->state; + stby->write = walsnd->write; + stby->flush = walsnd->flush; + stby->apply = walsnd->apply; + stby->sync_standby_priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); /* Must be active */ - if (pid == 0) + if (stby->pid == 0) continue; /* Must be streaming or stopping */ @@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must be synchronous */ - if (walsnd->sync_standby_priority == 0) + if (stby->sync_standby_priority == 0) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) + if (XLogRecPtrIsInvalid(stby->flush)) continue; - /* - * Consider this standby as a candidate for quorum sync standbys and - * append it to the result. - */ - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; + /* OK, it's a candidate */ + stby->walsnd_index = i; + stby->is_me = (walsnd == MyWalSnd); + n++; } - return result; -} - -/* - * Return the list of sync standbys chosen based on their priorities, - * or NIL if no sync standby is connected. - * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * - * The caller must hold SyncRepLock. This function must be called only in - * a priority-based sync replication. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ -static List * -SyncRepGetSyncStandbysPriority(bool *am_sync) -{ - List *result = NIL; - List *pending = NIL; - int lowest_priority; - int next_highest_priority; - int this_priority; - int priority; - int i; - bool am_in_pending = false; - volatile WalSnd *walsnd; /* Use volatile pointer to prevent code - * rearrangement */ - - Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); - - lowest_priority = SyncRepConfig->nmembers; - next_highest_priority = lowest_priority + 1; - /* - * Find the sync standbys which have the highest priority (i.e, 1). Also - * store all the other potential sync standbys into the pending list, in - * order to scan it later and find other sync standbys from it quickly. + * In quorum mode, that's all we have to do since all entries are in the + * same priority 1. In priority mode, sort the candidates by priority. */ - for (i = 0; i < max_wal_senders; i++) - { - XLogRecPtr flush; - WalSndState state; - int pid; - - walsnd = &WalSndCtl->walsnds[i]; - - SpinLockAcquire(&walsnd->mutex); - pid = walsnd->pid; - flush = walsnd->flush; - state = walsnd->state; - SpinLockRelease(&walsnd->mutex); - - /* Must be active */ - if (pid == 0) - continue; - - /* Must be streaming or stopping */ - if (state != WALSNDSTATE_STREAMING && - state != WALSNDSTATE_STOPPING) - continue; - - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; - - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(flush)) - continue; + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + qsort(*standbys, n, sizeof(SyncRepStandbyData), + standby_priority_comparator); - /* - * If the priority is equal to 1, consider this standby as sync and - * append it to the result. Otherwise append this standby to the - * pending list to check if it's actually sync or not later. - */ - if (this_priority == 1) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - } - else - { - pending = lappend_int(pending, i); - if (am_sync != NULL && walsnd == MyWalSnd) - am_in_pending = true; + return n; +} - /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. - */ - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - } - /* - * Consider all pending standbys as sync if the number of them plus - * already-found sync ones is lower than the configuration requests. - */ - if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync) - { - /* - * Set *am_sync to true if this walsender is in the pending list - * because all pending standbys are considered as sync. - */ - if (am_sync != NULL && !(*am_sync)) - *am_sync = am_in_pending; +/* + * qsort comparator to sort SyncRepStandbyData entries by priority + */ +static int +standby_priority_comparator(const void *a, const void *b) +{ + const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a; + const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b; - result = list_concat(result, pending); - list_free(pending); - return result; - } + /* First, sort by increasing priority value */ + if (sa->sync_standby_priority != sb->sync_standby_priority) + return sa->sync_standby_priority - sb->sync_standby_priority; /* - * Find the sync standbys from the pending list. + * We might have equal priority values; arbitrarily break ties by position + * in the WALSnd array. (This is utterly bogus, since that is arrival + * order dependent, but there are regression tests that rely on it.) */ - priority = next_highest_priority; - while (priority <= lowest_priority) - { - ListCell *cell; - - next_highest_priority = lowest_priority + 1; - - foreach(cell, pending) - { - i = lfirst_int(cell); - walsnd = &WalSndCtl->walsnds[i]; - - this_priority = walsnd->sync_standby_priority; - if (this_priority == priority) - { - result = lappend_int(result, i); - if (am_sync != NULL && walsnd == MyWalSnd) - *am_sync = true; - - /* - * We should always exit here after the scan of pending list - * starts because we know that the list has enough elements to - * reach SyncRepConfig->num_sync. - */ - if (list_length(result) == SyncRepConfig->num_sync) - { - list_free(pending); - return result; /* Exit if got enough sync standbys */ - } - - /* - * Remove the entry for this sync standby from the list to - * prevent us from looking at the same entry again. - */ - pending = foreach_delete_current(pending, cell); - - continue; /* don't adjust next_highest_priority */ - } - - if (this_priority < next_highest_priority) - next_highest_priority = this_priority; - } - - priority = next_highest_priority; - } - - /* never reached, but keep compiler quiet */ - Assert(false); - return result; + return sa->walsnd_index - sb->walsnd_index; } + /* * Check if we are in the list of sync standbys, and if so, determine * priority sequence. Return priority if set, or zero to indicate that diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fc475d144d..7889ea5f6f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2375,14 +2375,16 @@ InitWalSenderSlot(void) * Found a free slot. Reserve it for us. */ walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; - walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; @@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - List *sync_standbys; + SyncRepStandbyData *standbys; + int num_standbys; int i; /* check to see if caller supports us returning a tuplestore */ @@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standbys. + * Get the currently active standbys in priority order. This could be out + * of date before we're done, but we'll use the data anyway. */ - LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standbys = SyncRepGetSyncStandbys(NULL); - LWLockRelease(SyncRepLock); + num_standbys = SyncRepGetStandbys(&standbys); for (i = 0; i < max_wal_senders; i++) { @@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int64 spillTxns; int64 spillCount; int64 spillBytes; + bool is_sync_standby; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { @@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); + /* Detect whether walsender is/was considered synchronous */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (standbys[j].walsnd_index == i) + { + is_sync_standby = (j < SyncRepConfig->num_sync); + break; + } + } + memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); @@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[10] = CStringGetTextDatum("async"); - else if (list_member_int(sync_standbys, i)) - values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? - CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); + else if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + if (is_sync_standby) + values[10] = CStringGetTextDatum("sync"); + else + values[10] = CStringGetTextDatum("potential"); + } else - values[10] = CStringGetTextDatum("potential"); + values[10] = CStringGetTextDatum("quorum"); if (replyTime == 0) nulls[11] = true; diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c5f0e91aad..533aac25c7 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -36,6 +36,24 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* + * SyncRepGetSyncStandbys returns an array of these structs, + * one per candidate synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; +} SyncRepStandbyData; + /* * Struct for the configuration of synchronous replication. * @@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void); extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ -extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern int SyncRepGetStandbys(SyncRepStandbyData **standbys); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void); diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 366828f0a4..734acec2a4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -31,8 +31,7 @@ typedef enum WalSndState /* * Each walsender has a WalSnd struct in shared memory. * - * This struct is protected by 'mutex', with two exceptions: one is - * sync_standby_priority as noted below. The other exception is that some + * This struct is protected by its 'mutex' spinlock field, except that some * members are only written by the walsender process itself, and thus that * process is free to read those members without holding spinlock. pid and * needreload always require the spinlock to be held for all accesses. @@ -60,6 +59,12 @@ typedef struct WalSnd TimeOffset flushLag; TimeOffset applyLag; + /* + * The priority order of the standby managed by this WALSender, as listed + * in synchronous_standby_names, or 0 if not-listed. + */ + int sync_standby_priority; + /* Protects shared variables shown above. */ slock_t mutex; @@ -69,13 +74,6 @@ typedef struct WalSnd */ Latch *latch; - /* - * The priority order of the standby managed by this WALSender, as listed - * in synchronous_standby_names, or 0 if not-listed. Protected by - * SyncRepLock. - */ - int sync_standby_priority; - /* * Timestamp of the last message received from standby. */