diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5b67def..816cc9b 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1413,6 +1413,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       standby server</entry>
     </row>
     <row>
+     <entry><structfield>write_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be written on this
+      standby server</entry>
+    </row>
+    <row>
+     <entry><structfield>flush_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be flushed on this
+      standby server</entry>
+    </row>
+    <row>
+     <entry><structfield>replay_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Estimated time taken for recent WAL records to be replayed on this
+      standby server</entry>
+    </row>
+    <row>
      <entry><structfield>sync_priority</></entry>
      <entry><type>integer</></entry>
      <entry>Priority of this standby server for being chosen as the
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2dcff7f..be0aae9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -11509,6 +11509,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 {
 	static TimestampTz last_fail_time = 0;
 	TimestampTz now;
+	bool		streaming_reply_sent = false;
 
 	/*-------
 	 * Standby mode is implemented by a state machine:
@@ -11832,6 +11833,19 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					}
 
 					/*
+					 * Since we have replayed everything we have received so
+					 * far and are about to start waiting for more WAL, let's
+					 * tell the upstream server our replay location now so
+					 * that pg_stat_replication doesn't show stale
+					 * information.
+					 */
+					if (!streaming_reply_sent)
+					{
+						WalRcvForceReply();
+						streaming_reply_sent = true;
+					}
+
+					/*
 					 * Wait for more WAL to arrive. Time out after 5 seconds
 					 * to react to a trigger file promptly.
 					 */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 38be9cf..60047d7 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -705,6 +705,9 @@ CREATE VIEW pg_stat_replication AS
             W.write_location,
             W.flush_location,
             W.replay_location,
+            W.write_lag,
+            W.flush_lag,
+            W.replay_lag,
             W.sync_priority,
             W.sync_state
     FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ba506e2..62e0110 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -188,6 +188,31 @@ static volatile sig_atomic_t replication_active = false;
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/* A sample associating a log position with the time it was written. */
+typedef struct
+{
+	XLogRecPtr lsn;		/* WAL position passed to LagTrackerWrite() */
+	TimestampTz time;	/* timestamp passed along with that position */
+} WalTimeSample;
+
+/* The number of slots in our circular buffer of time samples. */
+#define LAG_TRACKER_BUFFER_SIZE 8192
+
+/* Indexes into LagTracker.read_heads, one per position a standby reports. */
+#define LAG_TRACKER_WRITE_HEAD 0	/* read with the reported write position */
+#define LAG_TRACKER_FLUSH_HEAD 1	/* read with the reported flush position */
+#define LAG_TRACKER_APPLY_HEAD 2	/* read with the reported apply position */
+#define LAG_TRACKER_NUM_READ_HEADS 3	/* length of the read_heads array */
+
+/* File-local state for tracking replication lag; zeroed by InitWalSender(). */
+static struct
+{
+	XLogRecPtr last_lsn;	/* lsn seen by the latest LagTrackerWrite() call */
+	WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];	/* circular sample buffer */
+	int write_head;		/* slot the next sample normally goes into */
+	int read_heads[LAG_TRACKER_NUM_READ_HEADS];	/* next unread slot per head */
+} LagTracker;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -219,6 +244,8 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz time);
+static int64 LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -244,6 +271,9 @@ InitWalSender(void)
 	 */
 	MarkPostmasterChildWalSender();
 	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+	/* Initialize empty timestamp buffer for lag tracking. */
+	memset(&LagTracker, 0, sizeof(LagTracker));
 }
 
 /*
@@ -1489,6 +1519,10 @@ ProcessStandbyReplyMessage(void)
 				flushPtr,
 				applyPtr;
 	bool		replyRequested;
+	uint64		writeLag,
+				flushLag,
+				applyLag;
+	TimestampTz now;
 
 	/* the caller already consumed the msgtype byte */
 	writePtr = pq_getmsgint64(&reply_message);
@@ -1503,6 +1537,12 @@ ProcessStandbyReplyMessage(void)
 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
 		 replyRequested ? " (reply requested)" : "");
 
+	/* See if we can compute the round-trip lag for these positions. */
+	now = GetCurrentTimestamp();
+	writeLag = LagTrackerRead(LAG_TRACKER_WRITE_HEAD, writePtr, now);
+	flushLag = LagTrackerRead(LAG_TRACKER_FLUSH_HEAD, flushPtr, now);
+	applyLag = LagTrackerRead(LAG_TRACKER_APPLY_HEAD, applyPtr, now);
+
 	/* Send a reply if the standby requested one. */
 	if (replyRequested)
 		WalSndKeepalive(false);
