At Thu, 16 Apr 2020 16:48:28 +0900, Masahiko Sawada 
<masahiko.saw...@2ndquadrant.com> wrote in 
> This is just a notice; I'm reading your latest patch but it seems to
> include unrelated changes:
> 
> $ git diff --stat
>  src/backend/replication/syncrep.c           | 475
> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------------------------------------------------------------------------------------------------------------------------
>  src/backend/replication/walsender.c         |  40 ++++++++++++++-----
>  src/bin/pg_dump/compress_io.c               |  12 ++++++
>  src/bin/pg_dump/pg_backup_directory.c       |  48 ++++++++++++++++++-----
>  src/include/replication/syncrep.h           |  20 +++++++++-
>  src/include/replication/walsender_private.h |  16 ++++----
>  6 files changed, 274 insertions(+), 337 deletions(-)

Ugh. I failed to clean up my working directory and didn't notice, because I
made the file with git diff. Thanks for pointing that out.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ffd5b31eb2..99a7bbbc86 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -103,19 +103,21 @@ static int	SyncRepWakeQueue(bool all, int mode);
 
 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 								 XLogRecPtr *flushPtr,
-								 XLogRecPtr *applyPtr,
-								 bool *am_sync);
+								 XLogRecPtr *applyPtr);
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
 									   XLogRecPtr *flushPtr,
 									   XLogRecPtr *applyPtr,
-									   List *sync_standbys);
+									   SyncRepStandbyData *sync_standbys,
+									   int num_standbys,
+									   uint8 nsyncs);
 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
 										  XLogRecPtr *flushPtr,
 										  XLogRecPtr *applyPtr,
-										  List *sync_standbys, uint8 nth);
+										  SyncRepStandbyData *sync_standbys,
+										  int num_standbys,
+										  uint8 nth);
 static int	SyncRepGetStandbyPriority(void);
-static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
-static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int	standby_priority_comparator(const void *a, const void *b);
 static int	cmp_lsn(const void *a, const void *b);
 
 #ifdef USE_ASSERT_CHECKING
@@ -406,9 +408,10 @@ SyncRepInitConfig(void)
 	priority = SyncRepGetStandbyPriority();
 	if (MyWalSnd->sync_standby_priority != priority)
 	{
-		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+		SpinLockAcquire(&MyWalSnd->mutex);
 		MyWalSnd->sync_standby_priority = priority;
-		LWLockRelease(SyncRepLock);
+		SpinLockRelease(&MyWalSnd->mutex);
+
 		ereport(DEBUG1,
 				(errmsg("standby \"%s\" now has synchronous standby priority %u",
 						application_name, priority)));
@@ -429,8 +432,7 @@ SyncRepReleaseWaiters(void)
 	XLogRecPtr	writePtr;
 	XLogRecPtr	flushPtr;
 	XLogRecPtr	applyPtr;
-	bool		got_recptr;
-	bool		am_sync;
+	bool		release_waiters;
 	int			numwrite = 0;
 	int			numflush = 0;
 	int			numapply = 0;
@@ -458,16 +460,24 @@ SyncRepReleaseWaiters(void)
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
 	/*
-	 * Check whether we are a sync standby or not, and calculate the synced
-	 * positions among all sync standbys.
+	 * Check whether we may release any waiting processes, and calculate the
+	 * synced positions.
 	 */
-	got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
+	release_waiters = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr);
+
+	/* Nothing to do; re-arm the announcement for when we can release. */
+	if (!release_waiters)
+	{
+		LWLockRelease(SyncRepLock);
+		announce_next_takeover = true;
+		return;
+	}
 
 	/*
-	 * If we are managing a sync standby, though we weren't prior to this,
-	 * then announce we are now a sync standby.
+	 * If this walsender has just become able to release waiting processes,
+	 * announce that.
 	 */
-	if (announce_next_takeover && am_sync)
+	if (announce_next_takeover)
 	{
 		announce_next_takeover = false;
 
@@ -481,17 +491,6 @@ SyncRepReleaseWaiters(void)
 							application_name)));
 	}
 
-	/*
-	 * If the number of sync standbys is less than requested or we aren't
-	 * managing a sync standby then just leave.
-	 */
-	if (!got_recptr || !am_sync)
-	{
-		LWLockRelease(SyncRepLock);
-		announce_next_takeover = !am_sync;
-		return;
-	}
-
 	/*
 	 * Set the lsn first so that when we wake backends they will release up to
 	 * this location.
@@ -523,43 +522,62 @@ SyncRepReleaseWaiters(void)
 /*
  * Calculate the synced Write, Flush and Apply positions among sync standbys.
  *
- * The caller must hold SyncRepLock.
- *
- * Return false if the number of sync standbys is less than
- * synchronous_standby_names specifies. Otherwise return true and
- * store the positions into *writePtr, *flushPtr and *applyPtr.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * Return false if this walsender cannot release any waiters.  Otherwise
+ * return true and store the positions into *writePtr, *flushPtr and *applyPtr.
  */
 static bool
 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-					 XLogRecPtr *applyPtr, bool *am_sync)
+					 XLogRecPtr *applyPtr)
 {
-	List	   *sync_standbys;
-
-	Assert(LWLockHeldByMe(SyncRepLock));
+	SyncRepStandbyData *standbys;
+	int			num_standbys;
+	int			i;
+	bool		sync_priority = (SyncRepConfig != NULL &&
+		SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
 
+	/* Initialize default results */
 	*writePtr = InvalidXLogRecPtr;
 	*flushPtr = InvalidXLogRecPtr;
 	*applyPtr = InvalidXLogRecPtr;
-	*am_sync = false;
 
-	/* Get standbys that are considered as synchronous at this moment */
-	sync_standbys = SyncRepGetSyncStandbys(am_sync);
+	/* Quick out if not even configured to be synchronous */
+	if (SyncRepConfig == NULL)
+		return false;
+
+	/* Get candidate standbys; sorted by priority in priority mode */
+	num_standbys = SyncRepGetStandbys(&standbys);
 
 	/*
-	 * Quick exit if we are not managing a sync standby or there are not
-	 * enough synchronous standbys.
+	 * Nothing more to do if there are fewer candidates than num_sync.  This
+	 * also guarantees that standbys[] has at least num_sync entries below.
 	 */
-	if (!(*am_sync) ||
-		SyncRepConfig == NULL ||
-		list_length(sync_standbys) < SyncRepConfig->num_sync)
+	if (num_standbys < SyncRepConfig->num_sync)
 	{
-		list_free(sync_standbys);
+		pfree(standbys);
 		return false;
 	}
 
+	/* In priority mode, nothing to do unless I am a chosen sync standby */
+	if (sync_priority)
+	{
+		bool		am_sync = false;
+
+		for (i = 0; i < SyncRepConfig->num_sync; i++)
+		{
+			if (standbys[i].is_me)
+			{
+				am_sync = true;
+				break;
+			}
+		}
+
+		if (!am_sync)
+		{
+			pfree(standbys);
+			return false;
+		}
+	}
+
 	/*
 	 * In a priority-based sync replication, the synced positions are the
 	 * oldest ones among sync standbys. In a quorum-based, they are the Nth
@@ -573,46 +591,51 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 	 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
 	 * positions even in a quorum-based sync replication.
 	 */
-	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+	if (sync_priority)
 	{
 		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-								   sync_standbys);
+								   standbys, num_standbys,
+								   SyncRepConfig->num_sync);
 	}
 	else
 	{
 		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-									  sync_standbys, SyncRepConfig->num_sync);
+									  standbys, num_standbys,
+									  SyncRepConfig->num_sync);
 	}
 
-	list_free(sync_standbys);
+	pfree(standbys);
 	return true;
 }
 
 /*
- * Calculate the oldest Write, Flush and Apply positions among sync standbys.
+ * Calculate the oldest Write, Flush and Apply positions among the first
+ * nsyncs entries of sync_standbys, i.e. the chosen synchronous standbys.
  */
 static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-						   XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+						   XLogRecPtr *flushPtr,
