On Fri, May 19, 2017 at 3:01 PM, Masahiko Sawada <sawada.m...@gmail.com> wrote:
> Also, as Horiguchi-san pointed out earlier, walreceiver seems need the
> similar fix.

Actually, now that I look at it, ready_to_display should as well be
protected by the lock of the WAL receiver, so it is incorrectly placed
in walreceiver.h. As you are pointing out, pg_stat_get_wal_receiver()
is lazy as well, and that's new in 10, so we have an open item here
for both of them. And I am the author for both things. No issues
spotted in walreceiverfuncs.c after review.

I am adding an open item so as both issues are fixed in PG10. With the
WAL sender part, I think that this should be a group shot.

So what do you think about the attached?
-- 
Michael
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ad213fc454..5ab5e95fa9 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -409,15 +409,23 @@ void
 SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
+	WalSnd	   *walsnd = MyWalSnd;
 	XLogRecPtr	writePtr;
 	XLogRecPtr	flushPtr;
 	XLogRecPtr	applyPtr;
+	XLogRecPtr	flush;
+	WalSndState	state;
 	bool		got_recptr;
 	bool		am_sync;
 	int			numwrite = 0;
 	int			numflush = 0;
 	int			numapply = 0;
 
+	SpinLockAcquire(&walsnd->mutex);
+	flush = walsnd->flush;
+	state = walsnd->state;
+	SpinLockRelease(&walsnd->mutex);
+
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
 	 * potential sync standbys then we have nothing to do. If we are still
