On Mon, Apr 12, 2021 at 4:46 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Sat, Apr 10, 2021 at 6:51 PM vignesh C <vignes...@gmail.com> wrote:
> >
>
> Thanks, 0001 and 0002 look good to me. I have a minor comment for 0002.
>
> <entry role="catalog_table_entry"><para role="column_definition">
> +        <structfield>total_bytes</structfield><type>bigint</type>
> +       </para>
> +       <para>
> +        Amount of decoded transactions data sent to the decoding output 
> plugin
> +        while decoding the changes from WAL for this slot. This can be used 
> to
> +        gauge the total amount of data sent during logical decoding.
>
> Can we slightly extend it to say something like: Note that this
> includes the bytes streamed and/or spilled. Similarly, we can extend
> it for total_txns.
>

Thanks for the comments; they are addressed in the attached v8 patch.
Thoughts?

Regards,
Vignesh
From b3288d575d9b8c26381fa8da773960d16ae2a1f3 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Sat, 10 Apr 2021 08:14:52 +0530
Subject: [PATCH v8 1/5] Changed char datatype to NameData datatype for
 slotname.

Changed char datatype to NameData datatype for slotname.
---
 src/backend/postmaster/pgstat.c           | 32 +++++++++++------------
 src/backend/replication/logical/logical.c | 13 ++++++---
 src/backend/replication/slot.c            |  7 ++++-
 src/backend/utils/adt/pgstatfuncs.c       |  2 +-
 src/include/pgstat.h                      | 11 +++-----
 5 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f4467625f7..666ce95d08 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -64,6 +64,7 @@
 #include "storage/pg_shmem.h"
 #include "storage/proc.h"
 #include "storage/procsignal.h"
+#include "utils/builtins.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -1539,7 +1540,7 @@ pgstat_reset_replslot_counter(const char *name)
 		if (SlotIsPhysical(slot))
 			return;
 
-		strlcpy(msg.m_slotname, name, NAMEDATALEN);
+		namestrcpy(&msg.m_slotname, name);
 		msg.clearall = false;
 	}
 	else
@@ -1812,10 +1813,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
-					   PgStat_Counter spillcount, PgStat_Counter spillbytes,
-					   PgStat_Counter streamtxns, PgStat_Counter streamcount,
-					   PgStat_Counter streambytes)
+pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -1823,14 +1821,14 @@ pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
 	 * Prepare and send the message
 	 */
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
-	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
+	namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
 	msg.m_drop = false;
-	msg.m_spill_txns = spilltxns;
-	msg.m_spill_count = spillcount;
-	msg.m_spill_bytes = spillbytes;
-	msg.m_stream_txns = streamtxns;
-	msg.m_stream_count = streamcount;
-	msg.m_stream_bytes = streambytes;
+	msg.m_spill_txns = repSlotStat->spill_txns;
+	msg.m_spill_count = repSlotStat->spill_count;
+	msg.m_spill_bytes = repSlotStat->spill_bytes;
+	msg.m_stream_txns = repSlotStat->stream_txns;
+	msg.m_stream_count = repSlotStat->stream_count;
+	msg.m_stream_bytes = repSlotStat->stream_bytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -1846,7 +1844,7 @@ pgstat_report_replslot_drop(const char *slotname)
 	PgStat_MsgReplSlot msg;
 
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
-	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
+	namestrcpy(&msg.m_slotname, slotname);
 	msg.m_drop = true;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
@@ -5202,7 +5200,7 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 	else
 	{
 		/* Get the index of replication slot statistics to reset */
-		idx = pgstat_replslot_index(msg->m_slotname, false);
+		idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
 
 		/*
 		 * Nothing to do if the given slot entry is not found.  This could
@@ -5538,7 +5536,7 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 	 * Get the index of replication slot statistics.  On dropping, we don't
 	 * create the new statistics.
 	 */
-	idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+	idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
 
 	/*
 	 * The slot entry is not found or there is no space to accommodate the new
@@ -5763,7 +5761,7 @@ pgstat_replslot_index(const char *name, bool create_it)
 	Assert(nReplSlotStats <= max_replication_slots);
 	for (i = 0; i < nReplSlotStats; i++)
 	{
-		if (strcmp(replSlotStats[i].slotname, name) == 0)
+		if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
 			return i;			/* found */
 	}
 
@@ -5776,7 +5774,7 @@ pgstat_replslot_index(const char *name, bool create_it)
 
 	/* Register new slot */
 	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-	strlcpy(replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+	namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
 
 	return nReplSlotStats++;
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 4f6e87f18d..68e210ce12 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1773,6 +1773,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
+	PgStat_ReplSlotStats repSlotStat;
 
 	/*
 	 * Nothing to do if we haven't spilled or streamed anything since the last
@@ -1790,9 +1791,15 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 (long long) rb->streamCount,
 		 (long long) rb->streamBytes);
 
-	pgstat_report_replslot(NameStr(ctx->slot->data.name),
-						   rb->spillTxns, rb->spillCount, rb->spillBytes,
-						   rb->streamTxns, rb->streamCount, rb->streamBytes);
+	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
+	repSlotStat.spill_txns = rb->spillTxns;
+	repSlotStat.spill_count = rb->spillCount;
+	repSlotStat.spill_bytes = rb->spillBytes;
+	repSlotStat.stream_txns = rb->streamTxns;
+	repSlotStat.stream_count = rb->streamCount;
+	repSlotStat.stream_bytes = rb->streamBytes;
+
+	pgstat_report_replslot(&repSlotStat);
 	rb->spillTxns = 0;
 	rb->spillCount = 0;
 	rb->spillBytes = 0;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 75a087c2f9..f61b163f78 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -328,7 +328,12 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
-		pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
+	{
+		PgStat_ReplSlotStats repSlotStat;
+		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
+		namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
+		pgstat_report_replslot(&repSlotStat);
+	}
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 182b15e3f2..521ba73614 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2328,7 +2328,7 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 		MemSet(values, 0, sizeof(values));
 		MemSet(nulls, 0, sizeof(nulls));
 
-		values[0] = PointerGetDatum(cstring_to_text(s->slotname));
+		values[0] = CStringGetTextDatum(NameStr(s->slotname));
 		values[1] = Int64GetDatum(s->spill_txns);
 		values[2] = Int64GetDatum(s->spill_count);
 		values[3] = Int64GetDatum(s->spill_bytes);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 9a87e7cd88..8e11215058 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -393,7 +393,7 @@ typedef struct PgStat_MsgResetslrucounter
 typedef struct PgStat_MsgResetreplslotcounter
 {
 	PgStat_MsgHdr m_hdr;
-	char		m_slotname[NAMEDATALEN];
+	NameData      m_slotname;
 	bool		clearall;
 } PgStat_MsgResetreplslotcounter;
 
@@ -540,7 +540,7 @@ typedef struct PgStat_MsgSLRU
 typedef struct PgStat_MsgReplSlot
 {
 	PgStat_MsgHdr m_hdr;
-	char		m_slotname[NAMEDATALEN];
+	NameData	m_slotname;
 	bool		m_drop;
 	PgStat_Counter m_spill_txns;
 	PgStat_Counter m_spill_count;
@@ -917,7 +917,7 @@ typedef struct PgStat_SLRUStats
  */
 typedef struct PgStat_ReplSlotStats
 {
-	char		slotname[NAMEDATALEN];
+	NameData	slotname;
 	PgStat_Counter spill_txns;
 	PgStat_Counter spill_count;
 	PgStat_Counter spill_bytes;
@@ -1027,10 +1027,7 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns,
-								   PgStat_Counter spillcount, PgStat_Counter spillbytes,
-								   PgStat_Counter streamtxns, PgStat_Counter streamcount,
-								   PgStat_Counter streambytes);
+extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
-- 
2.25.1

From 1b7771c1c6d53fd76b59dd3ff7687c96d1086598 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Mon, 12 Apr 2021 10:31:52 +0900
Subject: [PATCH v8 2/5] Use HTAB for replication slot statistics.

Previously, we used an array to store stats for replication
slots. But this had two problems in the case where a drop-slot-stats
message is lost: 1) the stats for the new slot are not recorded, and
2) we write beyond the end of the array if, after restarting, the number
of slots whose stats are stored in the stats file exceeds
max_replication_slots.

This commit switches to an HTAB for replication slot statistics,
resolving both problems. Instead of relying solely on the drop message,
pgstat_vacuum_stat() now checks whether the slot for each stats entry in
the stats collector still exists and, if not, sends a drop-slot-stats message.
---
 src/backend/catalog/system_views.sql      |  26 ++-
 src/backend/postmaster/pgstat.c           | 261 +++++++++++-----------
 src/backend/replication/logical/logical.c |   2 +-
 src/backend/replication/slot.c            |  23 +-
 src/backend/utils/adt/pgstatfuncs.c       | 127 ++++++-----
 src/include/catalog/pg_proc.dat           |  14 +-
 src/include/pgstat.h                      |   8 +-
 src/include/replication/slot.h            |   2 +-
 src/test/regress/expected/rules.out       |   4 +-
 9 files changed, 247 insertions(+), 220 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 451db2ee0a..1bceb25571 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -866,18 +866,6 @@ CREATE VIEW pg_stat_replication AS
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
-CREATE VIEW pg_stat_replication_slots AS
-    SELECT
-            s.slot_name,
-            s.spill_txns,
-            s.spill_count,
-            s.spill_bytes,
-            s.stream_txns,
-            s.stream_count,
-            s.stream_bytes,
-            s.stats_reset
-    FROM pg_stat_get_replication_slots() AS s;
-
 CREATE VIEW pg_stat_slru AS
     SELECT
             s.name,
@@ -982,6 +970,20 @@ CREATE VIEW pg_replication_slots AS
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
+CREATE VIEW pg_stat_replication_slots AS
+    SELECT
+            s.slot_name,
+            s.spill_txns,
+            s.spill_count,
+            s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes,
+            s.stats_reset
+    FROM pg_replication_slots as r,
+        LATERAL pg_stat_get_replication_slot(slot_name) as s
+    WHERE r.datoid IS NOT NULL; -- excluding physical slots
+
 CREATE VIEW pg_stat_database AS
     SELECT
             D.oid AS datid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 666ce95d08..142567e654 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,6 +106,7 @@
 #define PGSTAT_DB_HASH_SIZE		16
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
+#define PGSTAT_REPLSLOT_HASH_SIZE	32
 
 
 /* ----------
@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
-static PgStat_ReplSlotStats *replSlotStats;
-static int	nReplSlotStats;
+static HTAB *replSlotStats = NULL;
 static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
 
 /*
@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
 static bool pgstat_write_statsfile_needed(void);
 static bool pgstat_db_requested(Oid databaseid);
 
-static int	pgstat_replslot_index(const char *name, bool create_it);
-static void pgstat_reset_replslot(int i, TimestampTz ts);
+static PgStat_ReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
+static void pgstat_reset_replslot(PgStat_ReplSlotEntry *slotstats, TimestampTz ts);
 
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
@@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void)
 	/* Clean up */
 	hash_destroy(htab);
 
+	/*
+	 * Check all replication slots in the stats hash table. We do this check
+	 * only when replSlotStats has more than max_replication_slots entries,
+	 * i.e., when there are stats for an already-dropped slot, to avoid frequent
+	 * calls to SearchNamedReplicationSlot(), which acquires an LWLock.
+	 */
+	if (replSlotStats && hash_get_num_entries(replSlotStats) > max_replication_slots)
+	{
+		PgStat_ReplSlotEntry *slotentry;
+
+		hash_seq_init(&hstat, replSlotStats);
+		while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
+				pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+		}
+	}
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
 
 	if (name)
 	{
-		ReplicationSlot *slot;
-
-		/*
-		 * Check if the slot exists with the given name. It is possible that by
-		 * the time this message is executed the slot is dropped but at least
-		 * this check will ensure that the given name is for a valid slot.
-		 */
-		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-		slot = SearchNamedReplicationSlot(name);
-		LWLockRelease(ReplicationSlotControlLock);
-
-		if (!slot)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-					 errmsg("replication slot \"%s\" does not exist",
-							name)));
-
-		/*
-		 * Nothing to do for physical slots as we collect stats only for
-		 * logical slots.
-		 */
-		if (SlotIsPhysical(slot))
-			return;
-
 		namestrcpy(&msg.m_slotname, name);
 		msg.clearall = false;
 	}
@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
  * ----------
  */
 void
-pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
+pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -2870,17 +2864,19 @@ pgstat_fetch_slru(void)
  * pgstat_fetch_replslot() -
  *
  *	Support function for the SQL-callable pgstat* functions. Returns
- *	a pointer to the replication slot statistics struct and sets the
- *	number of entries in nslots_p.
+ *	a pointer to the replication slot statistics struct.
  * ---------
  */
-PgStat_ReplSlotStats *
-pgstat_fetch_replslot(int *nslots_p)
+PgStat_ReplSlotEntry *
+pgstat_fetch_replslot(NameData slotname)
 {
+	PgStat_ReplSlotEntry *slotent = NULL;
+
 	backend_read_statsfile();
 
-	*nslots_p = nReplSlotStats;
-	return replSlotStats;
+	slotent = pgstat_get_replslot_entry(slotname, false);
+
+	return slotent;
 }
 
 /*
@@ -3652,7 +3648,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
 	int			rc;
-	int			i;
 
 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -3742,11 +3737,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	/*
 	 * Write replication slot stats struct
 	 */
-	for (i = 0; i < nReplSlotStats; i++)
+	if (replSlotStats)
 	{
-		fputc('R', fpout);
-		rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
-		(void) rc;				/* we'll check for error with ferror */
+		PgStat_ReplSlotEntry *slotentry;
+
+		hash_seq_init(&hstat, replSlotStats);
+		while ((slotentry = (PgStat_ReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('R', fpout);
+			rc = fwrite(slotentry, sizeof(PgStat_ReplSlotEntry), 1, fpout);
+			(void) rc;				/* we'll check for error with ferror */
+		}
 	}
 
 	/*
@@ -3973,12 +3974,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
 						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-	/* Allocate the space for replication slot statistics */
-	replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
-										   max_replication_slots
-										   * sizeof(PgStat_ReplSlotStats));
-	nReplSlotStats = 0;
-
 	/*
 	 * Clear out global, archiver, WAL and SLRU statistics so they start from
 	 * zero in case we can't load an existing statsfile.
@@ -4003,12 +3998,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 	for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
 		slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
 
-	/*
-	 * Set the same reset timestamp for all replication slots too.
-	 */
-	for (i = 0; i < max_replication_slots; i++)
-		replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
-
 	/*
 	 * Try to open the stats file. If it doesn't exist, the backends simply
 	 * return zero for anything and the collector simply starts from scratch
@@ -4195,21 +4184,27 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_ReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
+			{
+				PgStat_ReplSlotEntry slotstats;
+				PgStat_ReplSlotEntry *slotent;
+
+				if (fread(&slotstats, 1, sizeof(PgStat_ReplSlotEntry), fpin)
+					!= sizeof(PgStat_ReplSlotEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
 									statfile)));
-					memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
 					goto done;
 				}
-				nReplSlotStats++;
+
+				slotent = pgstat_get_replslot_entry(slotstats.slotname, true);
+				memcpy(slotent, &slotstats, sizeof(PgStat_ReplSlotEntry));
 				break;
+			}
 
 			case 'E':
 				goto done;
@@ -4422,7 +4417,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_ArchiverStats myArchiverStats;
 	PgStat_WalStats myWalStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
-	PgStat_ReplSlotStats myReplSlotStats;
+	PgStat_ReplSlotEntry myReplSlotStats;
 	PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
 	FILE	   *fpin;
 	int32		format_id;
@@ -4551,12 +4546,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				break;
 
 				/*
-				 * 'R'	A PgStat_ReplSlotStats struct describing a replication
+				 * 'R'	A PgStat_ReplSlotEntry struct describing a replication
 				 * slot follows.
 				 */
 			case 'R':
-				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
-					!= sizeof(PgStat_ReplSlotStats))
+				if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotEntry), fpin)
+					!= sizeof(PgStat_ReplSlotEntry))
 				{
 					ereport(pgStatRunningInCollector ? LOG : WARNING,
 							(errmsg("corrupted statistics file \"%s\"",
@@ -4763,7 +4758,6 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStats = NULL;
-	nReplSlotStats = 0;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5187,20 +5181,26 @@ static void
 pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 								 int len)
 {
-	int			i;
-	int			idx = -1;
+	PgStat_ReplSlotEntry *slotent;
 	TimestampTz ts;
 
+	/* Return if we don't have replication slot statistics */
+	if (replSlotStats == NULL)
+		return;
+
 	ts = GetCurrentTimestamp();
 	if (msg->clearall)
 	{
-		for (i = 0; i < nReplSlotStats; i++)
-			pgstat_reset_replslot(i, ts);
+		HASH_SEQ_STATUS sstat;
+
+		hash_seq_init(&sstat, replSlotStats);
+		while ((slotent = (PgStat_ReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
+			pgstat_reset_replslot(slotent, ts);
 	}
 	else
 	{
 		/* Get the index of replication slot statistics to reset */
-		idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
 
 		/*
 		 * Nothing to do if the given slot entry is not found.  This could
@@ -5208,11 +5208,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 		 * corresponding statistics entry is also removed before receiving the
 		 * reset message.
 		 */
-		if (idx < 0)
+		if (!slotent)
 			return;
 
 		/* Reset the stats for the requested replication slot */
-		pgstat_reset_replslot(idx, ts);
+		pgstat_reset_replslot(slotent, ts);
 	}
 }
 
@@ -5530,44 +5530,27 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
 static void
 pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 {
-	int			idx;
-
-	/*
-	 * Get the index of replication slot statistics.  On dropping, we don't
-	 * create the new statistics.
-	 */
-	idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
-
-	/*
-	 * The slot entry is not found or there is no space to accommodate the new
-	 * entry.  This could happen when the message for the creation of a slot
-	 * reached before the drop message even though the actual operations
-	 * happen in reverse order.  In such a case, the next update of the
-	 * statistics for the same slot will create the required entry.
-	 */
-	if (idx < 0)
-		return;
-
-	/* it must be a valid replication slot index */
-	Assert(idx < nReplSlotStats);
-
 	if (msg->m_drop)
 	{
 		/* Remove the replication slot statistics with the given name */
-		if (idx < nReplSlotStats - 1)
-			memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
-				   sizeof(PgStat_ReplSlotStats));
-		nReplSlotStats--;
+		if (replSlotStats != NULL)
+			(void) hash_search(replSlotStats, (void *) NameStr(msg->m_slotname),
+							   HASH_REMOVE, NULL);
 	}
 	else
 	{
+		PgStat_ReplSlotEntry *slotent;
+
+		slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
+		Assert(slotent);
+
 		/* Update the replication slot statistics */
-		replSlotStats[idx].spill_txns += msg->m_spill_txns;
-		replSlotStats[idx].spill_count += msg->m_spill_count;
-		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
-		replSlotStats[idx].stream_txns += msg->m_stream_txns;
-		replSlotStats[idx].stream_count += msg->m_stream_count;
-		replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+		slotent->spill_txns += msg->m_spill_txns;
+		slotent->spill_count += msg->m_spill_count;
+		slotent->spill_bytes += msg->m_spill_bytes;
+		slotent->stream_txns += msg->m_stream_txns;
+		slotent->stream_count += msg->m_stream_count;
+		slotent->stream_bytes += msg->m_stream_bytes;
 	}
 }
 
@@ -5745,57 +5728,77 @@ pgstat_db_requested(Oid databaseid)
 }
 
 /* ----------
- * pgstat_replslot_index
+ * pgstat_get_replslot_entry
  *
- * Return the index of entry of a replication slot with the given name, or
- * -1 if the slot is not found.
+ * Return the entry of replication slot stats with the given name. Return
+ * NULL if not found and the caller didn't request to create it.
  *
  * create_it tells whether to create the new slot entry if it is not found.
  * ----------
  */
-static int
-pgstat_replslot_index(const char *name, bool create_it)
+static PgStat_ReplSlotEntry *
+pgstat_get_replslot_entry(NameData name, bool create_it)
 {
-	int			i;
+	PgStat_ReplSlotEntry *slotent;
+	bool	found;
 
-	Assert(nReplSlotStats <= max_replication_slots);
-	for (i = 0; i < nReplSlotStats; i++)
+	/*
+	 * Create the replication slot stats hash table if we don't have
+	 * it already.
+	 */
+	if (replSlotStats == NULL)
 	{
-		if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
-			return i;			/* found */
+		HASHCTL		hash_ctl;
+
+		hash_ctl.keysize = sizeof(NameData);
+		hash_ctl.entrysize = sizeof(PgStat_ReplSlotEntry);
+		hash_ctl.hcxt = pgStatLocalContext;
+
+		replSlotStats = hash_create("Replication slots hash",
+									PGSTAT_REPLSLOT_HASH_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 	}
 
-	/*
-	 * The slot is not found.  We don't want to register the new statistics if
-	 * the list is already full or the caller didn't request.
-	 */
-	if (i == max_replication_slots || !create_it)
-		return -1;
+	slotent = (PgStat_ReplSlotEntry *) hash_search(replSlotStats,
+												   (void *) &name,
+												   create_it ? HASH_ENTER : HASH_FIND,
+												   &found);
 
-	/* Register new slot */
-	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
-	namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
+	if (!slotent)
+	{
+		/* not found */
+		Assert(!create_it && !found);
+		return NULL;
+	}
+
+	/* initialize the entry */
+	if (create_it && !found)
+	{
+		memset(slotent, 0, sizeof(PgStat_ReplSlotEntry));
+		namestrcpy(&(slotent->slotname), NameStr(name));
+	}
 
-	return nReplSlotStats++;
+	return slotent;
 }
 
 /* ----------
  * pgstat_reset_replslot
  *
- * Reset the replication slot stats at index 'i'.
+ * Reset the given replication slot stats.
  * ----------
  */
 static void
-pgstat_reset_replslot(int i, TimestampTz ts)
+pgstat_reset_replslot(PgStat_ReplSlotEntry *slotent, TimestampTz ts)
 {
 	/* reset only counters. Don't clear slot name */
-	replSlotStats[i].spill_txns = 0;
-	replSlotStats[i].spill_count = 0;
-	replSlotStats[i].spill_bytes = 0;
-	replSlotStats[i].stream_txns = 0;
-	replSlotStats[i].stream_count = 0;
-	replSlotStats[i].stream_bytes = 0;
-	replSlotStats[i].stat_reset_timestamp = ts;
+	slotent->spill_txns = 0;
+	slotent->spill_count = 0;
+	slotent->spill_bytes = 0;
+	slotent->stream_txns = 0;
+	slotent->stream_count = 0;
+	slotent->stream_bytes = 0;
+	slotent->stat_reset_timestamp = ts;
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 68e210ce12..d66846faab 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1773,7 +1773,7 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
-	PgStat_ReplSlotStats repSlotStat;
+	PgStat_ReplSlotEntry repSlotStat;
 
 	/*
 	 * Nothing to do if we haven't spilled or streamed anything since the last
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f61b163f78..f75e7e95f9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -329,8 +329,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 */
 	if (SlotIsLogical(slot))
 	{
-		PgStat_ReplSlotStats repSlotStat;
-		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
+		PgStat_ReplSlotEntry repSlotStat;
+		MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotEntry));
 		namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
 		pgstat_report_replslot(&repSlotStat);
 	}
@@ -349,17 +349,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Search for the named replication slot.
  *
  * Return the replication slot if found, otherwise NULL.
- *
- * The caller must hold ReplicationSlotControlLock in shared mode.
  */
 ReplicationSlot *
-SearchNamedReplicationSlot(const char *name)
+SearchNamedReplicationSlot(const char *name, bool need_lock)
 {
 	int			i;
 	ReplicationSlot	*slot = NULL;
 
-	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
-								LW_SHARED));
+	if (need_lock)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -372,6 +370,9 @@ SearchNamedReplicationSlot(const char *name)
 		}
 	}
 
+	if (need_lock)
+		LWLockRelease(ReplicationSlotControlLock);
+
 	return slot;
 }
 
@@ -416,7 +417,7 @@ retry:
 	 * Search for the slot with the specified name if the slot to acquire is
 	 * not given. If the slot is not found, we either return -1 or error out.
 	 */
-	s = slot ? slot : SearchNamedReplicationSlot(name);
+	s = slot ? slot : SearchNamedReplicationSlot(name, false);
 	if (s == NULL || !s->in_use)
 	{
 		LWLockRelease(ReplicationSlotControlLock);
@@ -712,7 +713,11 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * and create messages while holding ReplicationSlotAllocationLock to
 	 * reduce that possibility. If the messages reached in reverse, we would
 	 * lose one statistics update message. But the next update message will
-	 * create the statistics for the replication slot.
+	 * create the statistics for the replication slot. In case the message
+	 * for dropping the old slot gets lost and a slot with the same name is
+	 * created, the stats will be accumulated into the old slot's entry since
+	 * we use the slot name as the key. In that case, the user can reset those
+	 * stats with pg_stat_reset_replication_slot().
 	 */
 	if (SlotIsLogical(slot))
 		pgstat_report_replslot_drop(NameStr(slot->data.name));
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 521ba73614..569abce27e 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -2207,8 +2208,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	char	   *target = NULL;
 
 	if (!PG_ARGISNULL(0))
+	{
+		ReplicationSlot *slot;
+
 		target = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
+		/*
+		 * Check if the slot exists with the given name. It is possible that by
+		 * the time this message is executed the slot is dropped but at least
+		 * this check will ensure that the given name is for a valid slot.
+		 */
+		slot = SearchNamedReplicationSlot(target, true);
+
+		if (!slot)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("replication slot \"%s\" does not exist",
+							target)));
+
+		/*
+		 * Nothing to do for physical slots as we collect stats only for
+		 * logical slots.
+		 */
+		if (SlotIsPhysical(slot))
+			PG_RETURN_VOID();
+	}
+
 	pgstat_reset_replslot_counter(target);
 
 	PG_RETURN_VOID();
@@ -2280,71 +2305,61 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
 
-/* Get the statistics for the replication slots */
+/* Get the statistics for the replication slot */
 Datum
-pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
 #define PG_STAT_GET_REPLICATION_SLOT_COLS 8
-	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	text *slotname_text = PG_GETARG_TEXT_P(0);
+	NameData slotname;
 	TupleDesc	tupdesc;
-	Tuplestorestate *tupstore;
-	MemoryContext per_query_ctx;
-	MemoryContext oldcontext;
-	PgStat_ReplSlotStats *slotstats;
-	int			nstats;
-	int			i;
+	Datum		values[10];
+	bool		nulls[10];
+	PgStat_ReplSlotEntry *slotent;
 
-	/* check to see if caller supports us returning a tuplestore */
-	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("set-valued function called in context that cannot accept a set")));
-	if (!(rsinfo->allowedModes & SFRM_Materialize))
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("materialize mode required, but it is not allowed in this context")));
-
-	/* Build a tuple descriptor for our result type */
-	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
-		elog(ERROR, "return type must be a row type");
-
-	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
-	oldcontext = MemoryContextSwitchTo(per_query_ctx);
-
-	tupstore = tuplestore_begin_heap(true, false, work_mem);
-	rsinfo->returnMode = SFRM_Materialize;
-	rsinfo->setResult = tupstore;
-	rsinfo->setDesc = tupdesc;
-
-	MemoryContextSwitchTo(oldcontext);
-
-	slotstats = pgstat_fetch_replslot(&nstats);
-	for (i = 0; i < nstats; i++)
-	{
-		Datum		values[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		bool		nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
-		PgStat_ReplSlotStats *s = &(slotstats[i]);
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
 
-		MemSet(values, 0, sizeof(values));
-		MemSet(nulls, 0, sizeof(nulls));
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
 
-		values[0] = CStringGetTextDatum(NameStr(s->slotname));
-		values[1] = Int64GetDatum(s->spill_txns);
-		values[2] = Int64GetDatum(s->spill_count);
-		values[3] = Int64GetDatum(s->spill_bytes);
-		values[4] = Int64GetDatum(s->stream_txns);
-		values[5] = Int64GetDatum(s->stream_count);
-		values[6] = Int64GetDatum(s->stream_bytes);
+	namestrcpy(&slotname, text_to_cstring(slotname_text));
+	slotent = pgstat_fetch_replslot(slotname);
 
-		if (s->stat_reset_timestamp == 0)
-			nulls[7] = true;
-		else
-			values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+	if (!slotent)
+		PG_RETURN_NULL();
 
-		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
-	}
+	values[0] = CStringGetTextDatum(NameStr(slotent->slotname));
+	values[1] = Int64GetDatum(slotent->spill_txns);
+	values[2] = Int64GetDatum(slotent->spill_count);
+	values[3] = Int64GetDatum(slotent->spill_bytes);
+	values[4] = Int64GetDatum(slotent->stream_txns);
+	values[5] = Int64GetDatum(slotent->stream_count);
+	values[6] = Int64GetDatum(slotent->stream_bytes);
 
-	tuplestore_donestoring(tupstore);
+	if (slotent->stat_reset_timestamp == 0)
+		nulls[7] = true;
+	else
+		values[7] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
-	return (Datum) 0;
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f4957653ae..6c9521603f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5311,14 +5311,14 @@
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
   prosrc => 'pg_stat_get_wal_receiver' },
-{ oid => '8595', descr => 'statistics: information about replication slots',
-  proname => 'pg_stat_get_replication_slots', prorows => '10',
+{ oid => '8595', descr => 'statistics: information about replication slot',
+  proname => 'pg_stat_get_replication_slot', prorows => '1',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
-  prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
-  prosrc => 'pg_stat_get_replication_slots' },
+  prorettype => 'record', proargtypes => 'text',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+  prosrc => 'pg_stat_get_replication_slot' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 8e11215058..c2d192fc55 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -915,7 +915,7 @@ typedef struct PgStat_SLRUStats
 /*
  * Replication slot statistics kept in the stats collector
  */
-typedef struct PgStat_ReplSlotStats
+typedef struct PgStat_ReplSlotEntry
 {
 	NameData	slotname;
 	PgStat_Counter spill_txns;
@@ -925,7 +925,7 @@ typedef struct PgStat_ReplSlotStats
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
 	TimestampTz stat_reset_timestamp;
-} PgStat_ReplSlotStats;
+} PgStat_ReplSlotEntry;
 
 
 /*
@@ -1027,7 +1027,7 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
-extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
+extern void pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
@@ -1125,7 +1125,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
-extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
+extern PgStat_ReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
 extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50d..357068403a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
-extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 186e6c966c..eb741b9ff1 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2069,7 +2069,9 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_count,
     s.stream_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+   FROM pg_replication_slots r,
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset)
+  WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
-- 
2.25.1

From 4526498234141f19fd54bed7e4aa07ff783a139d Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Sat, 10 Apr 2021 08:57:05 +0530
Subject: [PATCH v8 3/5] Added total txns and total txn bytes to replication
 statistics.

This adds statistics about the total number of transactions and the total
amount of transaction data sent to the decoding output plugin from ReorderBuffer.
Users can query the pg_stat_replication_slots view to check these stats.
---
 contrib/test_decoding/expected/stats.out      | 79 +++++++++++++------
 contrib/test_decoding/sql/stats.sql           | 48 +++++++----
 doc/src/sgml/monitoring.sgml                  | 25 ++++++
 src/backend/catalog/system_views.sql          |  2 +
 src/backend/postmaster/pgstat.c               |  6 ++
 src/backend/replication/logical/logical.c     | 16 ++--
 .../replication/logical/reorderbuffer.c       | 12 +++
 src/backend/utils/adt/pgstatfuncs.c           | 38 +++++----
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/pgstat.h                          |  4 +
 src/include/replication/reorderbuffer.h       |  4 +
 src/test/regress/expected/rules.out           |  4 +-
 12 files changed, 182 insertions(+), 62 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index bca36fa903..bc8e601eab 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 
 CREATE TABLE stats_test(data text);
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -16,12 +16,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
 (1 row)
 
 -- reset the slot stats, and wait for stats collector to reset
@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
  
 (1 row)
 
-SELECT wait_for_decode_stats(true);
+SELECT wait_for_decode_stats(true, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot |          0 |           0
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot |          0 |           0 |          0 |           0
 (1 row)
 
 -- decode and check stats again.
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
   5002
 (1 row)
 
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
+ wait_for_decode_stats 
+-----------------------
+ 
+(1 row)
+
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
+(1 row)
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+ pg_stat_reset_replication_slot 
+--------------------------------
+ 
+(1 row)
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | f          | f           | t          | t
 (1 row)
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
 (1 row)
 
 COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 51294e48e8..8c34aeced1 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 CREATE TABLE stats_test(data text);
 
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -14,12 +14,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- reset the slot stats, and wait for stats collector to reset
 SELECT pg_stat_reset_replication_slot('regression_slot');
-SELECT wait_for_decode_stats(true);
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(true, true);
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
 
 -- decode and check stats again.
 SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
 SELECT slot_name FROM pg_stat_replication_slots;
 COMMIT;
 
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 8287587f61..25024dfc7c 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2716,6 +2716,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent to the decoding output plugin for
+        this slot. This counts top-level transactions only; the counter is
+        not incremented for subtransactions. Note that this includes the
+        transactions that are streamed and/or spilled.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transaction data sent to the decoding output plugin
+        while decoding the changes from WAL for this slot. This can be used to
+        gauge the total amount of data sent during logical decoding. Note that
+        this includes the data that is streamed and/or spilled.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 1bceb25571..0636a5d5f1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -973,6 +973,8 @@ CREATE VIEW pg_replication_slots AS
 CREATE VIEW pg_stat_replication_slots AS
     SELECT
             s.slot_name,
+	    s.total_txns,
+            s.total_bytes,
             s.spill_txns,
             s.spill_count,
             s.spill_bytes,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 142567e654..0b3b8ae90b 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1823,6 +1823,8 @@ pgstat_report_replslot(const PgStat_ReplSlotEntry *repSlotStat)
 	msg.m_stream_txns = repSlotStat->stream_txns;
 	msg.m_stream_count = repSlotStat->stream_count;
 	msg.m_stream_bytes = repSlotStat->stream_bytes;
+	msg.m_total_txns = repSlotStat->total_txns;
+	msg.m_total_bytes = repSlotStat->total_bytes;
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -5545,6 +5547,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
 		Assert(slotent);
 
 		/* Update the replication slot statistics */