@@ -1518,6 +1558,12 @@ ProcessStandbyReplyMessage(void)
 		walsnd->write = writePtr;
 		walsnd->flush = flushPtr;
 		walsnd->apply = applyPtr;
+		if (writeLag != -1)
+			walsnd->writeLag = writeLag;
+		if (flushLag != -1)
+			walsnd->flushLag = flushLag;
+		if (applyLag != -1)
+			walsnd->applyLag = applyLag;
 		SpinLockRelease(&walsnd->mutex);
 	}
 
@@ -1914,6 +1960,9 @@ InitWalSenderSlot(void)
 			walsnd->write = InvalidXLogRecPtr;
 			walsnd->flush = InvalidXLogRecPtr;
 			walsnd->apply = InvalidXLogRecPtr;
+			walsnd->writeLag = -1;
+			walsnd->flushLag = -1;
+			walsnd->applyLag = -1;
 			walsnd->state = WALSNDSTATE_STARTUP;
 			walsnd->latch = &MyProc->procLatch;
 			SpinLockRelease(&walsnd->mutex);
@@ -2239,6 +2288,13 @@ XLogSendPhysical(void)
 	}
 
 	/*
+	 * Record the current system time as an approximation of the time at which
+	 * this WAL position was written for the purposes of lag tracking if it
+	 * has moved forwards.
+	 */
+	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
+
+	/*
 	 * If this is a historic timeline and we've reached the point where we
 	 * forked to the next timeline, stop streaming.
 	 *
@@ -2688,6 +2744,21 @@ WalSndGetStateString(WalSndState state)
 	return "UNKNOWN";
 }
 
+/* Build a palloc'd Interval holding an elapsed time given in microseconds. */
+static Interval *
+usecs_to_interval(uint64 usecs)
+{
+	Interval *interval = palloc(sizeof(Interval));
+
+	interval->day = 0;
+	interval->month = 0;
+#ifndef HAVE_INT64_TIMESTAMP
+	interval->time = usecs / 1000000.0;	/* microseconds scaled to seconds */
+#else
+	interval->time = usecs;		/* microseconds stored as-is */
+#endif
+	return interval;
+}
 
 /*
  * Returns activity of walsenders, including pids and xlog locations sent to
@@ -2696,7 +2767,7 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS	8
+#define PG_STAT_GET_WAL_SENDERS_COLS	11
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -2744,6 +2815,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		XLogRecPtr	write;
 		XLogRecPtr	flush;
 		XLogRecPtr	apply;
+		int64		writeLag;
+		int64		flushLag;
+		int64		applyLag;
 		int			priority;
 		WalSndState state;
 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2758,6 +2832,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 		write = walsnd->write;
 		flush = walsnd->flush;
 		apply = walsnd->apply;
+		writeLag = walsnd->writeLag;
+		flushLag = walsnd->flushLag;
+		applyLag = walsnd->applyLag;
 		priority = walsnd->sync_standby_priority;
 		SpinLockRelease(&walsnd->mutex);
 
@@ -2799,7 +2876,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
 
-			values[6] = Int32GetDatum(priority);
+			if (writeLag < 0)
+				nulls[6] = true;
+			else
+				values[6] = IntervalPGetDatum(usecs_to_interval(writeLag));
+
+			if (flushLag < 0)
+				nulls[7] = true;
+			else
+				values[7] = IntervalPGetDatum(usecs_to_interval(flushLag));
+
+			if (applyLag < 0)
+				nulls[8] = true;
+			else
+				values[8] = IntervalPGetDatum(usecs_to_interval(applyLag));
+
+			values[9] = Int32GetDatum(priority);
 
 			/*
 			 * More easily understood version of standby state. This is purely
@@ -2813,12 +2905,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 * states. We report just "quorum" for them.
 			 */
 			if (priority == 0)
-				values[7] = CStringGetTextDatum("async");
+				values[10] = CStringGetTextDatum("async");
 			else if (list_member_int(sync_standbys, i))
-				values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
 			else
-				values[7] = CStringGetTextDatum("potential");
+				values[10] = CStringGetTextDatum("potential");
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -2886,3 +2978,85 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
 			WalSndShutdown();
 	}
 }
