On Fri, Sep 19, 2025 at 11:48 AM shveta malik <shveta.ma...@gmail.com> wrote: > > On Thu, Sep 18, 2025 at 3:54 PM Ashutosh Bapat > <ashutosh.bapat....@gmail.com> wrote: > > > > > > > > > > Few comments: > > > > > > 1) > > > postgres=# select slot_name, > > > total_bytes,plugin_filtered_bytes,plugin_sent_bytes from > > > pg_stat_replication_slots order by slot_name; > > > slot_name | total_bytes | plugin_filtered_bytes | plugin_sent_bytes > > > -----------+-------------+-----------------------+------------------- > > > slot1 | 800636 | 793188 | 211 > > > sub1 | 401496 | 132712 | 84041 > > > sub2 | 401496 | 396184 | 674 > > > sub3 | 401496 | 145912 | 79959 > > > (4 rows) > > > > > > Currently it looks quite confusing. 'total_bytes' gives a sense that > > > it has to be a sum of filtered and sent. But they are no way like > > > that. In the thread earlier there was a proposal to change the name to > > > reordered_txns, reordered_bytes. That looks better to me. It will give > > > clarity without even someone digging into docs. > > > > I also agree with that. But that will break backward compatibility. > > Yes, that it will do. > > > Do > > you think other columns like spill_* and stream_* should also be > > renamed with the prefix "reordered"? > > > > Okay, I see that all fields in pg_stat_replication_slots are related > to the ReorderBuffer. On reconsideration, I’m unsure whether it's > appropriate to prefix all of them with reorderd_. For example, > renaming spill_bytes and stream_bytes to reordered_spill_bytes and > reordered_stream_bytes. These names start to feel overly long, and I > also noticed that ReorderBuffer isn’t clearly defined anywhere in the > documentation (or at least I couldn’t find it), even though the term > 'reorder buffer' does appear in a few places. > > As an example, see ReorderBufferRead, ReorderBufferWrite wait-types > at [1]. Also in plugin-doc [2], we use 'ReorderBufferTXN'. And now, we > are adding: ReorderBufferChangeSize, ReorderBufferChange > > This gives me a feeling, will it be better to let > pg_stat_replication_slots as is and add a brief ReorderBuffer section > under Logical Decoding concepts [3] just before Output Plugins. And > then, pg_stat_replication_slots can refer to that section, clarifying > that the bytes, counts, and txn fields pertain to ReorderBuffer > (without changing any of the fields). > > And then to define plugin related data, we can have a new view, say > pg_stat_plugin_stats (as Amit suggested earlier) or > pg_stat_replication_plugins. I understand that adding a new view might > not be desirable, but it provides better clarity without requiring > changes to the existing fields in pg_stat_replication_slots. I also > strongly feel that to properly tie all this information together, a > brief definition of the ReorderBuffer is needed. Other pages that > reference this term can then point to that section. Thoughts?
Even if we keep two views, when they are joined, users will still get confused by total_* names. So it's not solving the underlying problem. Andres had raised the point about renaming total_* fields with me off-list earlier. He suggested names total_wal_bytes, and total_wal_txns in an off list discussion today. I think those convey the true meaning - that these are txns and bytes that come from WAL. Used those in the attached patches. Prefix reordered would give away lower level details, so I didn't use it. I agree that it would be good to mention ReorderBuffer in the logical decoding concepts section since it mentions structures ReorderBuffer*. But that would be a separate patch since we aren't using "reordered" in the names of the fields. 0001 is the previous patch 0002 changes addressing your and Bertrand's comments. -- Best Wishes, Ashutosh Bapat
From 9d80a42c1932acff63357148a830df7ada8dfaca Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat <ashutosh.bapat....@gmail.com> Date: Fri, 27 Jun 2025 09:16:23 +0530 Subject: [PATCH 1/2] Report output plugin statistics in pg_stat_replication_slots As of now pg_stat_replication_slots reports statistics about the reorder buffer, but it does not report output plugin statistics like the amount of data filtered by the output plugin, amount of data sent downstream or the number of transactions sent downstream. This statistics is useful when investigating issues related to a slow downstream. This commit adds following fields to pg_stat_replication_slots - plugin_filtered_bytes is the amount of changes filtered out by the output plugin - plugin_sent_txns is the amount of transactions sent downstream by the output plugin - plugin_sent_bytes is the amount of data sent downstream by the output plugin. The prefix "plugin_" indicates that these counters are related to and maintained by the output plugin. An output plugin may choose not to initialize LogicalDecodingContext::stats, which holds these counters, in which case the above columns will be reported as NULL. Filtered bytes are reported next to total_bytes to keep these two closely related fields together. Additionally report name of the output plugin in the view for an easy reference. Author: Ashutosh Bapat <ashutosh.bapat....@gmail.com> Author: Amit Kapila <amit.kapil...@gmail.com> Discussion: https://www.postgresql.org/message-id/CAExHW5s6KntzUyUoMbKR5dgwRmdV2Ay_2+AnTgYGAzo=qv6...@mail.gmail.com --- contrib/test_decoding/expected/stats.out | 77 ++++++++++--------- contrib/test_decoding/sql/stats.sql | 15 ++-- contrib/test_decoding/t/001_repl_stats.pl | 18 +++-- contrib/test_decoding/test_decoding.c | 2 + doc/src/sgml/logicaldecoding.sgml | 27 +++++++ doc/src/sgml/monitoring.sgml | 58 ++++++++++++++ src/backend/catalog/system_views.sql | 4 + src/backend/replication/logical/logical.c | 24 +++++- .../replication/logical/logicalfuncs.c | 7 ++ .../replication/logical/reorderbuffer.c | 3 +- src/backend/replication/pgoutput/pgoutput.c | 21 +++++ src/backend/replication/walsender.c | 7 ++ src/backend/utils/activity/pgstat_replslot.c | 7 ++ src/backend/utils/adt/pgstatfuncs.c | 26 ++++++- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 4 + src/include/replication/logical.h | 1 + src/include/replication/output_plugin.h | 13 ++++ src/include/replication/reorderbuffer.h | 1 + src/test/recovery/t/006_logical_decoding.pl | 12 +-- src/test/regress/expected/rules.out | 6 +- src/tools/pgindent/typedefs.list | 1 + 22 files changed, 275 insertions(+), 65 deletions(-) diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index de6dc416130..d19fe6a1c61 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush(); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | t | t - regression_slot_stats2 | t | t | t | t - regression_slot_stats3 | t | t | t | t +-- total_txns may vary based on the background activity but sent_txns should +-- always be 1 since the background transactions are always skipped. Filtered +-- bytes would be set only when there's a change that was passed to the plugin +-- but was filtered out. Depending upon the background transactions, filtered +-- bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes +------------------------+------------+-------------+------------+-------------+------------------+------------+---------------- + regression_slot_stats1 | t | t | t | t | 1 | t | t + regression_slot_stats2 | t | t | t | t | 1 | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t (3 rows) RESET logical_decoding_work_mem; @@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | f | f - regression_slot_stats2 | t | t | t | t - regression_slot_stats3 | t | t | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes +------------------------+------------+-------------+------------+-------------+------------------+------------+---------------- + regression_slot_stats1 | t | t | f | f | | | + regression_slot_stats2 | t | t | t | t | 1 | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t (3 rows) -- reset stats for all slots @@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | f | f - regression_slot_stats2 | t | t | f | f - regression_slot_stats3 | t | t | f | f +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes +------------------------+------------+-------------+------------+-------------+------------------+-------------------+----------------------- + regression_slot_stats1 | t | t | f | f | | | + regression_slot_stats2 | t | t | f | f | | | + regression_slot_stats3 | t | t | f | f | | | (3 rows) -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) -- spilling the xact @@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) -SELECT slot_name FROM pg_stat_replication_slots; - slot_name ------------------------- - regression_slot_stats1 - regression_slot_stats2 - regression_slot_stats3 +SELECT slot_name, plugin FROM pg_stat_replication_slots; + slot_name | plugin +------------------------+--------------- + regression_slot_stats1 | test_decoding + regression_slot_stats2 | test_decoding + regression_slot_stats3 | test_decoding (3 rows) COMMIT; diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index a022fe1bf07..1077cea5855 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -15,16 +15,21 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1'); SELECT pg_stat_force_next_flush(); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +-- total_txns may vary based on the background activity but sent_txns should +-- always be 1 since the background transactions are always skipped. Filtered +-- bytes would be set only when there's a change that was passed to the plugin +-- but was filtered out. Depending upon the background transactions, filtered +-- bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- reset stats for all slots SELECT pg_stat_reset_replication_slot(NULL); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); @@ -46,8 +51,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count F -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de BEGIN; -SELECT slot_name FROM pg_stat_replication_slots; -SELECT slot_name FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; +SELECT slot_name, plugin FROM pg_stat_replication_slots; COMMIT; diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 0de62edb7d8..76dd86fc420 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -23,10 +23,16 @@ sub test_slot_stats my ($node, $expected, $msg) = @_; + # If there are background transactions which are filtered out by the output + # plugin, plugin_filtered_bytes may be greater than 0. But it's not + # guaranteed that such transactions would be present. my $result = $node->safe_psql( 'postgres', qq[ SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes + total_bytes > 0 AS total_bytes, + plugin_sent_txns > 0 AS sent_txn, + plugin_sent_bytes > 0 AS sent_bytes, + plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name]); is($result, $expected, $msg); @@ -80,9 +86,9 @@ $node->start; # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t -regression_slot3|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t +regression_slot3|t|t|t|t|t), 'check replication statistics are updated'); # Test to remove one of the replication slots and adjust @@ -104,8 +110,8 @@ $node->start; # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t), + qq(regression_slot1|t|t|t|t|t +regression_slot2|t|t|t|t|t), 'check replication statistics after removing the slot file'); # cleanup diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index f671a7d4b31..ea5c527644b 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -173,6 +173,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->only_local = false; ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; opt->receive_rewrites = false; @@ -310,6 +311,7 @@ static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) { OutputPluginPrepareWrite(ctx, last_write); + ctx->stats->sentTxns++; if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); else diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index b803a819cf1..8ac10cda90c 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -938,6 +938,33 @@ typedef struct OutputPluginOptions needs to have a state, it can use <literal>ctx->output_plugin_private</literal> to store it. </para> + + <para> + The startup callback may initialize <literal>ctx->stats</literal>, + typically as follows, if it chooses to maintain and report statistics + about its activity in <structname>pg_stat_replication_slots</structname>. +<programlisting> +ctx->stats = palloc0(sizeof(OutputPluginStats)); +</programlisting> + where <literal>OutputPluginStats</literal> is defined as follows: +<programlisting> +typedef struct OutputPluginStats +{ + int64 sentTxns; + int64 sentBytes; + int64 filteredBytes; +} OutputPluginStats; +</programlisting> + <literal>sentTxns</literal> is the number of transactions sent downstream + by the output plugin. <literal>sentBytes</literal> is the amount of data + sent downstream by the output plugin. + <function>OutputPluginWrite</function> is expected to update this counter + if <literal>ctx->stats</literal> is initialized by the output plugin. + <literal>filteredBytes</literal> is the size of changes in bytes that are + filtered out by the output plugin. Function + <literal>ReorderBufferChangeSize</literal> may be used to find the size of + filtered <literal>ReorderBufferChange</literal>. + </para> </sect3> <sect3 id="logicaldecoding-output-plugin-shutdown"> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3f4a27a736e..e121f55c9c2 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1545,6 +1545,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage </para></entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>plugin</structfield> <type>text</type> + </para> + <para> + The base name of the shared object containing the output plugin this + logical slot is using. This column is same as the one in + <structname>pg_replication_slots</structname>. + </para></entry> + </row> + <row> <entry role="catalog_table_entry"><para role="column_definition"> <structfield>spill_txns</structfield> <type>bigint</type> @@ -1644,6 +1655,53 @@ description | Waiting for a newly initialized WAL file to reach durable storage </entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>plugin_filtered_bytes</structfield> <type>bigint</type> + </para> + <para> + Amount of changes, from <structfield>total_bytes</structfield>, filtered + out by the output plugin and not sent downstream. Please note that it + does not include the changes filtered before a change is handed over to + the output plugin, e.g. the changes filtered by origin. The count is + maintained by the output plugin mentioned in + <structfield>plugin</structfield>. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>plugin_sent_txns</structfield> <type>bigint</type> + </para> + <para> + Number of decoded transactions sent downstream for this slot. This + counts top-level transactions only, and is not incremented for + subtransactions. These transactions are subset of transctions sent to + the decoding plugin. Hence this count is expected to be lesser than or + equal to <structfield>total_txns</structfield>. The count is maintained + by the output plugin mentioned in <structfield>plugin</structfield>. It + is NULL when statistics is not initialized or immediately after a reset or + when not maintained by the output plugin. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>plugin_sent_bytes</structfield><type>bigint</type> + </para> + <para> + Amount of transaction changes sent downstream for this slot by the + output plugin after applying filtering and converting into its output + format. The count is maintained by the output plugin mentioned in + <structfield>plugin</structfield>. It is NULL when statistics is not + initialized or immediately after a reset or when not maintained by the + output plugin. + </para> + </entry> + </row> + <row> <entry role="catalog_table_entry"><para role="column_definition"> <structfield>stats_reset</structfield> <type>timestamp with time zone</type> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c77fa0234bb..d38c21150b0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1053,6 +1053,7 @@ CREATE VIEW pg_replication_slots AS CREATE VIEW pg_stat_replication_slots AS SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -1061,6 +1062,9 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_bytes, s.total_txns, s.total_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots as r, LATERAL pg_stat_get_replication_slot(slot_name) as s diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c68c0481f42..b26ac29e32f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1952,13 +1952,14 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; + OutputPluginStats *stats = ctx->stats; PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " (%s) %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1967,7 +1968,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamCount, rb->streamBytes, rb->totalTxns, - rb->totalBytes); + rb->totalBytes, + stats ? "plugin has stats" : "plugin has no stats", + stats ? stats->sentTxns : 0, + stats ? stats->sentBytes : 0, + stats ? stats->filteredBytes : 0); repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; @@ -1977,6 +1982,15 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_bytes = rb->streamBytes; repSlotStat.total_txns = rb->totalTxns; repSlotStat.total_bytes = rb->totalBytes; + if (stats) + { + repSlotStat.plugin_has_stats = true; + repSlotStat.sent_txns = stats->sentTxns; + repSlotStat.sent_bytes = stats->sentBytes; + repSlotStat.filtered_bytes = stats->filteredBytes; + } + else + repSlotStat.plugin_has_stats = false; pgstat_report_replslot(ctx->slot, &repSlotStat); @@ -1988,6 +2002,12 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamBytes = 0; rb->totalTxns = 0; rb->totalBytes = 0; + if (stats) + { + stats->sentTxns = 0; + stats->sentBytes = 0; + stats->filteredBytes = 0; + } } /* diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 25f890ddeed..788967e2ab1 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -89,6 +89,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len)); tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + + /* + * If output plugin has chosen to maintain its stats, update the amount of + * data sent downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId); p->returned_rows++; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4736f993c37..12579dff2c1 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t * memory accounting * --------------------------------------- */ -static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, @@ -4436,7 +4435,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* * Size of a change in memory. */ -static Size +Size ReorderBufferChangeSize(ReorderBufferChange *change) { Size sz = sizeof(ReorderBufferChange); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 80540c017bd..339babbeb56 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -450,6 +450,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ALLOCSET_SMALL_SIZES); ctx->output_plugin_private = data; + ctx->stats = palloc0(sizeof(OutputPluginStats)); /* This plugin uses binary protocol. */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; @@ -591,6 +592,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); txndata->sent_begin_txn = true; + ctx->stats->sentTxns++; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -1469,7 +1471,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *new_slot = NULL; if (!is_publishable_relation(relation)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * Remember the xid for the change in streaming mode. We need to send xid @@ -1487,15 +1492,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: if (!relentry->pubactions.pubinsert) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_UPDATE: if (!relentry->pubactions.pubupdate) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } break; case REORDER_BUFFER_CHANGE_DELETE: if (!relentry->pubactions.pubdelete) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; + } /* * This is only possible if deletes are allowed even when replica @@ -1505,6 +1519,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!change->data.tp.oldtuple) { elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); return; } break; @@ -1560,7 +1575,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * of the row filter for old and new tuple. */ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action)) + { + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); goto cleanup; + } /* * Send BEGIN if we haven't yet. @@ -1688,6 +1706,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, change->data.truncate.restart_seqs); OutputPluginWrite(ctx, true); } + else + ctx->stats->filteredBytes += ReorderBufferChangeSize(change); + MemoryContextSwitchTo(old); MemoryContextReset(data->context); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d..d9217ce49aa 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1573,6 +1573,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); + /* + * If output plugin maintains statistics, update the amount of data sent + * downstream. + */ + if (ctx->stats) + ctx->stats->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */ + CHECK_FOR_INTERRUPTS(); /* Try to flush pending output to the client */ diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index ccfb11c49bf..ed055324a99 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -96,6 +96,13 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_bytes); REPLSLOT_ACC(total_txns); REPLSLOT_ACC(total_bytes); + statent->plugin_has_stats = repSlotStat->plugin_has_stats; + if (repSlotStat->plugin_has_stats) + { + REPLSLOT_ACC(sent_txns); + REPLSLOT_ACC(sent_bytes); + REPLSLOT_ACC(filtered_bytes); + } #undef REPLSLOT_ACC pgstat_unlock_entry(entry_ref); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index c756c2bebaa..796dacddcfb 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2100,7 +2100,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 10 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 13 text *slotname_text = PG_GETARG_TEXT_P(0); NameData slotname; TupleDesc tupdesc; @@ -2129,7 +2129,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "plugin_filtered_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "plugin_sent_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "plugin_sent_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2154,11 +2160,23 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[6] = Int64GetDatum(slotent->stream_bytes); values[7] = Int64GetDatum(slotent->total_txns); values[8] = Int64GetDatum(slotent->total_bytes); + if (slotent->plugin_has_stats) + { + values[9] = Int64GetDatum(slotent->filtered_bytes); + values[10] = Int64GetDatum(slotent->sent_txns); + values[11] = Int64GetDatum(slotent->sent_bytes); + } + else + { + nulls[9] = true; + nulls[10] = true; + nulls[11] = true; + } if (slotent->stat_reset_timestamp == 0) - nulls[9] = true; + nulls[12] = true; else - values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 01eba3b5a19..7519941bcf3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5687,9 +5687,9 @@ { oid => '6169', descr => 'statistics: information about replication slot', proname => 'pg_stat_get_replication_slot', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f402b17295c..87afeaed8a5 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -395,6 +395,10 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter stream_bytes; PgStat_Counter total_txns; PgStat_Counter total_bytes; + bool plugin_has_stats; + PgStat_Counter sent_txns; + PgStat_Counter sent_bytes; + PgStat_Counter filtered_bytes; TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9..010c59f783d 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -52,6 +52,7 @@ typedef struct LogicalDecodingContext OutputPluginCallbacks callbacks; OutputPluginOptions options; + OutputPluginStats *stats; /* * User specified options diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 8d4d5b71887..02018f0593c 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -29,6 +29,19 @@ typedef struct OutputPluginOptions bool receive_rewrites; } OutputPluginOptions; +/* + * Statistics about the transactions decoded and sent downstream by the output + * plugin. + */ +typedef struct OutputPluginStats +{ + int64 sentTxns; /* number of transactions decoded and sent + * downstream */ + int64 sentBytes; /* amount of data decoded and sent downstream */ + int64 filteredBytes; /* amount of data from reoder buffer that was + * filtered out by the output plugin */ +} OutputPluginStats; + /* * Type of the shared library symbol _PG_output_plugin_init that is looked up * when loading an output plugin shared library. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index fa0745552f8..3ea2d9885b6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -715,6 +715,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids); extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert); +extern Size ReorderBufferChangeSize(ReorderBufferChange *change); extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 2137c4e5e30..b04a0d9f8db 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -212,10 +212,10 @@ my $stats_test_slot2 = 'logical_slot'; # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) + qq(t|t|t), + qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) ); # Do reset of stats for stats test slot 1 @@ -233,10 +233,10 @@ $node_primary->safe_psql('postgres', is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.) + qq(t|t|t), + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) ); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 35e8aad7701..2a048af3569 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2132,6 +2132,7 @@ pg_stat_replication| SELECT s.pid, JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, + r.plugin, s.spill_txns, s.spill_count, s.spill_bytes, @@ -2140,9 +2141,12 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_bytes, s.total_txns, s.total_bytes, + s.plugin_filtered_bytes, + s.plugin_sent_txns, + s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e90af5b2ad3..8f6af48b04a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1830,6 +1830,7 @@ OuterJoinClauseInfo OutputPluginCallbacks OutputPluginOptions OutputPluginOutputType +OutputPluginStats OverridingKind PACE_HEADER PACL base-commit: 18cdf5932a279a2c035d44460e1e0cbb659471f2 -- 2.34.1
From e2500f1c610d2f73b53e353b98856a90db4cc452 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat <ashutosh.bapat....@gmail.com> Date: Thu, 18 Sep 2025 15:55:01 +0530 Subject: [PATCH 2/2] Address review comments Among others rename total_txns and total_bytes to total_wal_txns and total_wal_bytes respectively. Reviewed-by: Bertrand Drouvot <bertranddrouvot...@gmail.com> Reviewed-by: Shveta Malik <shveta.ma...@gmail.com> --- contrib/test_decoding/expected/stats.out | 58 +++++++++++------------ contrib/test_decoding/sql/stats.sql | 17 +++---- contrib/test_decoding/t/001_repl_stats.pl | 6 +-- doc/src/sgml/logicaldecoding.sgml | 6 +-- doc/src/sgml/monitoring.sgml | 18 +++---- src/backend/catalog/system_views.sql | 4 +- src/backend/utils/adt/pgstatfuncs.c | 4 +- src/include/catalog/pg_proc.dat | 2 +- src/test/regress/expected/rules.out | 6 +-- 9 files changed, 61 insertions(+), 60 deletions(-) diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index d19fe6a1c61..4834b3460a6 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -37,17 +37,17 @@ SELECT pg_stat_force_next_flush(); (1 row) --- total_txns may vary based on the background activity but sent_txns should --- always be 1 since the background transactions are always skipped. Filtered --- bytes would be set only when there's a change that was passed to the plugin --- but was filtered out. Depending upon the background transactions, filtered --- bytes may or may not be zero. -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes -------------------------+------------+-------------+------------+-------------+------------------+------------+---------------- - regression_slot_stats1 | t | t | t | t | 1 | t | t - regression_slot_stats2 | t | t | t | t | 1 | t | t - regression_slot_stats3 | t | t | t | t | 1 | t | t +-- total_wal_txns may vary based on the background activity but plugin_sent_txns +-- should always be 1 since the background transactions are always skipped. +-- Filtered bytes would be set only when there's a change that was passed to the +-- plugin but was filtered out. Depending upon the background transactions, +-- filtered bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes +------------------------+------------+-------------+----------------+-----------------+------------------+------------+---------------- + regression_slot_stats1 | t | t | t | t | 1 | t | t + regression_slot_stats2 | t | t | t | t | 1 | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t (3 rows) RESET logical_decoding_work_mem; @@ -58,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | sent_bytes | filtered_bytes -------------------------+------------+-------------+------------+-------------+------------------+------------+---------------- - regression_slot_stats1 | t | t | f | f | | | - regression_slot_stats2 | t | t | t | t | 1 | t | t - regression_slot_stats3 | t | t | t | t | 1 | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | sent_bytes | filtered_bytes +------------------------+------------+-------------+----------------+-----------------+------------------+------------+---------------- + regression_slot_stats1 | t | t | f | f | | | + regression_slot_stats2 | t | t | t | t | 1 | t | t + regression_slot_stats3 | t | t | t | t | 1 | t | t (3 rows) -- reset stats for all slots @@ -73,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes -------------------------+------------+-------------+------------+-------------+------------------+-------------------+----------------------- - regression_slot_stats1 | t | t | f | f | | | - regression_slot_stats2 | t | t | f | f | | | - regression_slot_stats3 | t | t | f | f | | | +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | plugin_sent_txns | plugin_sent_bytes | plugin_filtered_bytes +------------------------+------------+-------------+----------------+-----------------+------------------+-------------------+----------------------- + regression_slot_stats1 | t | t | f | f | | | + regression_slot_stats2 | t | t | f | f | | | + regression_slot_stats3 | t | t | f | f | | | (3 rows) -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+----------------+-----------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------------------+------------------+-------------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_wal_txns | total_wal_bytes | plugin_filtered_bytes | plugin_sent_txns | plugin_sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+----------------+-----------------+-----------------------+------------------+-------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | (1 row) -- spilling the xact diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 1077cea5855..99f513902d3 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -15,21 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1'); SELECT pg_stat_force_next_flush(); --- total_txns may vary based on the background activity but sent_txns should --- always be 1 since the background transactions are always skipped. Filtered --- bytes would be set only when there's a change that was passed to the plugin --- but was filtered out. Depending upon the background transactions, filtered --- bytes may or may not be zero. -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + +-- total_wal_txns may vary based on the background activity but plugin_sent_txns +-- should always be 1 since the background transactions are always skipped. +-- Filtered bytes would be set only when there's a change that was passed to the +-- plugin but was filtered out. Depending upon the background transactions, +-- filtered bytes may or may not be zero. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- reset stats for all slots SELECT pg_stat_reset_replication_slot(NULL); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, plugin_sent_txns, plugin_sent_bytes, plugin_filtered_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 76dd86fc420..756fc691ed6 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -28,8 +28,8 @@ sub test_slot_stats # guaranteed that such transactions would be present. my $result = $node->safe_psql( 'postgres', qq[ - SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes, + SELECT slot_name, total_wal_txns > 0 AS total_txn, + total_wal_bytes > 0 AS total_bytes, plugin_sent_txns > 0 AS sent_txn, plugin_sent_bytes > 0 AS sent_bytes, plugin_filtered_bytes >= 0 AS filtered_bytes @@ -71,7 +71,7 @@ $node->poll_query_until( 'postgres', qq[ SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots WHERE slot_name ~ 'regression_slot' - AND total_txns > 0 AND total_bytes > 0; + AND total_wal_txns > 0 AND total_wal_bytes > 0; ]) or die "Timed out while waiting for statistics to be updated"; # Test to drop one of the replication slot and verify replication statistics data is diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 8ac10cda90c..3952f68e806 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -956,11 +956,11 @@ typedef struct OutputPluginStats } OutputPluginStats; </programlisting> <literal>sentTxns</literal> is the number of transactions sent downstream - by the output plugin. <literal>sentBytes</literal> is the amount of data + by the output plugin. <literal>sentBytes</literal> is the amount of data, in bytes, sent downstream by the output plugin. - <function>OutputPluginWrite</function> is expected to update this counter + <function>OutputPluginWrite</function> will update this counter if <literal>ctx->stats</literal> is initialized by the output plugin. - <literal>filteredBytes</literal> is the size of changes in bytes that are + <literal>filteredBytes</literal> is the size of changes, in bytes, that are filtered out by the output plugin. Function <literal>ReorderBufferChangeSize</literal> may be used to find the size of filtered <literal>ReorderBufferChange</literal>. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index e121f55c9c2..fbe03ffd670 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1633,19 +1633,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage <row> <entry role="catalog_table_entry"><para role="column_definition"> - <structfield>total_txns</structfield> <type>bigint</type> + <structfield>total_wal_txns</structfield> <type>bigint</type> </para> <para> - Number of decoded transactions sent to the decoding output plugin for - this slot. This counts top-level transactions only, and is not incremented - for subtransactions. Note that this includes the transactions that are - streamed and/or spilled. + Number of decoded transactions from WAL sent to the decoding output + plugin for this slot. This counts top-level transactions only, and is + not incremented for subtransactions. Note that this includes the + transactions that are streamed and/or spilled. </para></entry> </row> <row> <entry role="catalog_table_entry"><para role="column_definition"> - <structfield>total_bytes</structfield><type>bigint</type> + <structfield>total_wal_bytes</structfield><type>bigint</type> </para> <para> Amount of transaction data decoded for sending transactions to the @@ -1660,9 +1660,9 @@ description | Waiting for a newly initialized WAL file to reach durable storage <structfield>plugin_filtered_bytes</structfield> <type>bigint</type> </para> <para> - Amount of changes, from <structfield>total_bytes</structfield>, filtered + Amount of changes, from <structfield>total_wal_bytes</structfield>, filtered out by the output plugin and not sent downstream. Please note that it - does not include the changes filtered before a change is handed over to + does not include the changes filtered before a change is sent to the output plugin, e.g. the changes filtered by origin. The count is maintained by the output plugin mentioned in <structfield>plugin</structfield>. It is NULL when statistics is not @@ -1680,7 +1680,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage counts top-level transactions only, and is not incremented for subtransactions. These transactions are subset of transctions sent to the decoding plugin. Hence this count is expected to be lesser than or - equal to <structfield>total_txns</structfield>. The count is maintained + equal to <structfield>total_wal_txns</structfield>. The count is maintained by the output plugin mentioned in <structfield>plugin</structfield>. It is NULL when statistics is not initialized or immediately after a reset or when not maintained by the output plugin. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index d38c21150b0..9e8e32b5849 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1060,8 +1060,8 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_txns, s.stream_count, s.stream_bytes, - s.total_txns, - s.total_bytes, + s.total_wal_txns, + s.total_wal_bytes, s.plugin_filtered_bytes, s.plugin_sent_txns, s.plugin_sent_bytes, diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 796dacddcfb..15bafe63b24 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2125,9 +2125,9 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_wal_txns", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_wal_bytes", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 10, "plugin_filtered_bytes", INT8OID, -1, 0); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7519941bcf3..9e4f6620214 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5689,7 +5689,7 @@ proparallel => 'r', prorettype => 'record', proargtypes => 'text', proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_wal_txns,total_wal_bytes,plugin_filtered_bytes,plugin_sent_txns,plugin_sent_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2a048af3569..2a401552a7a 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2139,14 +2139,14 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_txns, s.stream_count, s.stream_bytes, - s.total_txns, - s.total_bytes, + s.total_wal_txns, + s.total_wal_bytes, s.plugin_filtered_bytes, s.plugin_sent_txns, s.plugin_sent_bytes, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_wal_txns, total_wal_bytes, plugin_filtered_bytes, plugin_sent_txns, plugin_sent_bytes, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, -- 2.34.1