On Fri, Nov 30, 2018 at 05:54:15PM +0900, Michael Paquier wrote:
> Looks pretty to me at quick glance, unfortunately I have not spent much
> time on it, particularly testing it.
> 
> +    <row>
> +     <entry><structfield>reply_time</structfield></entry>
> +     <entry><type>timestamp with time zone</type></entry>
> +     <entry>Send time of last message received from WAL
> receiver</entry>
> +    </row>
> This is a bit confusing as this counts for the last *standby* message
> ('r'), and not the last feedback message ('h').
> 
> +   /*
> +    * timestamp of the latest message, reported by standby server
> +   */
> +   TimestampTz replyTime;
> 
> A small indentation problem, not a big deal.

I have been looking at this patch more in-depth, and you missed one
critical thing: hot standby feedback messages also include the timestamp
the client used when sending the message.  So if we want to track the
latest time at which a message has been sent, we should record the
timestamp from feedback messages as well as the one from status update
messages.

After fixing that and updating a couple of comments and variables, I have
ended up with the attached patch.  Thoughts?

(The catversion bump is a self-reminder, don't worry about it.)
--
Michael
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 7aada14417..22e93019a7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        </itemizedlist>
      </entry>
     </row>
+    <row>
+     <entry><structfield>reply_time</structfield></entry>
+     <entry><type>timestamp with time zone</type></entry>
+     <entry>Send time of last message received from standby server</entry>
+    </row>
    </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 715995dd88..8630542bb3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS
             W.flush_lag,
             W.replay_lag,
             W.sync_priority,
-            W.sync_state
+            W.sync_state,
+            W.reply_time
     FROM pg_stat_get_activity(NULL) AS S
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 46edb525e8..11bdc741f0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1763,6 +1763,8 @@ ProcessStandbyReplyMessage(void)
 				applyLag;
 	bool		clearLagTimes;
 	TimestampTz now;
+	TimestampTz	replyTime;
+	char	   *replyTimeStr;
 
 	static bool fullyAppliedLastTime = false;
 
@@ -1770,14 +1772,20 @@ ProcessStandbyReplyMessage(void)
 	writePtr = pq_getmsgint64(&reply_message);
 	flushPtr = pq_getmsgint64(&reply_message);
 	applyPtr = pq_getmsgint64(&reply_message);
-	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
+	replyTime = pq_getmsgint64(&reply_message);
 	replyRequested = pq_getmsgbyte(&reply_message);
 
-	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
+	/* Copy because timestamptz_to_str returns a static buffer */
+	replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
 		 (uint32) (writePtr >> 32), (uint32) writePtr,
 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
-		 replyRequested ? " (reply requested)" : "");
+		 replyRequested ? " (reply requested)" : "",
+		 replyTimeStr);
+
+	pfree(replyTimeStr);
 
 	/* See if we can compute the round-trip lag for these positions. */
 	now = GetCurrentTimestamp();
@@ -1824,6 +1832,7 @@ ProcessStandbyReplyMessage(void)
 			walsnd->flushLag = flushLag;
 		if (applyLag != -1 || clearLagTimes)
 			walsnd->applyLag = applyLag;
+		walsnd->replyTime = replyTime;
 		SpinLockRelease(&walsnd->mutex);
 	}
 
@@ -1927,23 +1936,43 @@ ProcessStandbyHSFeedbackMessage(void)
 	uint32		feedbackEpoch;
 	TransactionId feedbackCatalogXmin;
 	uint32		feedbackCatalogEpoch;
+	TimestampTz	replyTime;
+	char	   *replyTimeStr;
 
 	/*
 	 * Decipher the reply message. The caller already consumed the msgtype
 	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
 	 * of this message.
 	 */
-	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
+	replyTime = pq_getmsgint64(&reply_message);
 	feedbackXmin = pq_getmsgint(&reply_message, 4);
 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
 	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
 	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
+	/* Copy because timestamptz_to_str returns a static buffer */
+	replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
 		 feedbackXmin,
 		 feedbackEpoch,
 		 feedbackCatalogXmin,
-		 feedbackCatalogEpoch);
+		 feedbackCatalogEpoch,
+		 replyTimeStr);
+
+	pfree(replyTimeStr);
+
+	/*
+	 * Update shared state for this WalSender process based on reply data from
+	 * standby.
+	 */
+	{
+		WalSnd	   *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->replyTime = replyTime;
+		SpinLockRelease(&walsnd->mutex);
+	}
 
 	/*
 	 * Unset WalSender's xmins if the feedback message values are invalid.
@@ -2265,6 +2294,7 @@ InitWalSenderSlot(void)
 			walsnd->applyLag = -1;
 			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->latch = &MyProc->procLatch;
+			walsnd->replyTime = 0;
 			SpinLockRelease(&walsnd->mutex);
 			/* don't need the lock anymore */
 			MyWalSnd = (WalSnd *) walsnd;
@@ -3179,7 +3209,7 @@ offset_to_interval(TimeOffset offset)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	11
+#define PG_STAT_GET_WAL_SENDERS_COLS	12
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -3233,6 +3263,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		int			priority;
 		int			pid;
 		WalSndState state;
+		TimestampTz	replyTime;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
 
@@ -3252,6 +3283,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		flushLag = walsnd->flushLag;
 		applyLag = walsnd->applyLag;
 		priority = walsnd->sync_standby_priority;
+		replyTime = walsnd->replyTime;
 		SpinLockRelease(&walsnd->mutex);
 
 		memset(nulls, 0, sizeof(nulls));
@@ -3328,6 +3360,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
 				values[10] = CStringGetTextDatum("potential");
+
+			if (replyTime == 0)
+				nulls[11] = true;
+			else
+				values[11] = TimestampTzGetDatum(replyTime);
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index be72bddd17..eec843cf7e 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201811201
+#define CATALOG_VERSION_NO	201812041
 
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 034a41eb55..f79fcfe029 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5023,9 +5023,9 @@
   proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}',
+  proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
   prosrc => 'pg_stat_get_wal_senders' },
 { oid => '3317', descr => 'statistics: information about WAL receiver',
   proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 4b90477936..7aba153ff6 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -75,6 +75,11 @@ typedef struct WalSnd
 	 * SyncRepLock.
 	 */
 	int			sync_standby_priority;
+
+	/*
+	 * Timestamp of the last message received from standby.
+	 */
+	TimestampTz	replyTime;
 } WalSnd;
 
 extern WalSnd *MyWalSnd;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 735dd37acf..b68b8d273f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid,
     w.flush_lag,
     w.replay_lag,
     w.sync_priority,
-    w.sync_state
+    w.sync_state,
+    w.reply_time
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,

Attachment: signature.asc
Description: PGP signature

Reply via email to