diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 8d7b3bf..b894e31 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3310,6 +3310,26 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replication-lag-sample-interval" xreflabel="replication_lag_sample_interval">
+      <term><varname>replication_lag_sample_interval</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replication_lag_sample_interval</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Controls how often a standby should sample timestamps from upstream to
+        send back to the primary or upstream standby after writing, flushing
+        and replaying WAL.  The default is 1 second.  Units are milliseconds if
+        not specified.  A value of -1 disables the reporting of replication
+        lag.  Estimated lag can be seen in the <link linkend="monitoring-stats-views-table">
+        <literal>pg_stat_replication</></link> view of the upstream server.
+        This parameter can only be set
+        in the <filename>postgresql.conf</> file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-hot-standby-feedback" xreflabel="hot_standby_feedback">
       <term><varname>hot_standby_feedback</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 1545f03..a422ac0 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1405,6 +1405,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       standby server</entry>
     </row>
     <row>
+     <entry><structfield>write_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be written on this
+      standby server</entry>
+    </row>
+    <row>
+     <entry><structfield>flush_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be flushed on this
+      standby server</entry>
+    </row>
+    <row>
+     <entry><structfield>replay_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be replayed on this
+      standby server</entry>
+    </row>
+    <row>
      <entry><structfield>sync_priority</></entry>
      <entry><type>integer</></entry>
      <entry>Priority of this standby server for being chosen as the
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f8ffa5c..7e7312f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -82,6 +82,8 @@ extern uint32 bootstrap_data_checksum_version;
 #define PROMOTE_SIGNAL_FILE		"promote"
 #define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
 
+/* Size of the circular buffer of timestamped LSNs. */
+#define XLOG_TIMESTAMP_BUFFER_SIZE 8192
 
 /* User-settable parameters */
 int			max_wal_size = 64;	/* 1 GB */
@@ -530,6 +532,26 @@ typedef struct XLogCtlInsert
 } XLogCtlInsert;
 
 /*
+ * A sample associating a timestamp with a given xlog position.
+ */
+typedef struct XLogTimestamp
+{
+	TimestampTz	timestamp;
+	XLogRecPtr	lsn;
+} XLogTimestamp;
+
+/*
+ * A circular buffer of LSNs and associated timestamps.  The buffer is empty
+ * when read_head == write_head.
+ */
+typedef struct XLogTimestampBuffer
+{
+	uint32			read_head;
+	uint32			write_head;
+	XLogTimestamp	buffer[XLOG_TIMESTAMP_BUFFER_SIZE];
+} XLogTimestampBuffer;
+
+/*
  * Total shared-memory state for XLOG.
  */
 typedef struct XLogCtlData
@@ -648,6 +670,14 @@ typedef struct XLogCtlData
 	/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
 	TimestampTz recoveryLastXTime;
 
+	/* timestamp from the most recently applied record associated with a timestamp. */
+	TimestampTz lastReplayedTimestamp;
+
+	/* buffers of timestamps for WAL that is not yet written/flushed/applied. */
+	XLogTimestampBuffer writeTimestamps;
+	XLogTimestampBuffer flushTimestamps;
+	XLogTimestampBuffer applyTimestamps;
+
 	/*
 	 * timestamp of when we started replaying the current chunk of WAL data,
 	 * only relevant for replication or archive recovery
@@ -6006,6 +6036,96 @@ CheckRequiredParameterValues(void)
 }
 
 /*
+ * Read and consume all records from 'buffer' whose position is <= 'lsn'.
+ * Return true if any such records are found, and write the latest timestamp
+ * found into *timestamp.  Write the new read head position into *read_head,
+ * so that the caller can store it with appropriate locking.
+ */
+static bool
+ReadXLogTimestampForLsn(XLogTimestampBuffer *buffer,
+						XLogRecPtr lsn,
+						uint32 *read_head,
+						TimestampTz *timestamp)
+{
+	bool found = false;
+
+	/*
+	 * It's OK to access buffer->read_head without any kind synchronization
+	 * because in all cases the caller is the only process reading from the
+	 * buffer (ie writing to *buffer->read_head).
+	 */
+	*read_head = buffer->read_head;
+
+	/*
+	 * It's OK to access write_head without interlocking because it's an
+	 * aligned 32 bit value which we can read atomically on all supported
+	 * platforms to get some recent value, not a torn/garbage value.
+	 * Furthermore we must see a value that is at least as recent as any WAL
+	 * that we have written/flushed/replayed, because walreceiver calls
+	 * SetXLogTimestampAtLsn before writing.
+	 */
+	while (*read_head != buffer->write_head &&
+		   buffer->buffer[*read_head].lsn <= lsn)
+	{
+		found = true;
+		*timestamp = buffer->buffer[*read_head].timestamp;
+		*read_head = (*read_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+	}
+
+	return found;
+}
+
+/*
+ * Called by the WAL receiver process after it has written up to 'lsn'.
+ * Return true if it has written any LSN location that had an associated
+ * timestamp, and write the timestamp to '*timestamp'.
+ */
+bool
+CheckForWrittenTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+	Assert(AmWalReceiverProcess());
+
+	return ReadXLogTimestampForLsn(&XLogCtl->writeTimestamps, lsn,
+								   &XLogCtl->writeTimestamps.read_head,
+								   timestamp);
+}
+
+/*
+ * Called by the WAL receiver process after it has flushed up to 'lsn'.
+ * Return true if it has flushed any LSN location that had an associated
+ * timestamp, and write the timestamp to '*timestamp'.
+ */
+bool
+CheckForFlushedTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+	Assert(AmWalReceiverProcess());
+
+	return ReadXLogTimestampForLsn(&XLogCtl->flushTimestamps, lsn,
+								   &XLogCtl->flushTimestamps.read_head,
+								   timestamp);
+}
+
+/*
+ * Called by the startup process after it has replayed up to 'lsn'.  Checks
+ * for timestamps associated with WAL positions that have now been replayed.
+ * If any are found, the latest such timestamp found is written to
+ * '*timestamp'.  Returns the new buffer read head position, which the caller
+ * should write into XLogCtl->timestamps.read_head while holding info_lck.
+ */
+static uint32
+CheckForAppliedTimestampedLsn(XLogRecPtr lsn, TimestampTz *timestamp)
+{
+	uint32 read_head;
+
+	Assert(AmStartupProcess());
+
+	ReadXLogTimestampForLsn(&XLogCtl->applyTimestamps, lsn, &read_head,
+							timestamp);
+
+	return read_head;
+}
+
+/*
  * This must be called ONCE during postmaster or standalone-backend startup
  */
 void
