On Fri, May 19, 2017 at 3:01 PM, Masahiko Sawada <sawada.m...@gmail.com> wrote: > Also, as Horiguchi-san pointed out earlier, walreceiver seems need the > similar fix.
Actually, now that I look at it, ready_to_display should also be protected by the WAL receiver's spinlock, so it is placed incorrectly in walreceiver.h: it sits after the mutex, outside the group of fields the lock is documented to cover. As you point out, pg_stat_get_wal_receiver() is also lax about locking: it checks pid and ready_to_display before acquiring the spinlock. That code is new in 10, so we have an open item here for both of them, and I am the author of both. After review, I found no issues in walreceiverfuncs.c. I am adding an open item so that both issues are fixed in PG10. Together with the WAL sender part, I think all of this should be fixed in a single patch. So, what do you think about the attached patch? -- Michael
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ad213fc454..5ab5e95fa9 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -409,15 +409,23 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; + WalSnd *walsnd = MyWalSnd; XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; + XLogRecPtr flush; + WalSndState state; bool got_recptr; bool am_sync; int numwrite = 0; int numflush = 0; int numapply = 0; + SpinLockAcquire(&walsnd->mutex); + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* * If this WALSender is serving a standby that is not on the list of * potential sync standbys then we have nothing to do. If we are still @@ -425,8 +433,8 @@ SyncRepReleaseWaiters(void) * still invalid, then leave quickly also. */ if (MyWalSnd->sync_standby_priority == 0 || - MyWalSnd->state < WALSNDSTATE_STREAMING || - XLogRecPtrIsInvalid(MyWalSnd->flush)) + state < WALSNDSTATE_STREAMING || + XLogRecPtrIsInvalid(flush)) { announce_next_takeover = true; return; @@ -711,14 +719,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -726,7 +744,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* @@ -780,14 +798,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) */ for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + 
WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -796,7 +824,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2723612718..25f12e0706 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1392,23 +1392,13 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) TimestampTz latest_end_time; char *slotname; char *conninfo; - - /* - * No WAL receiver (or not ready yet), just return a tuple with NULL - * values - */ - if (walrcv->pid == 0 || !walrcv->ready_to_display) - PG_RETURN_NULL(); - - /* determine result type */ - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - values = palloc0(sizeof(Datum) * tupdesc->natts); - nulls = palloc0(sizeof(bool) * tupdesc->natts); + int pid; + bool ready_to_display; /* Take a lock to ensure value consistency */ SpinLockAcquire(&walrcv->mutex); + pid = walrcv->pid; + ready_to_display = walrcv->ready_to_display; state = walrcv->walRcvState; receive_start_lsn = walrcv->receiveStart; receive_start_tli = walrcv->receiveStartTLI; @@ -1422,8 +1412,22 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) conninfo = pstrdup(walrcv->conninfo); SpinLockRelease(&walrcv->mutex); + /* + * No WAL receiver (or not ready yet), just return a tuple with NULL + * values + */ + if (pid == 0 || !ready_to_display) + PG_RETURN_NULL(); + + /* determine 
result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + values = palloc0(sizeof(Datum) * tupdesc->natts); + nulls = palloc0(sizeof(bool) * tupdesc->natts); + /* Fetch values */ - values[0] = Int32GetDatum(walrcv->pid); + values[0] = Int32GetDatum(pid); if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS)) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 49cce38880..a04be8039d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2874,10 +2874,12 @@ WalSndRqstFileReload(void) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } walsnd->needreload = true; SpinLockRelease(&walsnd->mutex); } @@ -3190,14 +3192,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) TimeOffset flushLag; TimeOffset applyLag; int priority; + int pid; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } + pid = walsnd->pid; sentPtr = walsnd->sentPtr; state = walsnd->state; write = walsnd->write; @@ -3210,7 +3216,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); - values[0] = Int32GetDatum(walsnd->pid); + values[0] = Int32GetDatum(pid); if (!superuser()) { diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 31d090c99d..44199670b1 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -114,6 +114,9 @@ typedef struct */ char slotname[NAMEDATALEN]; + /* set true once conninfo is ready to display (obfuscated pwds etc) */ + bool 
ready_to_display; + slock_t mutex; /* locks shared variables shown above */ /* @@ -122,9 +125,6 @@ typedef struct */ bool force_reply; - /* set true once conninfo is ready to display (obfuscated pwds etc) */ - bool ready_to_display; - /* * Latch used by startup process to wake up walreceiver after telling it * where to start streaming (after setting receiveStart and
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers