Hi All,
In a recent logical replication issue, there were multiple replication
slots involved, each using a different publication. Thus the amount of
data that was replicated through each slot was expected to be
different. However, total_bytes and total_txns were reported the same
for all the replication slots, as expected, since they count the data
added to the reorder buffer rather than the data sent downstream. One
of the slots started lagging and we were trying to figure out whether
it was the WAL sender slowing down or the consumer (in this case
Debezium). The lagging slot then showed lower total_txns and
total_bytes than the other slots, giving the impression that the WAL
sender was processing the data slowly. Had pg_stat_replication_slots
reported the amount of data
actually sent downstream, it would have been easier to compare it with
the amount of data received by the consumer and thus pinpoint the
bottleneck.

Here's a patch to do that. It adds two columns to
pg_stat_replication_slots:
    - sent_txns: The total number of transactions sent downstream.
    - sent_bytes: The total number of bytes sent downstream in data messages
sent_bytes includes only the bytes sent as part of 'd' messages and
does not include, for example, keepalive messages or CopyDone
messages. But those are very few and can be ignored. If others feel
that those are important to include, we can make that change.

Plugins may choose not to send an empty transaction downstream. So
it's better to increment the sent_txns counter in the plugin code when
it actually sends a BEGIN message, for example in pgoutput_send_begin()
and pg_output_begin(). This means that every plugin will need to be
modified to increment the counter for it to be reported correctly.
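
To illustrate the point, here is a minimal standalone sketch of counting
a transaction only when BEGIN is actually sent. It is a toy model and not
the plugin code: ToyReorderBuffer, ToyTxnState and the toy_* functions
are hypothetical names made up for this example, and the "published" flag
stands in for whatever filtering a plugin applies.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in for the counters kept in ReorderBuffer (hypothetical). */
typedef struct ToyReorderBuffer
{
	int64_t		totalTxns;	/* transactions handed to the plugin */
	int64_t		sentTxns;	/* transactions for which BEGIN was sent */
} ToyReorderBuffer;

/* Toy per-transaction plugin state (hypothetical). */
typedef struct ToyTxnState
{
	bool		sent_begin;	/* has BEGIN gone downstream yet? */
} ToyTxnState;

/* Emit BEGIN lazily and count the transaction at that moment. */
void
toy_send_begin(ToyReorderBuffer *rb, ToyTxnState *txn)
{
	assert(!txn->sent_begin);
	txn->sent_begin = true;
	rb->sentTxns++;
}

/* Change callback: the first published change triggers BEGIN. */
void
toy_change(ToyReorderBuffer *rb, ToyTxnState *txn, bool published)
{
	if (!published)
		return;				/* filtered out, nothing goes downstream */
	if (!txn->sent_begin)
		toy_send_begin(rb, txn);
}

/* Decode one transaction whose changes carry the given flags. */
void
toy_decode_txn(ToyReorderBuffer *rb, const bool *published, int nchanges)
{
	ToyTxnState txn = {false};

	rb->totalTxns++;		/* every decoded transaction is counted here */
	for (int i = 0; i < nchanges; i++)
		toy_change(rb, &txn, published[i]);
}
```

In this model a transaction whose changes are all filtered out bumps
totalTxns but leaves sentTxns untouched, which is the behaviour the
patch aims for with skipped empty transactions.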

I first thought of incrementing sent_bytes in OutputPluginWrite()
which is a central function for all logical replication message
writes. But that calls LogicalDecodingContext::write() which may
further add bytes to the message e.g. WalSndWriteData() and
LogicalOutputWrite(). So it's better to increment the counter in
implementations of LogicalDecodingContext::write(), so that we count
the exact number of bytes. These implementations are within core code
so they won't miss updating sent_bytes.
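
As a rough illustration of why the callback is the better place, here is
a standalone toy model. ToyContext and the toy_* functions are
hypothetical names for this sketch only; the per-transport overheads
mirror what the patch adds (one byte for the 'd' tag in the walsender
path, the lsn and xid sizes in the SQL function path), assuming an 8-byte
LSN and a 4-byte transaction id.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct ToyContext ToyContext;
typedef void (*ToyWriteFn) (ToyContext *ctx, uint64_t lsn, uint32_t xid);

/* Toy stand-in for LogicalDecodingContext (hypothetical). */
struct ToyContext
{
	size_t		out_len;	/* payload bytes produced by the plugin */
	int64_t		sentBytes;	/* bytes counted as sent downstream */
	ToyWriteFn	write;		/* transport-specific write callback */
};

/* Walsender-style transport: payload plus one byte for the 'd' tag. */
void
toy_walsnd_write(ToyContext *ctx, uint64_t lsn, uint32_t xid)
{
	(void) lsn;
	(void) xid;
	ctx->sentBytes += (int64_t) ctx->out_len + 1;
}

/* SQL-function-style transport: payload plus the lsn and xid values. */
void
toy_sqlfunc_write(ToyContext *ctx, uint64_t lsn, uint32_t xid)
{
	ctx->sentBytes += (int64_t) (ctx->out_len + sizeof(lsn) + sizeof(xid));
}

/*
 * Central write function: it knows the payload size but not what the
 * transport adds, so the counting is left to the callback.
 */
void
toy_plugin_write(ToyContext *ctx, size_t payload_len,
				 uint64_t lsn, uint32_t xid)
{
	ctx->out_len = payload_len;
	ctx->write(ctx, lsn, xid);
}
```

The same 10-byte payload is counted as 11 bytes through the
walsender-style callback and 22 bytes through the SQL-function-style
one, a difference the central function could not account for on its own.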

I think we should rename total_txns and total_bytes to reordered_txns
and reordered_bytes respectively, and also update the documentation
accordingly to make better sense of those numbers. But these patches
do not contain that change. If others feel the same way, I will
provide a patch with that change.

-- 
Best Wishes,
Ashutosh Bapat
From 1a385b30d6cb4ca111cbcc16ea14017c08f9a579 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.bapat....@gmail.com>
Date: Fri, 27 Jun 2025 09:16:23 +0530
Subject: [PATCH] Report data sent statistics in pg_stat_replication_slots

pg_stat_replication_slots reports the total number of transactions and bytes
added to the reorder buffer. This is the amount of WAL that is processed by the
WAL sender. This data undergoes filtering and logical decoding before sending to
the downstream. Hence the amount of WAL added to the reorder buffer does not
serve as a good metric for the amount of data sent downstream. Knowing the
amount of data sent downstream is useful when debugging slowly moving logical
replication.

This patch adds two new columns to pg_stat_replication_slots:
- sent_txns: The total number of transactions sent downstream.
- sent_bytes: The total number of bytes sent downstream in data messages

Ashutosh Bapat
---
 contrib/test_decoding/expected/stats.out      | 54 ++++++++++---------
 contrib/test_decoding/sql/stats.sql           | 12 +++--
 contrib/test_decoding/t/001_repl_stats.pl     | 14 ++---
 contrib/test_decoding/test_decoding.c         |  1 +
 doc/src/sgml/monitoring.sgml                  | 27 ++++++++++
 src/backend/catalog/system_views.sql          |  2 +
 src/backend/replication/logical/logical.c     | 10 +++-
 .../replication/logical/logicalfuncs.c        |  2 +
 .../replication/logical/reorderbuffer.c       |  2 +
 src/backend/replication/pgoutput/pgoutput.c   |  1 +
 src/backend/replication/walsender.c           |  2 +
 src/backend/utils/activity/pgstat_replslot.c  |  2 +
 src/backend/utils/adt/pgstatfuncs.c           | 14 +++--
 src/include/catalog/pg_proc.dat               |  6 +--
 src/include/pgstat.h                          |  2 +
 src/include/replication/reorderbuffer.h       |  8 +++
 src/test/recovery/t/006_logical_decoding.pl   | 12 ++---
 src/test/regress/expected/rules.out           |  4 +-
 18 files changed, 126 insertions(+), 49 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index de6dc416130..867a8506051 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,28 +37,34 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | t          | t
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes 
+------------------------+------------+-------------+------------+-------------+-----------+------------
+ regression_slot_stats1 | t          | t           | t          | t           |         1 | t
+ regression_slot_stats2 | t          | t           | t          | t           |         1 | t
+ regression_slot_stats3 | t          | t           | t          | t           |         1 | t
 (3 rows)
 
 RESET logical_decoding_work_mem;
 -- reset stats for one slot, others should be unaffected
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 0 for the slot whose stats were reset since the background
+-- transactions are always skipped and not transaction, which would be sent
+-- downstream, has happened since the reset.
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
  pg_stat_reset_replication_slot 
 --------------------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes 
+------------------------+------------+-------------+------------+-------------+-----------+------------
+ regression_slot_stats1 | t          | t           | f          | f           |         0 | f
+ regression_slot_stats2 | t          | t           | t          | t           |         1 | t
+ regression_slot_stats3 | t          | t           | t          | t           |         1 | t
 (3 rows)
 
 -- reset stats for all slots
@@ -68,27 +74,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | f          | f
- regression_slot_stats3 | t          | t           | f          | f
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes 
+------------------------+------------+-------------+------------+-------------+-----------+------------
+ regression_slot_stats1 | t          | t           | f          | f           |         0 | f
+ regression_slot_stats2 | t          | t           | f          | f           |         0 | f
+ regression_slot_stats3 | t          | t           | f          | f           |         0 | f
 (3 rows)
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | sent_txns | sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------+------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |         0 |          0 | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | sent_txns | sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------+------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |         0 |          0 | 
 (1 row)
 
 -- spilling the xact
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index a022fe1bf07..8581387d867 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 RESET logical_decoding_work_mem;
 
 -- reset stats for one slot, others should be unaffected
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 0 for the slot whose stats were reset since the background
+-- transactions are always skipped and not transaction, which would be sent
+-- downstream, has happened since the reset.
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- reset stats for all slots
 SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 0de62edb7d8..ba887d752eb 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -26,7 +26,9 @@ sub test_slot_stats
 	my $result = $node->safe_psql(
 		'postgres', qq[
 		SELECT slot_name, total_txns > 0 AS total_txn,
-			   total_bytes > 0 AS total_bytes
+			   total_bytes > 0 AS total_bytes,
+			   sent_txns > 0 AS sent_txn,
+			   sent_bytes > 0 AS sent_bytes
 			   FROM pg_stat_replication_slots
 			   ORDER BY slot_name]);
 	is($result, $expected, $msg);
@@ -80,9 +82,9 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+	qq(regression_slot1|t|t|t|t
+regression_slot2|t|t|t|t
+regression_slot3|t|t|t|t),
 	'check replication statistics are updated');
 
 # Test to remove one of the replication slots and adjust
@@ -104,8 +106,8 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t),
+	qq(regression_slot1|t|t|t|t
+regression_slot2|t|t|t|t),
 	'check replication statistics after removing the slot file');
 
 # cleanup
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index bb495563200..4a7e918efbe 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -310,6 +310,7 @@ static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
 	OutputPluginPrepareWrite(ctx, last_write);
+	ctx->reorder->sentTxns++;
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
 	else
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4265a22d4de..e5af284df90 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1644,6 +1644,33 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>sent_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent downstream for
+        this slot. This counts top-level transactions only, and is not incremented
+        for subtransactions. These transactions are a subset of the transactions
+        sent to the decoding plugin. Hence this count is expected to be less than or equal to <structfield>total_txns</structfield>.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>sent_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transaction changes sent downstream for this slot. The
+        amount of WAL corresponding to the changes sent downstream is a subset of
+        the total WAL sent to the decoding plugin. But the amount of data sent
+        downstream for a given decoded WAL record may not match the size of the
+        WAL record. Thus values in this column do not have a strong correlation
+        with the values in <structfield>total_bytes</structfield>.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 08f780a2e63..bcc210dd754 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1053,6 +1053,8 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_bytes,
             s.total_txns,
             s.total_bytes,
+            s.sent_txns,
+            s.sent_bytes,
             s.stats_reset
     FROM pg_replication_slots as r,
         LATERAL pg_stat_get_replication_slot(slot_name) as s
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f1eb798f3e9..14615fa70d7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1958,7 +1958,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
 		 rb,
 		 rb->spillTxns,
 		 rb->spillCount,
@@ -1967,7 +1967,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 rb->streamCount,
 		 rb->streamBytes,
 		 rb->totalTxns,
-		 rb->totalBytes);
+		 rb->totalBytes,
+		 rb->sentTxns,
+		 rb->sentBytes);
 
 	repSlotStat.spill_txns = rb->spillTxns;
 	repSlotStat.spill_count = rb->spillCount;
@@ -1977,6 +1979,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_bytes = rb->streamBytes;
 	repSlotStat.total_txns = rb->totalTxns;
 	repSlotStat.total_bytes = rb->totalBytes;
+	repSlotStat.sent_txns = rb->sentTxns;
+	repSlotStat.sent_bytes = rb->sentBytes;
 
 	pgstat_report_replslot(ctx->slot, &repSlotStat);
 
@@ -1988,6 +1992,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->streamBytes = 0;
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
+	rb->sentTxns = 0;
+	rb->sentBytes = 0;
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index ca53caac2f2..f85e21b1a50 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -89,6 +89,8 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
 
 	tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+	ctx->reorder->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId);
 	p->returned_rows++;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c4299c76fb1..fa4c61192e8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -392,6 +392,8 @@ ReorderBufferAllocate(void)
 	buffer->streamBytes = 0;
 	buffer->totalTxns = 0;
 	buffer->totalBytes = 0;
+	buffer->sentTxns = 0;
+	buffer->sentBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 082b4d9d327..daafac7049c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -589,6 +589,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
 	txndata->sent_begin_txn = true;
+	ctx->reorder->sentTxns++;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index f2c33250e8b..f097aa92c64 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1569,6 +1569,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
 
+	ctx->reorder->sentBytes += ctx->out->len + 1;	/* +1 for the 'd' */
+
 	CHECK_FOR_INTERRUPTS();
 
 	/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index ccfb11c49bf..583ac090cdd 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -96,6 +96,8 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
 	REPLSLOT_ACC(stream_bytes);
 	REPLSLOT_ACC(total_txns);
 	REPLSLOT_ACC(total_bytes);
+	REPLSLOT_ACC(sent_txns);
+	REPLSLOT_ACC(sent_bytes);
 #undef REPLSLOT_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 1c12ddbae49..4ed405c6ad5 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2100,7 +2100,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 12
 	text	   *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData	slotname;
 	TupleDesc	tupdesc;
@@ -2129,7 +2129,11 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "sent_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "sent_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2154,11 +2158,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	values[6] = Int64GetDatum(slotent->stream_bytes);
 	values[7] = Int64GetDatum(slotent->total_txns);
 	values[8] = Int64GetDatum(slotent->total_bytes);
+	values[9] = Int64GetDatum(slotent->sent_txns);
+	values[10] = Int64GetDatum(slotent->sent_bytes);
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[9] = true;
+		nulls[11] = true;
 	else
-		values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[11] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fb4f7f50350..49990bfe42e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5675,9 +5675,9 @@
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,sent_txns,sent_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 378f2f2c2ba..814c6024ba1 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -395,6 +395,8 @@ typedef struct PgStat_StatReplSlotEntry
 	PgStat_Counter stream_bytes;
 	PgStat_Counter total_txns;
 	PgStat_Counter total_bytes;
+	PgStat_Counter sent_txns;
+	PgStat_Counter sent_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..b22e046a9ad 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -696,6 +696,14 @@ struct ReorderBuffer
 	 */
 	int64		totalTxns;		/* total number of transactions sent */
 	int64		totalBytes;		/* total amount of data decoded */
+
+	/*
+	 * Statistics about the transactions decoded by the output plugin and sent
+	 * downstream.
+	 */
+	int64		sentTxns;		/* number of transactions decoded and sent
+								 * downstream */
+	int64		sentBytes;		/* amount of data decoded and sent downstream */
 };
 
 
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 2137c4e5e30..03b688f54eb 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -212,10 +212,10 @@ my $stats_test_slot2 = 'logical_slot';
 # Stats exist for stats test slot 1
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT total_bytes > 0, sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+	qq(t|t|t),
+	qq(Total bytes and sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
 );
 
 # Do reset of stats for stats test slot 1
@@ -233,10 +233,10 @@ $node_primary->safe_psql('postgres',
 
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, sent_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+	qq(t|t|t),
+	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and sent_bytes were set to 0.)
 );
 
 # Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6cf828ca8d0..9fcf73152cd 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2131,9 +2131,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_bytes,
     s.total_txns,
     s.total_bytes,
+    s.sent_txns,
+    s.sent_bytes,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, sent_txns, sent_bytes, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,

base-commit: 3431e3e4aa3a33e8411f15e76c284cdd4c54ca28
-- 
2.34.1