@@ -6824,6 +6944,8 @@ StartupXLOG(void)
 			do
 			{
 				bool		switchedTLI = false;
+				TimestampTz	replayed_timestamp = 0;
+				uint32		timestamp_read_head;
 
 #ifdef WAL_DEBUG
 				if (XLOG_DEBUG ||
@@ -6977,24 +7099,35 @@ StartupXLOG(void)
 				/* Pop the error context stack */
 				error_context_stack = errcallback.previous;
 
+				/* Check if we have replayed a timestamped WAL position */
+				timestamp_read_head =
+					CheckForAppliedTimestampedLsn(EndRecPtr,
+												  &replayed_timestamp);
+
 				/*
-				 * Update lastReplayedEndRecPtr after this record has been
-				 * successfully replayed.
+				 * Update lastReplayedEndRecPtr and lastReplayedTimestamp
+				 * after this record has been successfully replayed.
 				 */
 				SpinLockAcquire(&XLogCtl->info_lck);
 				XLogCtl->lastReplayedEndRecPtr = EndRecPtr;
 				XLogCtl->lastReplayedTLI = ThisTimeLineID;
+				XLogCtl->applyTimestamps.read_head = timestamp_read_head;
+				if (replayed_timestamp != 0)
+					XLogCtl->lastReplayedTimestamp = replayed_timestamp;
 				SpinLockRelease(&XLogCtl->info_lck);
 
 				/*
 				 * If rm_redo called XLogRequestWalReceiverReply, then we wake
 				 * up the receiver so that it notices the updated
-				 * lastReplayedEndRecPtr and sends a reply to the master.
+				 * lastReplayedEndRecPtr and sends a reply to the master.  We
+				 * also wake it if we have replayed a WAL position that has
+				 * an associated timestamp so that the upstream server can
+				 * measure our replay lag.
 				 */
-				if (doRequestWalReceiverReply)
+				if (doRequestWalReceiverReply || replayed_timestamp != 0)
 				{
 					doRequestWalReceiverReply = false;
-					WalRcvForceReply();
+					WalRcvForceReply(replayed_timestamp != 0);
 				}
 
 				/* Remember this record as the last-applied one */
@@ -11809,3 +11942,106 @@ XLogRequestWalReceiverReply(void)
 {
 	doRequestWalReceiverReply = true;
 }
+
+/*
+ * Store an (lsn, timestamp) sample in a timestamp buffer.
+ */
+static void
+StoreXLogTimestampAtLsn(XLogTimestampBuffer *buffer,
+							TimestampTz timestamp, XLogRecPtr lsn)
+{
+
+	uint32 write_head = buffer->write_head;
+	uint32 new_write_head = (write_head + 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+
+	Assert(AmWalReceiverProcess());
+
+	if (new_write_head == buffer->read_head)
+	{
+		/*
+		 * The buffer is full, so we'll rewind and overwrite the most
+		 * recent sample.  Overwriting the most recent sample means that
+		 * if we're not writing/flushing/replaying fast enough and the buffer
+		 * fills up, we'll effectively lower the sampling rate.
+		 */
+		new_write_head = write_head;
+		write_head = (write_head - 1) % XLOG_TIMESTAMP_BUFFER_SIZE;
+	}
+
+	buffer->buffer[write_head].lsn = lsn;
+	buffer->buffer[write_head].timestamp = timestamp;
+	buffer->write_head = new_write_head;
+}
+
+/*
+ * Record the timestamp that is associated with a WAL position.
+ *
+ * This is called by walreceiver on standby servers when new messages arrive,
+ * using a timestamp and the latest known WAL position from the upstream
+ * server.  The timestamp will be sent back to the upstream server via
+ * walreceiver when the WAL position is eventually written, flushed and
+ * applied.
+ */
+void
+SetXLogTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn)
+{
+	bool applied_end = false;
+	static TimestampTz last_timestamp;
+	static XLogRecPtr last_lsn;
+
+	Assert(AmWalReceiverProcess());
+	Assert(replication_lag_sample_interval >= 0);
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+
+	/*
+	 * Check if we're fully applied, so we can avoid recording samples in that
+	 * case.  There is effectively no replay lag, and we don't want to report
+	 * bogus lag after a period of idleness.
+	 */
+	if (lsn == XLogCtl->lastReplayedEndRecPtr)
+		applied_end = true;
+
+	/*
+	 * Record this timestamp/LSN pair, if the LSN has moved since last time
+	 * and we haven't recorded a sample too recently.
+	 */
+	if (!applied_end &&
+		lsn > last_lsn &&
+		timestamp > TimestampTzPlusMilliseconds(last_timestamp,
+											replication_lag_sample_interval))
+	{
+		StoreXLogTimestampAtLsn(&XLogCtl->applyTimestamps, timestamp, lsn);
+		StoreXLogTimestampAtLsn(&XLogCtl->writeTimestamps, timestamp, lsn);
+		StoreXLogTimestampAtLsn(&XLogCtl->flushTimestamps, timestamp, lsn);
+
+		last_timestamp = timestamp;
+		last_lsn = lsn;
+	}
+
+	SpinLockRelease(&XLogCtl->info_lck);
+}
+
+/*
+ * Get the timestamp for the most recently applied WAL record that carried a
+ * timestamp from the upstream server, and also the most recently applied LSN.
+ * (Note that the timestamp and the LSN don't necessarily relate to the same
+ * record.)
+ *
+ * This is similar to GetLatestXTime, except that it is advanced when WAL
+ * positions recorded with SetXLogReplayTimestampAtLsn have been applied,
+ * rather than commit records.
+ */
+TimestampTz
+GetXLogReplayTimestamp(XLogRecPtr *lsn)
+{
+	TimestampTz result;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (lsn)
+		*lsn = XLogCtl->lastReplayedEndRecPtr;
+	result = XLogCtl->lastReplayedTimestamp;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	return result;
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 649cef8..2fd63e3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -685,6 +685,9 @@ CREATE VIEW pg_stat_replication AS
             W.write_location,
             W.flush_location,
             W.replay_location,
+            W.write_lag,
+            W.flush_lag,
+            W.replay_lag,
             W.sync_priority,
             W.sync_state
     FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index cc3cf7d..621aa24 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -73,6 +73,7 @@
 int			wal_receiver_status_interval;
 int			wal_receiver_timeout;
 bool		hot_standby_feedback;
+int			replication_lag_sample_interval;
 
 /* libpqwalreceiver connection */
 static WalReceiverConn *wrconn = NULL;
@@ -107,6 +108,10 @@ static struct
 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
 }	LogstreamResult;
 
+/* Latest timestamps for replication lag tracking. */
+static TimestampTz last_write_timestamp;
+static TimestampTz last_flush_timestamp;
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -138,7 +143,7 @@ static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
-static void XLogWalRcvSendReply(bool force, bool requestReply);
+static void XLogWalRcvSendReply(bool force, bool requestReply, int timestamps);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
@@ -148,6 +153,16 @@ static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
 static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
+/*
+ * Which timestamps to include in a reply message.
+ */
+typedef enum XLogReplyTimestamp
+{
+	REPLY_WRITE_TIMESTAMP = 1,
+	REPLY_FLUSH_TIMESTAMP = 2,
+	REPLY_APPLY_TIMESTAMP = 4
+} XLogReplyTimestamp;
+
 
 static void
 ProcessWalRcvInterrupts(void)
@@ -424,6 +439,8 @@ WalReceiverMain(void)
 				len = walrcv_receive(wrconn, &buf, &wait_fd);
 				if (len != 0)
 				{
+					int timestamp = 0;
+
 					/*
 					 * Process the received data, and any subsequent data we
 					 * can read without blocking.
@@ -455,8 +472,17 @@ WalReceiverMain(void)
 						len = walrcv_receive(wrconn, &buf, &wait_fd);
 					}
 
+					/*
+					 * Check if we have written an LSN location for which we
+					 * have a timestamp from the upstream server, for
+					 * replication lag tracking.
+					 */
+					if (CheckForWrittenTimestampedLsn(LogstreamResult.Write,
+													  &last_write_timestamp))
+						timestamp = REPLY_WRITE_TIMESTAMP;
+
 					/* Let the master know that we received some data. */
-					XLogWalRcvSendReply(false, false);
+					XLogWalRcvSendReply(false, false, timestamp);
 
 					/*
 					 * If we've written some records, flush them to disk and
@@ -493,15 +519,20 @@ WalReceiverMain(void)
 					ResetLatch(walrcv->latch);
 					if (walrcv->force_reply)
 					{
+						int timestamps = 0;
+
 						/*
 						 * The recovery process has asked us to send apply
 						 * feedback now.  Make sure the flag is really set to
 						 * false in shared memory before sending the reply, so
 						 * we don't miss a new request for a reply.
 						 */
+						if (walrcv->force_reply_apply_timestamp)
+							timestamps = REPLY_APPLY_TIMESTAMP;
 						walrcv->force_reply = false;
+						walrcv->force_reply_apply_timestamp = false;
 						pg_memory_barrier();
-						XLogWalRcvSendReply(true, false);
+						XLogWalRcvSendReply(true, false, timestamps);
 					}
 				}
 				if (rc & WL_POSTMASTER_DEATH)
@@ -559,7 +590,7 @@ WalReceiverMain(void)
 						}
 					}
 
-					XLogWalRcvSendReply(requestReply, requestReply);
+					XLogWalRcvSendReply(requestReply, requestReply, 0);
 					XLogWalRcvSendHSFeedback(false);
 				}
 			}
@@ -911,7 +942,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
 				/* If the primary requested a reply, send one immediately */
 				if (replyRequested)
-					XLogWalRcvSendReply(true, false);
+					XLogWalRcvSendReply(true, false, 0);
 				break;
 			}
 		default:
@@ -1074,7 +1105,18 @@ XLogWalRcvFlush(bool dying)
 		/* Also let the master know that we made some progress */
 		if (!dying)
 		{
-			XLogWalRcvSendReply(false, false);
+			/*
+			 * Check if we have just flushed a position for which we have a
+			 * timestamp from the upstream server, for replication lag
+			 * tracking.
+			 */
+			int timestamp = 0;
+
+			if (CheckForFlushedTimestampedLsn(LogstreamResult.Flush,
+											  &last_flush_timestamp))
+				timestamp = REPLY_FLUSH_TIMESTAMP;
+
+			XLogWalRcvSendReply(false, false, timestamp);
 			XLogWalRcvSendHSFeedback(false);
 		}
 	}
@@ -1092,21 +1134,27 @@ XLogWalRcvFlush(bool dying)
  * If 'requestReply' is true, requests the server to reply immediately upon
  * receiving this message. This is used for heartbearts, when approaching
  * wal_receiver_timeout.
