Hi Amit,

On Mon, Jul 14, 2025 at 3:31 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Jul 14, 2025 at 10:55 AM Ashutosh Bapat
> <ashutosh.bapat....@gmail.com> wrote:
> >
> > On Sun, Jul 13, 2025 at 4:34 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > >
> > > I think we don't want to make it mandatory for plugins to implement
> > > these stats, so instead of throwing ERROR, the view should show that
> > > the plugin doesn't provide stats. How about having OutputPluginStats
> > > similar to OutputPluginCallbacks and OutputPluginOptions members in
> > > LogicalDecodingContext? It will have members like stats_available,
> > > txns_sent or txns_skipped, txns_filtered, etc.
> >
> > Not making mandatory looks useful. I can try your suggestion. Rather
> > than having stats_available as a member of OutputPluginStats, it's
> > better to have a NULL value for the corresponding member in
> > LogicalDecodingContext. We don't want an output plugin to reset
> > stats_available once set. Will that work?
> >
>
> We can try that.
>
> > > I am thinking it will
> > > be better to provide this information in a separate view like
> > > pg_stat_plugin_stats or something like that, here we can report
> > > slot_name, plugin_name, then the other stats we want to implement part
> > > of OutputPluginStats.
> >
> > As you have previously pointed out, the view should make it explicit
> > that the new stats are maintained by the plugin and not core. I agree
> > with that intention. However, already have three views
> > pg_replication_slots (which has slot name and plugin name), then
> > pg_replication_stats which is about stats maintained by a WAL sender
> > or running replication and then pg_stat_replication_slots, which is
> > about accumulated statistics for a replication through a given
> > replication slot. It's already a bit hard to keep track of who's who
> > when debugging an issue. Adding one more view will add to confusion.
> >
> > Instead of adding a new view how about
> > a. name the columns as plugin_sent_txns, plugin_sent_bytes,
> > plugin_filtered_change_bytes to make it clear that these columns are
> > maintained by plugin
> > b. report these NULL if stats_available = false OR OutputPluginStats
> > is not set in LogicalDecodingContext
> > c. Document that NULL value for these columns indicates that the
> > plugin is not maintaining/reporting these stats
> > d. adding plugin name to pg_stat_replication_slots, that will make it
> > easy for users to know which plugin they should look at in case of
> > dubious or unavailable stats
> >
>
> Sounds reasonable.

Here's the next patch, which takes into account all the discussion so
far. It adds four fields to pg_stat_replication_slots:
    - plugin - name of the output plugin
    - plugin_filtered_bytes - the amount of changes filtered out by the
      output plugin
    - plugin_sent_txns - the number of transactions sent downstream by
      the output plugin
    - plugin_sent_bytes - the amount of data sent downstream by the
      output plugin

There are some points up for discussion:
1. pg_stat_reset_replication_slot() zeroes out the statistics entry by
calling pgstat_reset() or pgstat_reset_of_kind(), which don't know
about the contents of the entry. So
PgStat_StatReplSlotEntry::plugin_has_stats is set to false and plugin
stats are reported as NULL, instead of zero, immediately after reset.
The same happens when the stats are queried immediately after the
statistics are initialized and before any stats are reported. We could
instead make it report zero if we saved plugin_has_stats and restored
it after reset. But doing that in pgstat_reset_of_kind() seems like
extra overhead, and we would need to write a function to find all
replication slot entries. Resetting the stats should be a rare event in
practice. Reporting 0 instead of NULL in that rare case doesn't seem
worth the effort and code. Given that the core code doesn't know
whether a given plugin reports stats or not, I think this behaviour is
appropriate as long as we document it. Please let me know if the
documentation in the patch is clear enough.
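
To make the reset behaviour in point 1 concrete, here is a minimal
standalone sketch. It is not code from the patch: SlotEntrySketch,
NullableInt64 and the three functions are hypothetical stand-ins for the
stats entry, a nullable view column, and the reset/report/read paths,
and it assumes that reported counters are simply added to the entry.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the relevant part of the slot stats entry. */
typedef struct SlotEntrySketch
{
	int64_t		total_txns;
	bool		plugin_has_stats;
	int64_t		sent_txns;
	int64_t		sent_bytes;
	int64_t		filtered_bytes;
} SlotEntrySketch;

/* Hypothetical stand-in for one nullable bigint column of the view. */
typedef struct NullableInt64
{
	bool		isnull;
	int64_t		value;
} NullableInt64;

/* A generic reset knows nothing about the layout; it zeroes everything. */
void
reset_entry(SlotEntrySketch *entry)
{
	assert(entry != NULL);
	memset(entry, 0, sizeof(*entry));
}

/* Fold one batch of plugin counters into the entry (assumed to accumulate). */
void
report_plugin_stats(SlotEntrySketch *entry, int64_t txns,
					int64_t bytes, int64_t filtered)
{
	assert(entry != NULL);
	entry->plugin_has_stats = true;
	entry->sent_txns += txns;
	entry->sent_bytes += bytes;
	entry->filtered_bytes += filtered;
}

/* The column is NULL unless the plugin has reported since the last reset. */
NullableInt64
plugin_sent_txns_column(const SlotEntrySketch *entry)
{
	NullableInt64 col;

	assert(entry != NULL);
	col.isnull = !entry->plugin_has_stats;
	col.value = entry->plugin_has_stats ? entry->sent_txns : 0;
	return col;
}
```

In this model a column reads as NULL right after reset_entry() and
becomes non-NULL again after the next report_plugin_stats() call, which
is the behaviour the regression test output shows for
regression_slot_stats1.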

2. There's also a bit of asymmetry in the way sent_bytes is handled.
The code which actually sends the logical changes downstream is part of
the core code, but the format of the change, and hence the number of
bytes sent, is decided by the plugin. It's a stat related to the plugin
but maintained by the core code. The patch implements it as a plugin
stat (so the corresponding column has the "plugin" prefix and is also
reported as NULL upon reset etc.), but we may want to reconsider how to
report and maintain it.
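
The asymmetry in point 2 can be sketched as follows. This is a
standalone illustration that mirrors the accounting added to
LogicalOutputWrite() in the patch (payload length plus the LSN and xid
returned with each row); ContextSketch, PluginStatsSketch and
core_output_write() are hypothetical names, and the two typedefs only
assume the usual 64-bit LSN and 32-bit transaction ID widths.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtrSketch;		/* assumed width of XLogRecPtr */
typedef uint32_t TransactionIdSketch;	/* assumed width of TransactionId */

/* Same three counters as OutputPluginStats in the patch. */
typedef struct PluginStatsSketch
{
	int64_t		sentTxns;
	int64_t		sentBytes;
	int64_t		filteredBytes;
} PluginStatsSketch;

/* Hypothetical, reduced decoding context. */
typedef struct ContextSketch
{
	PluginStatsSketch *stats;	/* NULL if the plugin keeps no stats */
	size_t		out_len;		/* bytes the plugin wrote for this change */
} ContextSketch;

/*
 * Core-side write path: the plugin decided out_len by choosing the output
 * format, but the core code is the one that adds it to sentBytes.
 */
void
core_output_write(ContextSketch *ctx)
{
	assert(ctx != NULL);
	if (ctx->stats != NULL)
		ctx->stats->sentBytes += (int64_t) (ctx->out_len +
											sizeof(XLogRecPtrSketch) +
											sizeof(TransactionIdSketch));
}
```

The plugin never touches sentBytes here; it only opts in by providing
the stats struct, which is what makes this counter different from
sentTxns and filteredBytes.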

3. The names of the new columns have the prefix "plugin_" but the
internal variables tracking them don't, for the sake of brevity. If you
prefer to have the same prefix for the internal variables, I can change
that.

I think I have covered all the cases where filteredBytes should be
incremented, but please let me know if I have missed any.
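
For anyone reviewing those cases, the pattern being checked is roughly
the following. This is a standalone sketch, not the pgoutput code: the
types and filter_change() are hypothetical, the size field stands in for
what ReorderBufferChangeSize() would return, and the NULL check is only
there so the sketch also covers a plugin that keeps no stats.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef enum
{
	CHANGE_INSERT,
	CHANGE_UPDATE,
	CHANGE_DELETE
} ChangeKindSketch;

typedef struct ChangeSketch
{
	ChangeKindSketch kind;
	size_t		size;			/* stand-in for ReorderBufferChangeSize() */
} ChangeSketch;

typedef struct PubActionsSketch
{
	bool		pubinsert;
	bool		pubupdate;
	bool		pubdelete;
} PubActionsSketch;

typedef struct PluginStatsSketch
{
	int64_t		sentTxns;
	int64_t		sentBytes;
	int64_t		filteredBytes;
} PluginStatsSketch;

/* Does the publication publish this kind of change? */
static bool
change_is_published(const PubActionsSketch *actions, const ChangeSketch *change)
{
	switch (change->kind)
	{
		case CHANGE_INSERT:
			return actions->pubinsert;
		case CHANGE_UPDATE:
			return actions->pubupdate;
		case CHANGE_DELETE:
			return actions->pubdelete;
	}
	return false;
}

/*
 * Returns true if the change should be sent. Every path that drops the
 * change adds its size to filteredBytes before returning.
 */
bool
filter_change(PluginStatsSketch *stats, const PubActionsSketch *actions,
			  const ChangeSketch *change)
{
	assert(actions != NULL && change != NULL);
	if (change_is_published(actions, change))
		return true;
	if (stats != NULL)
		stats->filteredBytes += (int64_t) change->size;
	return false;
}
```

Any early return in a change callback that skips this accounting would
leave the change counted in total_bytes but in neither sent nor
filtered bytes.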

-- 
Best Wishes,
Ashutosh Bapat
From 0bdaca3d1bb95cb834d45137eddaaa58fe1646b9 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.bapat....@gmail.com>
Date: Fri, 27 Jun 2025 09:16:23 +0530
Subject: [PATCH] Report output plugin statistics in pg_stat_replication_slots

As of now pg_stat_replication_slots reports statistics about the reorder
buffer, but it does not report output plugin statistics like the amount of data
filtered by the output plugin, the amount of data sent downstream or the
number of transactions sent downstream. These statistics are useful when
investigating issues related to a slow downstream.

This commit adds the following fields to pg_stat_replication_slots:
- plugin_filtered_bytes is the amount of changes filtered out by the
  output plugin
- plugin_sent_txns is the number of transactions sent downstream by the
  output plugin
- plugin_sent_bytes is the amount of data sent downstream by the output
  plugin.

The prefix "plugin_" indicates that these counters are related to and
maintained by the output plugin. An output plugin may choose not to
initialize LogicalDecodingContext::stats, which holds these counters, in
which case the above columns will be reported as NULL.

Filtered bytes are reported next to total_bytes to keep these two
closely related fields together.

Additionally, report the name of the output plugin in the view for easy
reference.

Author: Ashutosh Bapat <ashutosh.bapat....@gmail.com>
Author: Amit Kapila <amit.kapil...@gmail.com>
Discussion: https://www.postgresql.org/message-id/CAExHW5s6KntzUyUoMbKR5dgwRmdV2Ay_2+AnTgYGAzo=qv6...@mail.gmail.com
---
 contrib/test_decoding/expected/stats.out      | 77 ++++++++++---------
 contrib/test_decoding/sql/stats.sql           | 15 ++--
 contrib/test_decoding/t/001_repl_stats.pl     | 18 +++--
 contrib/test_decoding/test_decoding.c         |  2 +
 doc/src/sgml/logicaldecoding.sgml             | 27 +++++++
 doc/src/sgml/monitoring.sgml                  | 58 ++++++++++++++
 src/backend/catalog/system_views.sql          |  4 +
 src/backend/replication/logical/logical.c     | 24 +++++-
 .../replication/logical/logicalfuncs.c        |  7 ++
 .../replication/logical/reorderbuffer.c       |  3 +-
 src/backend/replication/pgoutput/pgoutput.c   | 21 +++++
 src/backend/replication/walsender.c           |  7 ++
 src/backend/utils/activity/pgstat_replslot.c  |  7 ++
 src/backend/utils/adt/pgstatfuncs.c           | 26 ++++++-
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/pgstat.h                          |  4 +
 src/include/replication/logical.h             |  1 +
 src/include/replication/output_plugin.h       | 13 ++++
 src/include/replication/reorderbuffer.h       |  1 +
 src/test/recovery/t/006_logical_decoding.pl   | 12 +--
 src/test/regress/expected/rules.out           |  6 +-
 src/tools/pgindent/typedefs.list              |  1 +
 22 files changed, 275 insertions(+), 65 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index de6dc416130..d19fe6a1c61 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | t          | t
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Filtered
+-- bytes would be set only when there's a change that was passed to the plugin
+-- but was filtered out. Depending upon the background transactions, filtered
+-- bytes may or may not be zero.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes 
+------------------------+------------+-------------+------------+-------------+------------------+------------+----------------
+ regression_slot_stats1 | t          | t           | t          | t           |                1 | t          | t
+ regression_slot_stats2 | t          | t           | t          | t           |                1 | t          | t
+ regression_slot_stats3 | t          | t           | t          | t           |                1 | t          | t
 (3 rows)
 
 RESET logical_decoding_work_mem;
@@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes 
+------------------------+------------+-------------+------------+-------------+------------------+------------+----------------
+ regression_slot_stats1 | t          | t           | f          | f           |                  |            | 
+ regression_slot_stats2 | t          | t           | t          | t           |                1 | t          | t
+ regression_slot_stats3 | t          | t           | t          | t           |                1 | t          | t
 (3 rows)
 
 -- reset stats for all slots
@@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | f          | f
- regression_slot_stats3 | t          | t           | f          | f
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes 
+------------------------+------------+-------------+------------+-------------+------------------+-------------------+-----------------------
+ regression_slot_stats1 | t          | t           | f          | f           |                  |                   |                      
+ regression_slot_stats2 | t          | t           | f          | f           |                  |                   |                      
+ regression_slot_stats3 | t          | t           | f          | f           |                  |                   |                      
 (3 rows)
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |                       |                  |                   | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 |                       |                  |                   | 
 (1 row)
 
 -- spilling the xact
@@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
 BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-       slot_name        
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+       slot_name        |    plugin     
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
 (3 rows)
 
-SELECT slot_name FROM pg_stat_replication_slots;
-       slot_name        
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+       slot_name        |    plugin     
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
 (3 rows)
 
 COMMIT;
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index a022fe1bf07..1077cea5855 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,21 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+-- total_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Filtered
+-- bytes would be set only when there's a change that was passed to the plugin
+-- but was filtered out. Depending upon the background transactions, filtered
+-- bytes may or may not be zero.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 RESET logical_decoding_work_mem;
 
 -- reset stats for one slot, others should be unaffected
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- reset stats for all slots
 SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
@@ -46,8 +51,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
 BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-SELECT slot_name FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
 COMMIT;
 
 
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 0de62edb7d8..76dd86fc420 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -23,10 +23,16 @@ sub test_slot_stats
 
 	my ($node, $expected, $msg) = @_;
 
+	# If there are background transactions which are filtered out by the output
+	# plugin, plugin_filtered_bytes may be greater than 0. But it's not
+	# guaranteed that such transactions would be present.
 	my $result = $node->safe_psql(
 		'postgres', qq[
 		SELECT slot_name, total_txns > 0 AS total_txn,
-			   total_bytes > 0 AS total_bytes
+			   total_bytes > 0 AS total_bytes,
+			   plugin_sent_txns > 0 AS sent_txn,
+			   plugin_sent_bytes > 0 AS sent_bytes,
+			   plugin_filtered_bytes >= 0 AS filtered_bytes
 			   FROM pg_stat_replication_slots
 			   ORDER BY slot_name]);
 	is($result, $expected, $msg);
@@ -80,9 +86,9 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+	qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t
+regression_slot3|t|t|t|t|t),
 	'check replication statistics are updated');
 
 # Test to remove one of the replication slots and adjust
@@ -104,8 +110,8 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t),
+	qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t),
 	'check replication statistics after removing the slot file');
 
 # cleanup
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index bb495563200..a77309ce438 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -173,6 +173,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->only_local = false;
 
 	ctx->output_plugin_private = data;
+	ctx->stats = palloc0(sizeof(OutputPluginStats));
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 	opt->receive_rewrites = false;
@@ -310,6 +311,7 @@ static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
 	OutputPluginPrepareWrite(ctx, last_write);
+	ctx->stats->sentTxns++;
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
 	else
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 593f784b69d..a84c2f00195 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -774,6 +774,33 @@ typedef struct OutputPluginOptions
       needs to have a state, it can
       use <literal>ctx-&gt;output_plugin_private</literal> to store it.
      </para>
+
+     <para>
+      The startup callback may initialize <literal>ctx-&gt;stats</literal>,
+      typically as follows, if it chooses to maintain and report statistics
+      about its activity in <structname>pg_stat_replication_slots</structname>.
+<programlisting>
+ctx->stats = palloc0(sizeof(OutputPluginStats));
+</programlisting>
+      where <literal>OutputPluginStats</literal> is defined as follows:
+<programlisting>
+typedef struct OutputPluginStats
+{
+      int64   sentTxns;
+      int64   sentBytes;
+      int64   filteredBytes;
+} OutputPluginStats;
+</programlisting>
+      <literal>sentTxns</literal> is the number of transactions sent downstream
+      by the output plugin. <literal>sentBytes</literal> is the amount of data
+      sent downstream by the output plugin.
+      <function>OutputPluginWrite</function> is expected to update this counter
+      if <literal>ctx-&gt;stats</literal> is initialized by the output plugin.
+      <literal>filteredBytes</literal> is the size of changes in bytes that are
+      filtered out by the output plugin. Function
+      <literal>ReorderBufferChangeSize</literal> may be used to find the size of
+      a filtered <literal>ReorderBufferChange</literal>.
+     </para>
     </sect3>
 
     <sect3 id="logicaldecoding-output-plugin-shutdown">
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 823afe1b30b..d8efae0cfd2 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1545,6 +1545,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin</structfield> <type>text</type>
+       </para>
+       <para>
+        The base name of the shared object containing the output plugin this
+        logical slot is using. This column is the same as the one in
+        <structname>pg_replication_slots</structname>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>spill_txns</structfield> <type>bigint</type>
@@ -1644,6 +1655,53 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin_filtered_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of changes, from <structfield>total_bytes</structfield>, filtered
+        out by the output plugin and not sent downstream. Please note that it
+        does not include the changes filtered before a change is handed over to
+        the output plugin, e.g. the changes filtered by origin. The count is
+        maintained by the output plugin mentioned in
+        <structfield>plugin</structfield>. It is NULL when statistics are not
+        initialized or immediately after a reset or when not maintained by the
+        output plugin.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin_sent_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent downstream for this slot. This
+        counts top-level transactions only, and is not incremented for
+        subtransactions. These transactions are a subset of transactions sent to
+        the decoding plugin. Hence this count is expected to be less than or
+        equal to <structfield>total_txns</structfield>.  The count is maintained
+        by the output plugin mentioned in <structfield>plugin</structfield>.  It
+        is NULL when statistics are not initialized or immediately after a reset or
+        when not maintained by the output plugin.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>plugin_sent_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of transaction changes sent downstream for this slot by the
+        output plugin after applying filtering and converting into its output
+        format. The count is maintained by the output plugin mentioned in
+        <structfield>plugin</structfield>.  It is NULL when statistics are not
+        initialized or immediately after a reset or when not maintained by the
+        output plugin.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f6eca09ee15..930e6309d71 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1053,6 +1053,7 @@ CREATE VIEW pg_replication_slots AS
 CREATE VIEW pg_stat_replication_slots AS
     SELECT
             s.slot_name,
+            r.plugin,
             s.spill_txns,
             s.spill_count,
             s.spill_bytes,
@@ -1061,6 +1062,9 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_bytes,
             s.total_txns,
             s.total_bytes,
+            s.plugin_filtered_bytes,
+            s.plugin_sent_txns,
+            s.plugin_sent_bytes,
             s.stats_reset
     FROM pg_replication_slots as r,
         LATERAL pg_stat_get_replication_slot(slot_name) as s
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7e363a7c05b..9d77e57af65 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1952,13 +1952,14 @@ void
 UpdateDecodingStats(LogicalDecodingContext *ctx)
 {
 	ReorderBuffer *rb = ctx->reorder;
+	OutputPluginStats *stats = ctx->stats;
 	PgStat_StatReplSlotEntry repSlotStat;
 
 	/* Nothing to do if we don't have any replication stats to be sent. */
 	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " (%s) %" PRId64 " %" PRId64 " %" PRId64,
 		 rb,
 		 rb->spillTxns,
 		 rb->spillCount,
@@ -1967,7 +1968,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 rb->streamCount,
 		 rb->streamBytes,
 		 rb->totalTxns,
-		 rb->totalBytes);
+		 rb->totalBytes,
+		 stats ? "plugin has stats" : "plugin has no stats",
+		 stats ? stats->sentTxns : 0,
+		 stats ? stats->sentBytes : 0,
+		 stats ? stats->filteredBytes : 0);
 
 	repSlotStat.spill_txns = rb->spillTxns;
 	repSlotStat.spill_count = rb->spillCount;
@@ -1977,6 +1982,15 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.stream_bytes = rb->streamBytes;
 	repSlotStat.total_txns = rb->totalTxns;
 	repSlotStat.total_bytes = rb->totalBytes;
+	if (stats)
+	{
+		repSlotStat.plugin_has_stats = true;
+		repSlotStat.sent_txns = stats->sentTxns;
+		repSlotStat.sent_bytes = stats->sentBytes;
+		repSlotStat.filtered_bytes = stats->filteredBytes;
+	}
+	else
+		repSlotStat.plugin_has_stats = false;
 
 	pgstat_report_replslot(ctx->slot, &repSlotStat);
 
@@ -1988,6 +2002,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->streamBytes = 0;
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
+	if (stats)
+	{
+		stats->sentTxns = 0;
+		stats->sentBytes = 0;
+		stats->filteredBytes = 0;
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index ca53caac2f2..00a53d1857e 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -89,6 +89,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
 
 	tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+	/*
+	 * If output plugin has chosen to maintain its stats, update the amount of
+	 * data sent downstream.
+	 */
+	if (ctx->stats)
+		ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId);
 	p->returned_rows++;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5febd154b6b..9edaece0282 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
  * memory accounting
  * ---------------------------------------
  */
-static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											ReorderBufferTXN *txn,
@@ -4421,7 +4420,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 /*
  * Size of a change in memory.
  */
-static Size
+Size
 ReorderBufferChangeSize(ReorderBufferChange *change)
 {
 	Size		sz = sizeof(ReorderBufferChange);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f4c977262c5..4ffcce0eb23 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -450,6 +450,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 										 ALLOCSET_SMALL_SIZES);
 
 	ctx->output_plugin_private = data;
+	ctx->stats = palloc0(sizeof(OutputPluginStats));
 
 	/* This plugin uses binary protocol. */
 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
@@ -591,6 +592,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
 	txndata->sent_begin_txn = true;
+	ctx->stats->sentTxns++;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
@@ -1469,7 +1471,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *new_slot = NULL;
 
 	if (!is_publishable_relation(relation))
+	{
+		ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1487,15 +1492,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
+			}
 
 			/*
 			 * This is only possible if deletes are allowed even when replica
@@ -1505,6 +1519,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			if (!change->data.tp.oldtuple)
 			{
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+				ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 				return;
 			}
 			break;
@@ -1560,7 +1575,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * of the row filter for old and new tuple.
 	 */
 	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+	{
+		ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
 		goto cleanup;
+	}
 
 	/*
 	 * Send BEGIN if we haven't yet.
@@ -1688,6 +1706,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 								  change->data.truncate.restart_seqs);
 		OutputPluginWrite(ctx, true);
 	}
+	else
+		ctx->stats->filteredBytes += ReorderBufferChangeSize(change);
+
 
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ee911394a23..59dda2c2bb7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1572,6 +1572,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
 
+	/*
+	 * If the output plugin maintains statistics, update the amount of data
+	 * sent downstream.
+	 */
+	if (ctx->stats)
+		ctx->stats->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */
+
 	CHECK_FOR_INTERRUPTS();
 
 	/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index ccfb11c49bf..ed055324a99 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -96,6 +96,13 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
 	REPLSLOT_ACC(stream_bytes);
 	REPLSLOT_ACC(total_txns);
 	REPLSLOT_ACC(total_bytes);
+	statent->plugin_has_stats = repSlotStat->plugin_has_stats;
+	if (repSlotStat->plugin_has_stats)
+	{
+		REPLSLOT_ACC(sent_txns);
+		REPLSLOT_ACC(sent_bytes);
+		REPLSLOT_ACC(filtered_bytes);
+	}
 #undef REPLSLOT_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 1c12ddbae49..2add51e8f3a 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2100,7 +2100,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
 	text	   *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData	slotname;
 	TupleDesc	tupdesc;
@@ -2129,7 +2129,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "plugin_filtered_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "plugin_sent_txns",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "plugin_sent_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2154,11 +2160,23 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	values[6] = Int64GetDatum(slotent->stream_bytes);
 	values[7] = Int64GetDatum(slotent->total_txns);
 	values[8] = Int64GetDatum(slotent->total_bytes);
+	if (slotent->plugin_has_stats)
+	{
+		values[9] = Int64GetDatum(slotent->filtered_bytes);
+		values[10] = Int64GetDatum(slotent->sent_txns);
+		values[11] = Int64GetDatum(slotent->sent_bytes);
+	}
+	else
+	{
+		nulls[9] = true;
+		nulls[10] = true;
+		nulls[11] = true;
+	}
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[9] = true;
+		nulls[12] = true;
 	else
-		values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 3ee8fed7e53..267406ae906 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5675,9 +5675,9 @@
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 202bd2d5ace..9779e5dc5a8 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -395,6 +395,10 @@ typedef struct PgStat_StatReplSlotEntry
 	PgStat_Counter stream_bytes;
 	PgStat_Counter total_txns;
 	PgStat_Counter total_bytes;
+	bool		plugin_has_stats;
+	PgStat_Counter sent_txns;
+	PgStat_Counter sent_bytes;
+	PgStat_Counter filtered_bytes;
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 2e562bee5a9..010c59f783d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -52,6 +52,7 @@ typedef struct LogicalDecodingContext
 
 	OutputPluginCallbacks callbacks;
 	OutputPluginOptions options;
+	OutputPluginStats *stats;
 
 	/*
 	 * User specified options
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71887..02018f0593c 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -29,6 +29,19 @@ typedef struct OutputPluginOptions
 	bool		receive_rewrites;
 } OutputPluginOptions;
 
+/*
+ * Statistics about the transactions decoded and sent downstream by the output
+ * plugin.
+ */
+typedef struct OutputPluginStats
+{
+	int64		sentTxns;		/* number of transactions decoded and sent
+								 * downstream */
+	int64		sentBytes;		/* amount of data decoded and sent downstream */
+	int64		filteredBytes;	/* amount of data from reorder buffer that was
+								 * filtered out by the output plugin */
+} OutputPluginStats;
+
 /*
  * Type of the shared library symbol _PG_output_plugin_init that is looked up
  * when loading an output plugin shared library.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..3ea2d9885b6 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -715,6 +715,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids);
 extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid,
 									 XLogRecPtr lsn, ReorderBufferChange *change,
 									 bool toast_insert);
+extern Size ReorderBufferChangeSize(ReorderBufferChange *change);
 extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 2137c4e5e30..b04a0d9f8db 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -212,10 +212,10 @@ my $stats_test_slot2 = 'logical_slot';
 # Stats exist for stats test slot 1
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT total_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+	qq(t|t|t),
+	qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
 );
 
 # Do reset of stats for stats test slot 1
@@ -233,10 +233,10 @@ $node_primary->safe_psql('postgres',
 
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, plugin_sent_bytes IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+	qq(t|t|t),
+	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and plugin_sent_bytes were set to 0 and NULL respectively.)
 );
 
 # Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index dce8c672b40..acf3c4e4294 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2127,6 +2127,7 @@ pg_stat_replication| SELECT s.pid,
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
+    r.plugin,
     s.spill_txns,
     s.spill_count,
     s.spill_bytes,
@@ -2135,9 +2136,12 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_bytes,
     s.total_txns,
     s.total_bytes,
+    s.plugin_filtered_bytes,
+    s.plugin_sent_txns,
+    s.plugin_sent_bytes,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a8656419cb6..45da79caead 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1830,6 +1830,7 @@ OuterJoinClauseInfo
 OutputPluginCallbacks
 OutputPluginOptions
 OutputPluginOutputType
+OutputPluginStats
 OverridingKind
 PACE_HEADER
 PACL

base-commit: df335618ed87eecdef44a95e453e345a55a14ad8
-- 
2.34.1
