On Fri, Nov 30, 2018 at 05:54:15PM +0900, Michael Paquier wrote: > Looks pretty to me at quick glance, unfortunately I have not spent much > time on it, particularly testing it. > > + <row> > + <entry><structfield>reply_time</structfield></entry> > + <entry><type>timestamp with time zone</type></entry> > + <entry>Send time of last message received from WAL > receiver</entry> > + </row> > This is a bit confusing as this counts for the last *standby* message > ('r'), and not the last feedback message ('h'). > > + /* > + * timestamp of the latest message, reported by standby server > + */ > + TimestampTz replyTime; > > A small indentation problem, not a big deal.
I have been looking at this patch in more depth, and you missed one critical thing: hot standby feedback messages also include the timestamp the client used when sending the message, so if we want to track the latest time at which a message has been sent, we should track that timestamp as well as the one from status update messages. After fixing that and updating a couple of comments and variables, I ended up with the attached. Thoughts? (The catversion bump is a self-reminder, don't worry about it.) -- Michael
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 7aada14417..22e93019a7 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i </itemizedlist> </entry> </row> + <row> + <entry><structfield>reply_time</structfield></entry> + <entry><type>timestamp with time zone</type></entry> + <entry>Send time of last message received from standby server</entry> + </row> </tbody> </tgroup> </table> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 715995dd88..8630542bb3 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS W.flush_lag, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.reply_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 46edb525e8..11bdc741f0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1763,6 +1763,8 @@ ProcessStandbyReplyMessage(void) applyLag; bool clearLagTimes; TimestampTz now; + TimestampTz replyTime; + char *replyTimeStr; static bool fullyAppliedLastTime = false; @@ -1770,14 +1772,20 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); replyRequested = pq_getmsgbyte(&reply_message); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "write %X/%X 
flush %X/%X apply %X/%X%s reply_time %s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, (uint32) (applyPtr >> 32), (uint32) applyPtr, - replyRequested ? " (reply requested)" : ""); + replyRequested ? " (reply requested)" : "", + replyTimeStr); + + pfree(replyTimeStr); /* See if we can compute the round-trip lag for these positions. */ now = GetCurrentTimestamp(); @@ -1824,6 +1832,7 @@ ProcessStandbyReplyMessage(void) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; + walsnd->replyTime = replyTime; SpinLockRelease(&walsnd->mutex); } @@ -1927,23 +1936,43 @@ ProcessStandbyHSFeedbackMessage(void) uint32 feedbackEpoch; TransactionId feedbackCatalogXmin; uint32 feedbackCatalogEpoch; + TimestampTz replyTime; + char *replyTimeStr; /* * Decipher the reply message. The caller already consumed the msgtype * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation * of this message. */ - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); - elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s", feedbackXmin, feedbackEpoch, feedbackCatalogXmin, - feedbackCatalogEpoch); + feedbackCatalogEpoch, + replyTimeStr); + + pfree(replyTimeStr); + + /* + * Update shared state for this WalSender process based on reply data from + * standby. 
+ */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + } /* * Unset WalSender's xmins if the feedback message values are invalid. @@ -2265,6 +2294,7 @@ InitWalSenderSlot(void) walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; + walsnd->replyTime = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3179,7 +3209,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 11 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3233,6 +3263,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int priority; int pid; WalSndState state; + TimestampTz replyTime; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3252,6 +3283,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) flushLag = walsnd->flushLag; applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; + replyTime = walsnd->replyTime; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -3328,6 +3360,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); + + if (replyTime == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(replyTime); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index be72bddd17..eec843cf7e 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201811201 +#define CATALOG_VERSION_NO 201812041 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 
034a41eb55..f79fcfe029 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5023,9 +5023,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 4b90477936..7aba153ff6 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -75,6 +75,11 @@ typedef struct WalSnd * SyncRepLock. */ int sync_standby_priority; + + /* + * Timestamp of the last message received from standby. 
+ */ + TimestampTz replyTime; } WalSnd; extern WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 735dd37acf..b68b8d273f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid, w.flush_lag, w.replay_lag, w.sync_priority, - w.sync_state + w.sync_state, + w.reply_time FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl,
signature.asc
Description: PGP signature