@@ -425,8 +433,8 @@ SyncRepReleaseWaiters(void)
 	 * still invalid, then leave quickly also.
 	 */
 	if (MyWalSnd->sync_standby_priority == 0 ||
-		MyWalSnd->state < WALSNDSTATE_STREAMING ||
-		XLogRecPtrIsInvalid(MyWalSnd->flush))
+		state < WALSNDSTATE_STREAMING ||
+		XLogRecPtrIsInvalid(flush))
 	{
 		announce_next_takeover = true;
 		return;
@@ -711,14 +719,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
 
 	for (i = 0; i < max_wal_senders; i++)
 	{
+		XLogRecPtr	flush;
+		WalSndState	state;
+		int			pid;
+
 		walsnd = &WalSndCtl->walsnds[i];
 
+		SpinLockAcquire(&walsnd->mutex);
+		pid = walsnd->pid;
+		flush = walsnd->flush;
+		state = walsnd->state;
+		SpinLockRelease(&walsnd->mutex);
+
 		/* Must be active */
-		if (walsnd->pid == 0)
+		if (pid == 0)
 			continue;
 
 		/* Must be streaming */
-		if (walsnd->state != WALSNDSTATE_STREAMING)
+		if (state != WALSNDSTATE_STREAMING)
 			continue;
 
 		/* Must be synchronous */
@@ -726,7 +744,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
 			continue;
 
 		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(walsnd->flush))
+		if (XLogRecPtrIsInvalid(flush))
 			continue;
 
 		/*
@@ -780,14 +798,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
 	 */
 	for (i = 0; i < max_wal_senders; i++)
 	{
+		XLogRecPtr	flush;
+		WalSndState	state;
+		int			pid;
+
 		walsnd = &WalSndCtl->walsnds[i];
 
+		SpinLockAcquire(&walsnd->mutex);
+		pid = walsnd->pid;
+		flush = walsnd->flush;
+		state = walsnd->state;
+		SpinLockRelease(&walsnd->mutex);
+
 		/* Must be active */
-		if (walsnd->pid == 0)
+		if (pid == 0)
 			continue;
 
 		/* Must be streaming */
-		if (walsnd->state != WALSNDSTATE_STREAMING)
+		if (state != WALSNDSTATE_STREAMING)
 			continue;
 
 		/* Must be synchronous */
@@ -796,7 +824,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
 			continue;
 
 		/* Must have a valid flush position */
-		if (XLogRecPtrIsInvalid(walsnd->flush))
+		if (XLogRecPtrIsInvalid(flush))
 			continue;
 
 		/*
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2723612718..25f12e0706 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1392,23 +1392,13 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	TimestampTz latest_end_time;
 	char	   *slotname;
 	char	   *conninfo;
-
-	/*
-	 * No WAL receiver (or not ready yet), just return a tuple with NULL
-	 * values
-	 */
-	if (walrcv->pid == 0 || !walrcv->ready_to_display)
-		PG_RETURN_NULL();
-
-	/* determine result type */
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
-	values = palloc0(sizeof(Datum) * tupdesc->natts);
-	nulls = palloc0(sizeof(bool) * tupdesc->natts);
+	int			pid;
+	bool		ready_to_display;
 
 	/* Take a lock to ensure value consistency */
 	SpinLockAcquire(&walrcv->mutex);
+	pid = walrcv->pid;
+	ready_to_display = walrcv->ready_to_display;
 	state = walrcv->walRcvState;
 	receive_start_lsn = walrcv->receiveStart;
 	receive_start_tli = walrcv->receiveStartTLI;
@@ -1422,8 +1412,22 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
 	conninfo = pstrdup(walrcv->conninfo);
 	SpinLockRelease(&walrcv->mutex);
 
+	/*
+	 * No WAL receiver (or not ready yet), just return a tuple with NULL
+	 * values
+	 */
+	if (pid == 0 || !ready_to_display)
+		PG_RETURN_NULL();
+
+	/* determine result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		elog(ERROR, "return type must be a row type");
+
+	values = palloc0(sizeof(Datum) * tupdesc->natts);
+	nulls = palloc0(sizeof(bool) * tupdesc->natts);
+
 	/* Fetch values */
-	values[0] = Int32GetDatum(walrcv->pid);
+	values[0] = Int32GetDatum(pid);
 
 	if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
 	{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 49cce38880..a04be8039d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2874,10 +2874,12 @@ WalSndRqstFileReload(void)
 	{
 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
 
+		SpinLockAcquire(&walsnd->mutex);
 		if (walsnd->pid == 0)
+		{
+			SpinLockRelease(&walsnd->mutex);
 			continue;
-
-		SpinLockAcquire(&walsnd->mutex);
+		}
 		walsnd->needreload = true;
 		SpinLockRelease(&walsnd->mutex);
 	}
@@ -3190,14 +3192,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		TimeOffset	flushLag;
 		TimeOffset	applyLag;
 		int			priority;
+		int			pid;
 		WalSndState state;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
+		SpinLockAcquire(&walsnd->mutex);
 		if (walsnd->pid == 0)
+		{
+			SpinLockRelease(&walsnd->mutex);
 			continue;
-
-		SpinLockAcquire(&walsnd->mutex);
+		}
+		pid = walsnd->pid;
 		sentPtr = walsnd->sentPtr;
 		state = walsnd->state;
 		write = walsnd->write;
@@ -3210,7 +3216,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		SpinLockRelease(&walsnd->mutex);
 
 		memset(nulls, 0, sizeof(nulls));
-		values[0] = Int32GetDatum(walsnd->pid);
+		values[0] = Int32GetDatum(pid);
 
 		if (!superuser())
 		{
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 31d090c99d..44199670b1 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -114,6 +114,9 @@ typedef struct
 	 */
 	char		slotname[NAMEDATALEN];
 
+	/* set true once conninfo is ready to display (obfuscated pwds etc) */
+	bool		ready_to_display;
+
 	slock_t		mutex;			/* locks shared variables shown above */
 
 	/*
@@ -122,9 +125,6 @@ typedef struct
 	 */
 	bool		force_reply;
 
-	/* set true once conninfo is ready to display (obfuscated pwds etc) */
-	bool		ready_to_display;
-
 	/*
 	 * Latch used by startup process to wake up walreceiver after telling it
 	 * where to start streaming (after setting receiveStart and
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to