+						   XLogRecPtr *applyPtr,
+						   SyncRepStandbyData *sync_standbys,
+						   int num_standbys,
+						   uint8 nsyncs)
 {
-	ListCell   *cell;
+	int			i;
 
 	/*
 	 * Scan through all sync standbys and calculate the oldest Write, Flush
-	 * and Apply positions.
+	 * and Apply positions.  We assume *writePtr et al were initialized to
+	 * InvalidXLogRecPtr.
 	 */
-	foreach(cell, sync_standbys)
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-		XLogRecPtr	write;
-		XLogRecPtr	flush;
-		XLogRecPtr	apply;
+		XLogRecPtr	write = sync_standbys[i].write;
+		XLogRecPtr	flush = sync_standbys[i].flush;
+		XLogRecPtr	apply = sync_standbys[i].apply;
 
-		SpinLockAcquire(&walsnd->mutex);
-		write = walsnd->write;
-		flush = walsnd->flush;
-		apply = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
+		/* Entries past the first nsyncs are not synchronous; stop there */
+		if (i >= nsyncs)
+			break;
 
 		if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
 			*writePtr = write;
@@ -628,38 +651,39 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
  * standbys.
  */
 static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
-							  XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+							  XLogRecPtr *flushPtr,
+							  XLogRecPtr *applyPtr,
+							  SyncRepStandbyData *sync_standbys,
+							  int num_standbys,
+							  uint8 nth)
 {
-	ListCell   *cell;
 	XLogRecPtr *write_array;
 	XLogRecPtr *flush_array;
 	XLogRecPtr *apply_array;
-	int			len;
-	int			i = 0;
+	int			i;
+	int			n;
 
-	len = list_length(sync_standbys);
-	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
-	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+	write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+	apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 
-	foreach(cell, sync_standbys)
+	n = 0;
+	for (i = 0; i < num_standbys; i++)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
-
-		SpinLockAcquire(&walsnd->mutex);
-		write_array[i] = walsnd->write;
-		flush_array[i] = walsnd->flush;
-		apply_array[i] = walsnd->apply;
-		SpinLockRelease(&walsnd->mutex);
-
-		i++;
+		write_array[n] = sync_standbys[i].write;
+		flush_array[n] = sync_standbys[i].flush;
+		apply_array[n] = sync_standbys[i].apply;
+		n++;
 	}
 
+	/* Should have enough, or somebody messed up */
+	Assert(n >= nth);
+
 	/* Sort each array in descending order */
-	qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
-	qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(write_array, n, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(flush_array, n, sizeof(XLogRecPtr), cmp_lsn);
+	qsort(apply_array, n, sizeof(XLogRecPtr), cmp_lsn);
 
 	/* Get Nth latest Write, Flush, Apply positions */
 	*writePtr = write_array[nth - 1];
@@ -689,67 +713,49 @@ cmp_lsn(const void *a, const void *b)
 }
 
 /*
- * Return the list of sync standbys, or NIL if no sync standby is connected.
+ * Return data about walsenders that are candidate synchronous standbys.
  *
- * The caller must hold SyncRepLock.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
+ * *standbys is set to a palloc'd array of per-walsender data, and the number
+ * of valid entries is returned (0 if sync replication is not configured).
+ * In priority mode, the entries are sorted by sync_standby_priority.
  */
-List *
-SyncRepGetSyncStandbys(bool *am_sync)
+int
+SyncRepGetStandbys(SyncRepStandbyData **standbys)
 {
-	Assert(LWLockHeldByMe(SyncRepLock));
+	int			i;
+	int			n;
 
-	/* Set default result */
-	if (am_sync != NULL)
-		*am_sync = false;
+	/* Create result array, big enough for every walsender slot */
+	*standbys = (SyncRepStandbyData *)
+		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
 
 	/* Quick exit if sync replication is not requested */
 	if (SyncRepConfig == NULL)
-		return NIL;
-
-	return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
-		SyncRepGetSyncStandbysPriority(am_sync) :
-		SyncRepGetSyncStandbysQuorum(am_sync);
-}
-
-/*
- * Return the list of all the candidates for quorum sync standbys,
- * or NIL if no such standby is connected.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a quorum-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysQuorum(bool *am_sync)
-{
-	List	   *result = NIL;
-	int			i;
-	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
-								 * rearrangement */
-
-	Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+		return 0;
 
+	/* Collect raw data from shared memory */
+	n = 0;
 	for (i = 0; i < max_wal_senders; i++)
 	{
-		XLogRecPtr	flush;
-		WalSndState state;
-		int			pid;
+		volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
+									 * rearrangement */
+		SyncRepStandbyData *stby;
+		WalSndState state;		/* not included in SyncRepStandbyData */
 
 		walsnd = &WalSndCtl->walsnds[i];
+		stby = *standbys + n;
 
 		SpinLockAcquire(&walsnd->mutex);
-		pid = walsnd->pid;
-		flush = walsnd->flush;
+		stby->pid = walsnd->pid;
 		state = walsnd->state;
+		stby->write = walsnd->write;
+		stby->flush = walsnd->flush;
+		stby->apply = walsnd->apply;
+		stby->sync_standby_priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
 		/* Must be active */
-		if (pid == 0)
+		if (stby->pid == 0)
 			continue;
 
 		/* Must be streaming or stopping */
@@ -758,200 +764,53 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
 			continue;
 
 		/* Must be synchronous */
-		if (walsnd->sync_standby_priority == 0)
+		if (stby->sync_standby_priority == 0)
 			continue;
 
 		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(flush))
+		if (XLogRecPtrIsInvalid(stby->flush))
 			continue;
 
-		/*
-		 * Consider this standby as a candidate for quorum sync standbys and
-		 * append it to the result.
-		 */
-		result = lappend_int(result, i);
-		if (am_sync != NULL && walsnd == MyWalSnd)
-			*am_sync = true;
+		/* OK, it's a candidate */
+		stby->walsnd_index = i;
+		stby->is_me = (walsnd == MyWalSnd);
+		n++;
 	}
 
-	return result;
-}
-
-/*
- * Return the list of sync standbys chosen based on their priorities,
- * or NIL if no sync standby is connected.
- *
- * If there are multiple standbys with the same priority,
- * the first one found is selected preferentially.
- *
- * The caller must hold SyncRepLock. This function must be called only in
- * a priority-based sync replication.
- *
- * On return, *am_sync is set to true if this walsender is connecting to
- * sync standby. Otherwise it's set to false.
- */
-static List *
-SyncRepGetSyncStandbysPriority(bool *am_sync)
-{
-	List	   *result = NIL;
-	List	   *pending = NIL;
-	int			lowest_priority;
-	int			next_highest_priority;
-	int			this_priority;
-	int			priority;
-	int			i;
-	bool		am_in_pending = false;
-	volatile WalSnd *walsnd;	/* Use volatile pointer to prevent code
-								 * rearrangement */
-
-	Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
-
-	lowest_priority = SyncRepConfig->nmembers;
-	next_highest_priority = lowest_priority + 1;
-
 	/*
-	 * Find the sync standbys which have the highest priority (i.e, 1). Also
-	 * store all the other potential sync standbys into the pending list, in
-	 * order to scan it later and find other sync standbys from it quickly.
+	 * In quorum mode, that's all we have to do since all entries are in the
+	 * same priority 1.  In priority mode, sort the candidates by priority.
 	 */
-	for (i = 0; i < max_wal_senders; i++)
-	{
-		XLogRecPtr	flush;
-		WalSndState state;
-		int			pid;
-
-		walsnd = &WalSndCtl->walsnds[i];
-
-		SpinLockAcquire(&walsnd->mutex);
-		pid = walsnd->pid;
-		flush = walsnd->flush;
-		state = walsnd->state;
-		SpinLockRelease(&walsnd->mutex);
-
-		/* Must be active */
-		if (pid == 0)
-			continue;
-
-		/* Must be streaming or stopping */
-		if (state != WALSNDSTATE_STREAMING &&
-			state != WALSNDSTATE_STOPPING)
-			continue;
-
-		/* Must be synchronous */
-		this_priority = walsnd->sync_standby_priority;
-		if (this_priority == 0)
-			continue;
-
-		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(flush))
-			continue;
+	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+		qsort(*standbys, n, sizeof(SyncRepStandbyData),
+			  standby_priority_comparator);
 
-		/*
-		 * If the priority is equal to 1, consider this standby as sync and
-		 * append it to the result. Otherwise append this standby to the
-		 * pending list to check if it's actually sync or not later.
-		 */
-		if (this_priority == 1)
-		{
-			result = lappend_int(result, i);
-			if (am_sync != NULL && walsnd == MyWalSnd)
-				*am_sync = true;
-			if (list_length(result) == SyncRepConfig->num_sync)
-			{
-				list_free(pending);
-				return result;	/* Exit if got enough sync standbys */
-			}
-		}
-		else
-		{
-			pending = lappend_int(pending, i);
-			if (am_sync != NULL && walsnd == MyWalSnd)
-				am_in_pending = true;
+	return n;
+}
 
-			/*
-			 * Track the highest priority among the standbys in the pending
-			 * list, in order to use it as the starting priority for later
-			 * scan of the list. This is useful to find quickly the sync
-			 * standbys from the pending list later because we can skip
-			 * unnecessary scans for the unused priorities.
-			 */
-			if (this_priority < next_highest_priority)
-				next_highest_priority = this_priority;
-		}
-	}
 