+ *
+ * The bitmap 'timestamps' specifies which timestamps should be included, for
+ * replication lag tracking purposes.
  */
 static void
-XLogWalRcvSendReply(bool force, bool requestReply)
+XLogWalRcvSendReply(bool force, bool requestReply, int timestamps)
 {
 	static XLogRecPtr writePtr = 0;
 	static XLogRecPtr flushPtr = 0;
 	XLogRecPtr	applyPtr;
 	static TimestampTz sendTime = 0;
 	TimestampTz now;
+	TimestampTz writeTimestamp = 0;
+	TimestampTz flushTimestamp = 0;
+	TimestampTz applyTimestamp = 0;
 
 	/*
 	 * If the user doesn't want status to be reported to the master, be sure
 	 * to exit before doing anything at all.
 	 */
-	if (!force && wal_receiver_status_interval <= 0)
+	if (!force && timestamps == 0 && wal_receiver_status_interval <= 0)
 		return;
 
 	/* Get current timestamp. */
@@ -1132,7 +1180,41 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	/* Construct a new message */
 	writePtr = LogstreamResult.Write;
 	flushPtr = LogstreamResult.Flush;
-	applyPtr = GetXLogReplayRecPtr(NULL);
+	applyTimestamp = GetXLogReplayTimestamp(&applyPtr);
+	flushTimestamp = last_flush_timestamp;
+	writeTimestamp = last_write_timestamp;
+
+	/* Decide whether to send timestamps for replay lag estimation. */
+	if (replication_lag_sample_interval != -1)
+	{
+		static TimestampTz lastApplyTimestampSendTime = 0;
+
+		/*
+		 * Only send an apply timestamp if we were explicitly asked to by the
+		 * recovery process or if replay lag sampling is active but the
+		 * recovery process seems to be stuck.
+		 *
+		 * If we haven't heard from the recovery process in a time exceeding
+		 * wal_receiver_status_interval and yet it has not applied the highest
+		 * LSN we've heard about, then we want to resend the last replayed
+		 * timestamp we have; otherwise we zero it out and wait for the
+		 * recovery process to wake us when it has set a new accurate replay
+		 * timestamp.  Note that we can read latestWalEnd without acquiring the
+		 * mutex that protects it because it is only written to by this
+		 * process (walreceiver).
+		 */
+		if (((timestamps & REPLY_APPLY_TIMESTAMP) != 0) ||
+			(WalRcv->latestWalEnd > applyPtr &&
+			 TimestampDifferenceExceeds(lastApplyTimestampSendTime, now,
+										wal_receiver_status_interval * 1000)))
+			lastApplyTimestampSendTime = now;
+		else
+			applyTimestamp = 0;
+		if ((timestamps & REPLY_FLUSH_TIMESTAMP) == 0)
+			flushTimestamp = 0;
+		if ((timestamps & REPLY_WRITE_TIMESTAMP) == 0)
+			writeTimestamp = 0;
+	}
 
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, 'r');
@@ -1140,6 +1222,9 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	pq_sendint64(&reply_message, flushPtr);
 	pq_sendint64(&reply_message, applyPtr);
 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+	pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(writeTimestamp));