+		slotent->total_txns += msg->m_total_txns;
+		slotent->total_bytes += msg->m_total_bytes;
 		slotent->spill_txns += msg->m_spill_txns;
 		slotent->spill_count += msg->m_spill_count;
 		slotent->spill_bytes += msg->m_spill_bytes;
@@ -5792,6 +5796,8 @@ static void
 pgstat_reset_replslot(PgStat_ReplSlotEntry *slotent, TimestampTz ts)
 {
 	/* reset only counters. Don't clear slot name */
+	slotent->total_txns = 0;
+	slotent->total_bytes = 0;
 	slotent->spill_txns = 0;
 	slotent->spill_count = 0;
 	slotent->spill_bytes = 0;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d66846faab..73ef806530 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1776,20 +1776,22 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	PgStat_ReplSlotEntry repSlotStat;
 
 	/*
-	 * Nothing to do if we haven't spilled or streamed anything since the last
-	 * time the stats has been sent.
+	 * Nothing to do if we don't have any replication stats to be sent.
 	 */
-	if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 &&
+		rb->totalBytes <= 0 && rb->totalTxns <=0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
 		 rb,
 		 (long long) rb->spillTxns,
 		 (long long) rb->spillCount,
 		 (long long) rb->spillBytes,
 		 (long long) rb->streamTxns,
 		 (long long) rb->streamCount,
-		 (long long) rb->streamBytes);
+		 (long long) rb->streamBytes,
+		 (long long) rb->totalTxns,
+		 (long long) rb->totalBytes);
 
 	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
 	repSlotStat.spill_txns = rb->spillTxns;
@@ -1798,6 +1800,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_txns = rb->streamTxns;
 	repSlotStat.stream_count = rb->streamCount;
 	repSlotStat.stream_bytes = rb->streamBytes;
+	repSlotStat.total_txns = rb->totalTxns;
+	repSlotStat.total_bytes = rb->totalBytes;
 
 	pgstat_report_replslot(&repSlotStat);
 	rb->spillTxns = 0;
@@ -1806,4 +1810,6 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->streamTxns = 0;
 	rb->streamCount = 0;
 	rb->streamBytes = 0;
+	rb->totalTxns = 0;
+	rb->totalBytes = 0;
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 52d06285a2..bc251adfda 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
 	buffer->streamTxns = 0;
 	buffer->streamCount = 0;
 	buffer->streamBytes = 0;
+	buffer->totalTxns = 0;
+	buffer->totalBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -2359,6 +2361,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			specinsert = NULL;
 		}
 
+		/*
+		 * Update total transaction count and total transaction bytes
+		 * processed. Take care not to count a streamed transaction multiple
+		 * times.
+		 */
+		if (!rbtxn_is_streamed(txn))
+			rb->totalTxns++;
+
+		rb->totalBytes += rb->size;
+
 		/* clean up the iterator */
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 569abce27e..be069cf54c 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2309,7 +2309,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
 	text *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData slotname;
 	TupleDesc	tupdesc;
@@ -2325,19 +2325,23 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "total_txns",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "total_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_txns",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "spill_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "spill_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_txns",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "stream_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stream_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2348,17 +2352,19 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 		PG_RETURN_NULL();
 
 	values[0] = CStringGetTextDatum(NameStr(slotent->slotname));
-	values[1] = Int64GetDatum(slotent->spill_txns);
-	values[2] = Int64GetDatum(slotent->spill_count);
-	values[3] = Int64GetDatum(slotent->spill_bytes);
-	values[4] = Int64GetDatum(slotent->stream_txns);
-	values[5] = Int64GetDatum(slotent->stream_count);
-	values[6] = Int64GetDatum(slotent->stream_bytes);
+	values[1] = Int64GetDatum(slotent->total_txns);
+	values[2] = Int64GetDatum(slotent->total_bytes);
+	values[3] = Int64GetDatum(slotent->spill_txns);
+	values[4] = Int64GetDatum(slotent->spill_count);
+	values[5] = Int64GetDatum(slotent->spill_bytes);
+	values[6] = Int64GetDatum(slotent->stream_txns);
+	values[7] = Int64GetDatum(slotent->stream_count);
+	values[8] = Int64GetDatum(slotent->stream_bytes);
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[7] = true;
+		nulls[9] = true;
 	else
-		values[7] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6c9521603f..b9156b873e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5315,9 +5315,9 @@
   proname => 'pg_stat_get_replication_slot', prorows => '1',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,total_txns,total_bytes,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index c2d192fc55..e9e16e5e2d 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_stream_txns;
 	PgStat_Counter m_stream_count;
 	PgStat_Counter m_stream_bytes;