-	/*
-	 * Consider all pending standbys as sync if the number of them plus
-	 * already-found sync ones is lower than the configuration requests.
-	 */
-	if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
-	{
-		/*
-		 * Set *am_sync to true if this walsender is in the pending list
-		 * because all pending standbys are considered as sync.
-		 */
-		if (am_sync != NULL && !(*am_sync))
-			*am_sync = am_in_pending;
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
 
-		result = list_concat(result, pending);
-		list_free(pending);
-		return result;
-	}
+	/* First, sort by increasing priority value */
+	if (sa->sync_standby_priority != sb->sync_standby_priority)
+		return sa->sync_standby_priority - sb->sync_standby_priority;
 
 	/*
-	 * Find the sync standbys from the pending list.
+	 * We might have equal priority values; arbitrarily break ties by position
+	 * in the WALSnd array.  (This is utterly bogus, since that is arrival
+	 * order dependent, but there are regression tests that rely on it.)
 	 */
-	priority = next_highest_priority;
-	while (priority <= lowest_priority)
-	{
-		ListCell   *cell;
-
-		next_highest_priority = lowest_priority + 1;
-
-		foreach(cell, pending)
-		{
-			i = lfirst_int(cell);
-			walsnd = &WalSndCtl->walsnds[i];
-
-			this_priority = walsnd->sync_standby_priority;
-			if (this_priority == priority)
-			{
-				result = lappend_int(result, i);
-				if (am_sync != NULL && walsnd == MyWalSnd)
-					*am_sync = true;
-
-				/*
-				 * We should always exit here after the scan of pending list
-				 * starts because we know that the list has enough elements to
-				 * reach SyncRepConfig->num_sync.
-				 */
-				if (list_length(result) == SyncRepConfig->num_sync)
-				{
-					list_free(pending);
-					return result;	/* Exit if got enough sync standbys */
-				}
-
-				/*
-				 * Remove the entry for this sync standby from the list to
-				 * prevent us from looking at the same entry again.
-				 */
-				pending = foreach_delete_current(pending, cell);
-
-				continue;		/* don't adjust next_highest_priority */
-			}
-
-			if (this_priority < next_highest_priority)
-				next_highest_priority = this_priority;
-		}
-
-		priority = next_highest_priority;
-	}
-
-	/* never reached, but keep compiler quiet */
-	Assert(false);
-	return result;
+	return sa->walsnd_index - sb->walsnd_index;
 }
 
+
 /*
  * Check if we are in the list of sync standbys, and if so, determine
  * priority sequence. Return priority if set, or zero to indicate that
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fc475d144d..7889ea5f6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2375,14 +2375,16 @@ InitWalSenderSlot(void)
 			 * Found a free slot. Reserve it for us.
 			 */
 			walsnd->pid = MyProcPid;
+			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->sentPtr = InvalidXLogRecPtr;
+			walsnd->needreload = false;
 			walsnd->write = InvalidXLogRecPtr;
 			walsnd->flush = InvalidXLogRecPtr;
 			walsnd->apply = InvalidXLogRecPtr;
 			walsnd->writeLag = -1;
 			walsnd->flushLag = -1;
 			walsnd->applyLag = -1;
-			walsnd->state = WALSNDSTATE_STARTUP;
+			walsnd->sync_standby_priority = 0;
 			walsnd->latch = &MyProc->procLatch;
 			walsnd->replyTime = 0;
 			walsnd->spillTxns = 0;
@@ -3235,7 +3237,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	Tuplestorestate *tupstore;
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
-	List	   *sync_standbys;
+	SyncRepStandbyData *standbys;
+	int			num_standbys;
 	int			i;
 
 	/* check to see if caller supports us returning a tuplestore */
@@ -3263,11 +3266,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
-	 * Get the currently active synchronous standbys.
+	 * Get the currently active standbys in priority order.  This could be out
+	 * of date before we're done, but we'll use the data anyway.
 	 */
-	LWLockAcquire(SyncRepLock, LW_SHARED);
-	sync_standbys = SyncRepGetSyncStandbys(NULL);
-	LWLockRelease(SyncRepLock);
+	num_standbys = SyncRepGetStandbys(&standbys);
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
@@ -3286,9 +3288,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int64		spillTxns;
 		int64		spillCount;
 		int64		spillBytes;
+		bool		is_sync_standby = false;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+		int			j;
 
+		/* Collect data from shared memory */
 		SpinLockAcquire(&walsnd->mutex);
 		if (walsnd->pid == 0)
 		{
@@ -3311,6 +3316,17 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		spillBytes = walsnd->spillBytes;
 		SpinLockRelease(&walsnd->mutex);
 
+		/* Detect whether walsender is/was a sync standby or quorum candidate */
+		for (j = 0; j < num_standbys; j++)
+		{
+			if (standbys[j].walsnd_index == i)
+			{
+				is_sync_standby = (j < SyncRepConfig->num_sync ||
+								   SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
+				break;
+			}
+		}
+
 		memset(nulls, 0, sizeof(nulls));
 		values[0] = Int32GetDatum(pid);
 
@@ -3380,11 +3396,15 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (priority == 0)
 				values[10] = CStringGetTextDatum("async");
-			else if (list_member_int(sync_standbys, i))
-				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
-					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
+			else if (is_sync_standby)
+				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
+			/*
+			 * Otherwise it's a potential standby: not among the first num_sync
+			 * in priority mode, or not currently a quorum candidate.
+			 */
 			else
-				values[10] = CStringGetTextDatum("potential");
+				values[10] = CStringGetTextDatum("potential");
 
 			if (replyTime == 0)
 				nulls[11] = true;
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index c5f0e91aad..533aac25c7 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -36,6 +36,24 @@
 #define SYNC_REP_PRIORITY		0
 #define SYNC_REP_QUORUM		1
 
+/*
+ * SyncRepGetStandbys returns an array of these structs, one per candidate
+ * synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+	/* Copies of relevant fields from WalSnd shared-memory struct */
+	pid_t		pid;
+	XLogRecPtr	write;
+	XLogRecPtr	flush;
+	XLogRecPtr	apply;
+	int			sync_standby_priority;
+	/* Index of this walsender in the WalSnd shared-memory array */
+	int			walsnd_index;
+	/* True if this entry describes the calling walsender itself */
+	bool		is_me;
+} SyncRepStandbyData;
+
 /*
  * Struct for the configuration of synchronous replication.
  *
@@ -74,7 +92,7 @@ extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
 
 /* called by wal sender and user backend */
-extern List *SyncRepGetSyncStandbys(bool *am_sync);
+extern int	SyncRepGetStandbys(SyncRepStandbyData **standbys);
 
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a4..734acec2a4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -31,8 +31,7 @@ typedef enum WalSndState
 /*
  * Each walsender has a WalSnd struct in shared memory.
  *
- * This struct is protected by 'mutex', with two exceptions: one is
- * sync_standby_priority as noted below.  The other exception is that some
+ * This struct is protected by its 'mutex' spinlock field, except that some
  * members are only written by the walsender process itself, and thus that
  * process is free to read those members without holding spinlock.  pid and
  * needreload always require the spinlock to be held for all accesses.
@@ -60,6 +59,12 @@ typedef struct WalSnd
 	TimeOffset	flushLag;
 	TimeOffset	applyLag;
 
+	/*
+	 * The priority order of the standby managed by this WALSender, as listed
+	 * in synchronous_standby_names, or 0 if not-listed.
+	 */
+	int			sync_standby_priority;
+
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
 
@@ -69,13 +74,6 @@ typedef struct WalSnd
 	 */
 	Latch	   *latch;
 
-	/*
-	 * The priority order of the standby managed by this WALSender, as listed
-	 * in synchronous_standby_names, or 0 if not-listed. Protected by
-	 * SyncRepLock.
-	 */
-	int			sync_standby_priority;
-
 	/*
 	 * Timestamp of the last message received from standby.
 	 */

Reply via email to