+	pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(flushTimestamp));
+	pq_sendint64(&reply_message, TimestampTzToIntegerTimestamp(applyTimestamp));
 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
 
 	/* Send it */
@@ -1244,7 +1329,6 @@ static void
 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 {
 	WalRcvData *walrcv = WalRcv;
-
 	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
 
 	/* Update shared-memory status */
@@ -1256,6 +1340,16 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
 	SpinLockRelease(&walrcv->mutex);
 
+	/*
+	 * If replication lag sampling is active, remember the upstream server's
+	 * timestamp at the latest WAL end that it has.  We'll be able to retrieve
+	 * this timestamp once we have written, flushed and finally applied this
+	 * LSN, so that we can report it to the upstream server for lag tracking
+	 * purposes.
+	 */
+	if (replication_lag_sample_interval != -1)
+		SetXLogTimestampAtLsn(sendTime, walEnd);
+
 	if (log_min_messages <= DEBUG2)
 	{
 		char	   *sendtime;
@@ -1291,12 +1385,14 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
  * This is called by the startup process whenever interesting xlog records
  * are applied, so that walreceiver can check if it needs to send an apply
  * notification back to the master which may be waiting in a COMMIT with
- * synchronous_commit = remote_apply.
+ * synchronous_commit = remote_apply.  Also used to send periodic messages
+ * which are used to compute pg_stat_replication.replay_lag.
  */
 void
-WalRcvForceReply(void)
+WalRcvForceReply(bool apply_timestamp)
 {
 	WalRcv->force_reply = true;
+	WalRcv->force_reply_apply_timestamp = apply_timestamp;
 	if (WalRcv->latch)
 		SetLatch(WalRcv->latch);
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5cdb8a0..3fbca0c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1545,6 +1545,25 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 }
 
 /*
+ * Compute the difference between 'timestamp' and 'now' in microseconds.
+ * Return -1 if timestamp is zero.
+ */
+static uint64
+compute_lag(TimestampTz now, TimestampTz timestamp)
+{
+	if (timestamp == 0)
+		return -1;
+	else
+	{
+#ifdef HAVE_INT64_TIMESTAMP
+		return now - timestamp;
+#else
+		return (now - timestamp) * 1000000;
+#endif
+	}
+}
+
+/*
  * Regular reply from standby advising of WAL positions on standby server.
  */
 static void
@@ -1553,15 +1572,30 @@ ProcessStandbyReplyMessage(void)
 	XLogRecPtr	writePtr,
 				flushPtr,
 				applyPtr;
+	int64		writeLagUs,
+				flushLagUs,
+				applyLagUs;
+	TimestampTz writeTimestamp,
+				flushTimestamp,
+				applyTimestamp;
 	bool		replyRequested;
+	TimestampTz now = GetCurrentTimestamp();
 
 	/* the caller already consumed the msgtype byte */
 	writePtr = pq_getmsgint64(&reply_message);
 	flushPtr = pq_getmsgint64(&reply_message);
 	applyPtr = pq_getmsgint64(&reply_message);
 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
+	writeTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
+	flushTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
+	applyTimestamp = IntegerTimestampToTimestampTz(pq_getmsgint64(&reply_message));
 	replyRequested = pq_getmsgbyte(&reply_message);
 
+	/* Compute the replication lag. */
+	writeLagUs = compute_lag(now, writeTimestamp);
+	flushLagUs = compute_lag(now, flushTimestamp);
+	applyLagUs = compute_lag(now, applyTimestamp);
+
 	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
 		 (uint32) (writePtr >> 32), (uint32) writePtr,
 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
@@ -1583,6 +1617,12 @@ ProcessStandbyReplyMessage(void)
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
+		if (writeLagUs >= 0)
+			walsnd->writeLagUs = writeLagUs;
+		if (flushLagUs >= 0)
+			walsnd->flushLagUs = flushLagUs;
+		if (applyLagUs >= 0)
+			walsnd->applyLagUs = applyLagUs;
 		SpinLockRelease(&walsnd->mutex);
 	}
 
@@ -1979,6 +2019,9 @@ InitWalSenderSlot(void)
 			walsnd->write = InvalidXLogRecPtr;
 			walsnd->flush = InvalidXLogRecPtr;
 			walsnd->apply = InvalidXLogRecPtr;
+			walsnd->writeLagUs = -1;
+			walsnd->flushLagUs = -1;
+			walsnd->applyLagUs = -1;
 			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->latch = &MyProc->procLatch;
 			SpinLockRelease(&walsnd->mutex);
@@ -2753,6 +2796,21 @@ WalSndGetStateString(WalSndState state)
 	return "UNKNOWN";
 }
 
+static Interval *
+lag_as_interval(uint64 lag_us)
+{
+	Interval *result = palloc(sizeof(Interval));
+
+	result->month = 0;
+	result->day = 0;
+#ifdef HAVE_INT64_TIMESTAMP
+	result->time = lag_us;
+#else
+	result->time = lag_us / 1000000.0;
+#endif
+
+	return result;
+}
 
 /*
  * Returns activity of walsenders, including pids and xlog locations sent to
@@ -2761,7 +2819,7 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	8
+#define PG_STAT_GET_WAL_SENDERS_COLS	11
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2809,6 +2867,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		XLogRecPtr	write;
 		XLogRecPtr	flush;
 		XLogRecPtr	apply;
+		int64		writeLagUs;
+		int64		flushLagUs;
+		int64		applyLagUs;
 		int			priority;
 		WalSndState state;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2823,6 +2884,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		write = walsnd->write;
 		flush = walsnd->flush;
 		apply = walsnd->apply;
+		writeLagUs = walsnd->writeLagUs;
+		flushLagUs = walsnd->flushLagUs;
+		applyLagUs = walsnd->applyLagUs;
 		priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
@@ -2857,6 +2921,21 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 				nulls[5] = true;
 			values[5] = LSNGetDatum(apply);
 
+			if (writeLagUs < 0)
+				nulls[6] = true;
+			else
+				values[6] = IntervalPGetDatum(lag_as_interval(writeLagUs));
+
+			if (flushLagUs < 0)
+				nulls[7] = true;
+			else
+				values[7] = IntervalPGetDatum(lag_as_interval(flushLagUs));
+
+			if (applyLagUs < 0)
+				nulls[8] = true;
+			else
+				values[8] = IntervalPGetDatum(lag_as_interval(applyLagUs));
+
 			/*
 			 * Treat a standby such as a pg_basebackup background process
 			 * which always returns an invalid flush location, as an
@@ -2864,7 +2943,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
 
-			values[6] = Int32GetDatum(priority);
+			values[9] = Int32GetDatum(priority);
 
 			/*
 			 * More easily understood version of standby state. This is purely
@@ -2878,12 +2957,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 * states. We report just "quorum" for them.
 			 */
 			if (priority == 0)
-				values[7] = CStringGetTextDatum("async");
+				values[10] = CStringGetTextDatum("async");
 			else if (list_member_int(sync_standbys, i))
-				values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
-				values[7] = CStringGetTextDatum("potential");
+				values[10] = CStringGetTextDatum("potential");
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 545e9e0..90c608d 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1777,6 +1777,20 @@ GetSQLLocalTimestamp(int32 typmod)
 }
 
 /*
+ * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+TimestampTzToIntegerTimestamp(TimestampTz timestamp)
+{
+	return timestamp * 1000000;
+}
+#endif
+
+/*
  * TimestampDifference -- convert the difference between two timestamps
  *		into integer seconds and microseconds
  *
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 946ba9e..1adb598 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1800,6 +1800,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"replication_lag_sample_interval", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the minimum time between WAL timestamp samples used to estimate replication lag."),
+			NULL,
+			GUC_UNIT_MS
+		},
+		&replication_lag_sample_interval,
+		1 * 1000, -1, INT_MAX / 1000,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"wal_receiver_timeout", PGC_SIGHUP, REPLICATION_STANDBY,
 			gettext_noop("Sets the maximum wait time to receive data from the primary."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ee8232f..f703e25 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -271,6 +271,8 @@
 					# in milliseconds; 0 disables
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
+#replication_lag_sample_interval = 1s	# min time between timestamps recorded
+					# to estimate lag; -1 disables lag sampling
 
 
 #------------------------------------------------------------------------------
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index cb5f989..6feb95d 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -111,7 +111,7 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
 	static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
 	static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
 
-	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
+	char		replybuf[1 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 1];
 	int			len = 0;
 
 	/*
@@ -142,6 +142,12 @@ sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
 	len += 8;
 	fe_sendint64(now, &replybuf[len]);	/* sendTime */
 	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* writeTimestamp */
+	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* flushTimestamp */
+	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* applyTimestamp */
+	len += 8;
 	replybuf[len] = replyRequested ? 1 : 0;		/* replyRequested */
 	len += 1;
 
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 568ff17..960e02f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -321,7 +321,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 static bool
 sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 {
-	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
+	char		replybuf[1 + 8 + 8 + 8 + 8 + 8 + 8 + 8 + 1];
 	int			len = 0;
 
 	replybuf[len] = 'r';
@@ -337,6 +337,12 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 	len += 8;
 	fe_sendint64(now, &replybuf[len]);	/* sendTime */
 	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* writeTimestamp */
+	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* flushTimestamp */
+	len += 8;
+	fe_sendint64(0, &replybuf[len]);	/* applyTimestamp */
+	len += 8;
 	replybuf[len] = replyRequested ? 1 : 0;		/* replyRequested */
 	len += 1;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7d21408..ee11cf5 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -246,6 +246,12 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
 extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
+extern void SetXLogTimestampAtLsn(TimestampTz timestamp, XLogRecPtr lsn);
+extern bool CheckForWrittenTimestampedLsn(XLogRecPtr lsn,
+										  TimestampTz *timestamp);
+extern bool CheckForFlushedTimestampedLsn(XLogRecPtr lsn,
+										  TimestampTz *timestamp);
+extern TimestampTz GetXLogReplayTimestamp(XLogRecPtr *lsn);
 extern bool RecoveryIsPaused(void);
 extern void SetRecoveryPause(bool recoveryPause);
 extern TimestampTz GetLatestXTime(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index a6cc2eb..80267b4 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2768,7 +2768,7 @@ DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f
 DESCR("statistics: information about currently active backends");
 DATA(insert OID = 3318 (  pg_stat_get_progress_info			  PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
 DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 3317 (  pg_stat_get_wal_receiver	PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 28dc1fc..41b248f 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -23,6 +23,7 @@
 extern int	wal_receiver_status_interval;
 extern int	wal_receiver_timeout;
 extern bool hot_standby_feedback;
+extern int	replication_lag_sample_interval;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.
@@ -119,6 +120,9 @@ typedef struct
 	 */
 	bool		force_reply;
 
+	/* include the latest replayed timestamp when replying? */
+	bool		force_reply_apply_timestamp;
+
 	/* set true once conninfo is ready to display (obfuscated pwds etc) */
 	bool		ready_to_display;
 
@@ -208,6 +212,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int	GetReplicationApplyDelay(void);
 extern int	GetReplicationTransferLatency(void);
-extern void WalRcvForceReply(void);
+extern void WalRcvForceReply(bool sendApplyTimestamp);
 
 #endif   /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 7794aa5..fb3a03f 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -46,6 +46,9 @@ typedef struct WalSnd
 	XLogRecPtr	write;
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
+	int64		writeLagUs;
+	int64		flushLagUs;
+	int64		applyLagUs;
 
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 93b90fe..20517c9 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -233,9 +233,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 #ifndef HAVE_INT64_TIMESTAMP
 extern int64 GetCurrentIntegerTimestamp(void);
 extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp);
 #else
 #define GetCurrentIntegerTimestamp()	GetCurrentTimestamp()
 #define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#define TimestampTzToIntegerTimestamp(timestamp) (timestamp)
 #endif
 
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index e9cfadb..14147c5 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1809,10 +1809,13 @@ pg_stat_replication| SELECT s.pid,
     w.write_location,
     w.flush_location,
     w.replay_location,
+    w.write_lag,
+    w.flush_lag,
+    w.replay_lag,
     w.sync_priority,
     w.sync_state
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,