+	PgStat_Counter m_total_txns;
+	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
 /* ----------
@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotEntry
 	PgStat_Counter stream_txns;
 	PgStat_Counter stream_count;
 	PgStat_Counter stream_bytes;
+	PgStat_Counter total_txns;
+	PgStat_Counter total_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotEntry;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6a..a372b70b7d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,10 @@ struct ReorderBuffer
 	int64		streamTxns;		/* number of transactions streamed */
 	int64		streamCount;	/* streaming invocation counter */
 	int64		streamBytes;	/* amount of data streamed */
+
+	/* Statistics about all the replicated transactions */
+	int64		totalTxns;		/* total number of transactions replicated */
+	int64		totalBytes;		/* total amount of data replicated */
 };
 
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index eb741b9ff1..4380577b7d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2062,6 +2062,8 @@ pg_stat_replication| SELECT s.pid,
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
+    s.total_txns,
+    s.total_bytes,
     s.spill_txns,
     s.spill_count,
     s.spill_bytes,
@@ -2070,7 +2072,7 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_bytes,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, total_txns, total_bytes, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
-- 
2.25.1

From 49f7f534e3cc474cbc1c420fdba28972ec785aca Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Mon, 5 Apr 2021 18:17:21 +0530
Subject: [PATCH v8 4/5] Added tests for verification of logical replication
 statistics.

Add tests verifying that the logical replication statistics are still sane
after a replication slot is dropped and the server is restarted.
---
 contrib/test_decoding/Makefile            |  2 +
 contrib/test_decoding/t/001_repl_stats.pl | 84 +++++++++++++++++++++++
 2 files changed, 86 insertions(+)
 create mode 100644 contrib/test_decoding/t/001_repl_stats.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c5e28ce5cc..9a31e0b879 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 # typical installcheck users do not have (e.g. buildfarm clients).
 NO_INSTALLCHECK = 1
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
new file mode 100644
index 0000000000..70cb4cf055
--- /dev/null
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -0,0 +1,84 @@
+# Test replication statistics data in pg_stat_replication_slots is sane after
+# drop replication slot and restart.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Test set-up
+my $node = get_new_node('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+# Create table.
+$node->safe_psql('postgres',
+        "CREATE TABLE test_repl_stat(col1 int)");
+
+# Create replication slots.
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding')");
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding')");
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot3', 'test_decoding')");
+$node->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot4', 'test_decoding')");
+
+# Insert some data.
+$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+
+$node->safe_psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+$node->safe_psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+$node->safe_psql('postgres',
+        "SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+$node->safe_psql('postgres',
+	"SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')");
+
+# Wait for the statistics to be updated.
+my $slot1_stat_check_query =
+  "SELECT count(1) = 1 FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot1' AND total_txns > 0 AND total_bytes > 0;";
+my $slot2_stat_check_query =
+  "SELECT count(1) = 1 FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot2' AND total_txns > 0 AND total_bytes > 0;";
+my $slot3_stat_check_query =
+  "SELECT count(1) = 1 FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot3' AND total_txns > 0 AND total_bytes > 0;";
+my $slot4_stat_check_query =
+  "SELECT count(1) = 1 FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot4' AND total_txns > 0 AND total_bytes > 0;";
+
+# Verify that the statistics have been updated.
+$node->poll_query_until('postgres', $slot1_stat_check_query)
+  or die "Timed out while waiting for statistics to be updated";
+$node->poll_query_until('postgres', $slot2_stat_check_query)
+  or die "Timed out while waiting for statistics to be updated";
+$node->poll_query_until('postgres', $slot3_stat_check_query)
+  or die "Timed out while waiting for statistics to be updated";
+$node->poll_query_until('postgres', $slot4_stat_check_query)
+  or die "Timed out while waiting for statistics to be updated";
+
+# Drop one of the replication slots and verify that the replication statistics
+# data is fine after restart.
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+
+$node->stop;
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+my $result = $node->safe_psql('postgres',
+	"SELECT slot_name, total_txns > 0 AS total_txn, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are updated');
+
+# cleanup
+$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+
+# shutdown
+$node->stop;
-- 
2.25.1

From e049c8551791ff25bb9ea4a0c41e8a37d5956275 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Mon, 12 Apr 2021 16:10:11 +0530
Subject: [PATCH v8 5/5] Test where there are more replication slot statistics
 than max_replication_slots slots at startup.

There is a rare scenario in which a replication slot is dropped but the
drop-slot statistics message is never received by the statistics collector
process. If max_replication_slots is then reduced to the number of
replication slots actually in use and the server is restarted, the
statistics collector is unaware of the change and will write beyond the
slots available. Add a test for this scenario.
---
 contrib/test_decoding/t/001_repl_stats.pl | 24 +++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 70cb4cf055..9f82da3818 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -2,9 +2,10 @@
 # drop replication slot and restart.
 use strict;
 use warnings;
+use File::Path qw(rmtree);
 use PostgresNode;
 use TestLib;
-use Test::More tests => 1;
+use Test::More tests => 2;
 
 # Test set-up
 my $node = get_new_node('test');
@@ -74,11 +75,30 @@ is($result, qq(regression_slot1|t|t
 regression_slot2|t|t
 regression_slot3|t|t), 'check replication statistics are updated');
 
+# Remove one of the replication slots, lower max_replication_slots to match the
+# number of remaining slots, and verify that the replication statistics data is
+# fine after restart.
+$node->stop;
+my $datadir = $node->data_dir;
+my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3";
+
+rmtree($slot3_replslotdir);
+
+$node->append_conf('postgresql.conf', 'max_replication_slots = 2');
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+$result = $node->safe_psql('postgres',
+	"SELECT slot_name, total_txns > 0 AS total_txn, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t), 'check replication statistics are updated');
+
 # cleanup
 $node->safe_psql('postgres', "DROP TABLE test_repl_stat");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
 $node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
-$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
 
 # shutdown
 $node->stop;
-- 
2.25.1

Reply via email to