+
+/*
+ * Record a sample pairing WAL position 'lsn' with timestamp 'time', so that the
+ * lag can be computed when the standby eventually reports that position.
+ */
+static void
+LagTrackerWrite(XLogRecPtr lsn, TimestampTz time)
+{
+	bool buffer_full;
+	int new_write_head;
+	int i;
+
+	/*
+	 * If the lsn hasn't advanced since last time, then do nothing.  This way
+	 * we only record a new sample when new WAL has been written, which is
+	 * a simple proxy for the time at which the log was written.
+	 */
+	if (LagTracker.last_lsn == lsn)
+		return;
+	LagTracker.last_lsn = lsn;
+
+	/*
+	 * If advancing the write head of the circular buffer would crash into any
+	 * of the read heads, then the buffer is full.  In other words, the
+	 * slowest reader (presumably apply) is the one that controls the release
+	 * of space.
+	 */
+	new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
+	buffer_full = false;
+	for (i = 0; i < LAG_TRACKER_NUM_READ_HEADS; ++i)
+	{
+		if (new_write_head == LagTracker.read_heads[i])
+			buffer_full = true;
+	}
+
+	/*
+	 * If the buffer is full, rewind by one slot and overwrite the last sample
+	 * to lower the sampling rate.  Add the buffer size before taking the
+	 * modulus: in C, (0 - 1) % N is -1, which would index before the array.
+	 */
+	if (buffer_full)
+	{
+		new_write_head = LagTracker.write_head;
+		LagTracker.write_head = (LagTracker.write_head +
+			LAG_TRACKER_BUFFER_SIZE - 1) % LAG_TRACKER_BUFFER_SIZE;
+	}
+
+	/* Store a sample at the current write head position. */
+	LagTracker.buffer[LagTracker.write_head].lsn = lsn;
+	LagTracker.buffer[LagTracker.write_head].time = time;
+	LagTracker.write_head = new_write_head;
+}
+
+/*
+ * Consume the samples queued for read head 'head' whose LSN is at or before
+ * 'lsn', and report how long before 'now' the newest of them was recorded.
+ * Returns the elapsed time in microseconds, or -1 if no time is available.
+ */
+static int64
+LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
+{
+	int *cursor = &LagTracker.read_heads[head];
+	TimestampTz sample_time = 0;
+
+	/* Step past every unread sample that the reported position covers. */
+	for (;;)
+	{
+		if (*cursor == LagTracker.write_head)
+			break;				/* caught up with the write head */
+		if (LagTracker.buffer[*cursor].lsn > lsn)
+			break;				/* this sample is beyond 'lsn' */
+		sample_time = LagTracker.buffer[*cursor].time;
+		*cursor = (*cursor + 1) % LAG_TRACKER_BUFFER_SIZE;
+	}
+
+	/* No sample time found, or the clock went backwards: report not found. */
+	if (sample_time == 0 || sample_time > now)
+		return -1;
+
+	return TimestampTzToIntegerTimestamp(now) -
+		TimestampTzToIntegerTimestamp(sample_time);
+}
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 9b4c012..664b584 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1777,6 +1777,20 @@ GetSQLLocalTimestamp(int32 typmod)
 }
 
 /*
+ * TimestampTzToIntegerTimestamp -- convert a native timestamp to int64 format
+ *
+ * When compiled with --enable-integer-datetimes, this is implemented as a
+ * no-op macro.
+ */
+#ifndef HAVE_INT64_TIMESTAMP
+int64
+TimestampTzToIntegerTimestamp(TimestampTz timestamp)
+{
+	return timestamp * 1000000;
+}
+#endif
+
+/*
  * TimestampDifference -- convert the difference between two timestamps
  *		into integer seconds and microseconds
  *
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 41c12af..246336f 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2772,7 +2772,7 @@ DATA(insert OID = 2022 (  pg_stat_get_activity			PGNSP PGUID 12 1 100 0 0 f f f
 DESCR("statistics: information about currently active backends");
 DATA(insert OID = 3318 (  pg_stat_get_progress_info			  PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
 DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders	PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 3317 (  pg_stat_get_wal_receiver	PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5e6ccfc..3ec7dfb 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -47,6 +47,11 @@ typedef struct WalSnd
 	XLogRecPtr	flush;
 	XLogRecPtr	apply;
 
+	/* Lag times in microseconds. */
+	int64		writeLag;
+	int64		flushLag;
+	int64		applyLag;
+
 	/* Protects shared variables shown above. */
 	slock_t		mutex;
 
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 21651b1..765fa81 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -108,9 +108,11 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 #ifndef HAVE_INT64_TIMESTAMP
 extern int64 GetCurrentIntegerTimestamp(void);
 extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
+extern int64 TimestampTzToIntegerTimestamp(TimestampTz timestamp);
 #else
 #define GetCurrentIntegerTimestamp()	GetCurrentTimestamp()
 #define IntegerTimestampToTimestampTz(timestamp) (timestamp)
+#define TimestampTzToIntegerTimestamp(timestamp) (timestamp)
 #endif
 
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
