Hi All,

In a recent logical replication issue, there were multiple replication slots involved, each using a different publication. Thus the amount of data replicated through each slot was expected to be different. However, total_bytes and total_txns were reported the same for all the replication slots. That is expected, since those counters report what is added to the reorder buffer rather than what is sent downstream. One of the slots started lagging and we were trying to figure out whether it was the WAL sender slowing down or the consumer (in this case Debezium). The lagging slot then showed total_txns and total_bytes lower than the other slots, giving the impression that the WAL sender was processing the data slowly. Had pg_stat_replication_slots reported the amount of data actually sent downstream, it would have been easier to compare it with the amount of data received by the consumer and thus pinpoint the bottleneck.
Here's a patch to do the same. It adds two columns to pg_stat_replication_slots:
- sent_txns: The total number of transactions sent downstream.
- sent_bytes: The total number of bytes sent downstream in data messages.

sent_bytes includes only the bytes sent as part of 'd' messages and does not include, for example, keepalive messages or CopyDone messages. But those are very few and can be ignored. If others feel that those are important to include, we can make that change.

Plugins may choose not to send an empty transaction downstream. It's better to increment the sent_txns counter in the plugin code when it actually sends a BEGIN message, for example in pgoutput_send_begin() and pg_output_begin(). This means that every plugin will need to be modified to increment the counter for it to be reported correctly.

I first thought of incrementing sent_bytes in OutputPluginWrite(), which is the central function for all logical replication message writes. But that calls LogicalDecodingContext::write(), whose implementations, e.g. WalSndWriteData() and LogicalOutputWrite(), may add further bytes to the message. So it's better to increment the counter in the implementations of LogicalDecodingContext::write(), so that we count the exact number of bytes. These implementations are within core code, so they won't miss updating sent_bytes.

I think we should rename total_txns and total_bytes to reordered_txns and reordered_bytes respectively, and also update the documentation accordingly to make better sense of those numbers. But this patch does not contain that change. If others feel the same way, I will provide a patch with that change.

--
Best Wishes,
Ashutosh Bapat
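To make the accounting described in the mail above concrete, here is a small standalone C model. It is only a sketch and not PostgreSQL code: ToyStats, ToyTxn, the toy_* functions and TOY_BEGIN_LEN are all made-up names, and the message sizes are arbitrary. It assumes a plugin that sends BEGIN lazily (so empty transactions are skipped) and a write callback that adds one byte for the 'd' message type, as the patch does in WalSndWriteData().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Arbitrary size of a BEGIN message in this toy model (assumption). */
#define TOY_BEGIN_LEN 20

/* Hypothetical stand-in for the two counters the patch adds. */
typedef struct ToyStats
{
	int64_t		sent_txns;		/* transactions actually sent downstream */
	int64_t		sent_bytes;		/* bytes actually sent in data messages */
} ToyStats;

/* Hypothetical per-transaction state: has BEGIN been sent yet? */
typedef struct ToyTxn
{
	bool		sent_begin;
} ToyTxn;

/*
 * Models an implementation of the write callback. Bytes are counted here,
 * at the last step, so that the extra byte for the 'd' message type is
 * included in the total.
 */
static void
toy_write(ToyStats *stats, size_t payload_len)
{
	stats->sent_bytes += (int64_t) payload_len + 1; /* +1 for the 'd' */
}

/* Models the plugin sending BEGIN: this is where sent_txns is bumped. */
static void
toy_send_begin(ToyStats *stats, ToyTxn *txn)
{
	assert(!txn->sent_begin);
	txn->sent_begin = true;
	stats->sent_txns++;
	toy_write(stats, TOY_BEGIN_LEN);
}

/* Models sending one change; BEGIN is sent lazily before the first one. */
static void
toy_send_change(ToyStats *stats, ToyTxn *txn, size_t change_len)
{
	if (!txn->sent_begin)
		toy_send_begin(stats, txn);
	toy_write(stats, change_len);
}

/* Models commit: an empty transaction sends nothing and counts nothing. */
static void
toy_commit(ToyStats *stats, ToyTxn *txn, size_t commit_len)
{
	if (!txn->sent_begin)
		return;
	toy_write(stats, commit_len);
}
```

In this model, committing an empty transaction leaves both counters at 0. A transaction with two changes of 100 and 50 bytes and a 10-byte commit message gives sent_txns = 1 and sent_bytes = 21 + 101 + 51 + 11 = 184. If the bytes were counted before the write callback ran, the per-message overhead would be missed.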
From 1a385b30d6cb4ca111cbcc16ea14017c08f9a579 Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.bapat....@gmail.com>
Date: Fri, 27 Jun 2025 09:16:23 +0530
Subject: [PATCH] Report data sent statistics in pg_stat_replication_slots

pg_stat_replication_slots reports the total number of transactions and bytes
added to the reorder buffer. This is the amount of WAL that is processed by
the WAL sender. This data undergoes filtering and logical decoding before
sending to the downstream. Hence the amount of WAL added to the reorder
buffer does not serve as a good metric for the amount of data sent
downstream. Knowing the amount of data sent downstream is useful when
debugging slowly moving logical replication.

This patch adds two new columns to pg_stat_replication_slots:
- sent_txns: The total number of transactions sent downstream.
- sent_bytes: The total number of bytes sent downstream in data messages

Ashutosh Bapat
---
 contrib/test_decoding/expected/stats.out | 54 ++++++++++---------
 contrib/test_decoding/sql/stats.sql | 12 +++--
 contrib/test_decoding/t/001_repl_stats.pl | 14 ++---
 contrib/test_decoding/test_decoding.c | 1 +
 doc/src/sgml/monitoring.sgml | 27 ++++++++++
 src/backend/catalog/system_views.sql | 2 +
 src/backend/replication/logical/logical.c | 10 +++-
 .../replication/logical/logicalfuncs.c | 2 +
 .../replication/logical/reorderbuffer.c | 2 +
 src/backend/replication/pgoutput/pgoutput.c | 1 +
 src/backend/replication/walsender.c | 2 +
 src/backend/utils/activity/pgstat_replslot.c | 2 +
 src/backend/utils/adt/pgstatfuncs.c | 14 +++--
 src/include/catalog/pg_proc.dat | 6 +--
 src/include/pgstat.h | 2 +
 src/include/replication/reorderbuffer.h | 8 +++
 src/test/recovery/t/006_logical_decoding.pl | 12 ++---
 src/test/regress/expected/rules.out | 4 +-
 18 files changed, 126 insertions(+), 49 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index de6dc416130..867a8506051 100644
--- 
a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -37,28 +37,34 @@ SELECT pg_stat_force_next_flush(); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | t | t - regression_slot_stats2 | t | t | t | t - regression_slot_stats3 | t | t | t | t +-- total_txns may vary based on the background activity but sent_txns should +-- always be 1 since the background transactions are always skipped. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes +------------------------+------------+-------------+------------+-------------+-----------+------------ + regression_slot_stats1 | t | t | t | t | 1 | t + regression_slot_stats2 | t | t | t | t | 1 | t + regression_slot_stats3 | t | t | t | t | 1 | t (3 rows) RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected +-- total_txns may vary based on the background activity but sent_txns should +-- always be 0 for the slot whose stats were reset since the background +-- transactions are always skipped and no transaction, which would be sent +-- downstream, has happened since the reset. 
SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); pg_stat_reset_replication_slot -------------------------------- (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | f | f - regression_slot_stats2 | t | t | t | t - regression_slot_stats3 | t | t | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes +------------------------+------------+-------------+------------+-------------+-----------+------------ + regression_slot_stats1 | t | t | f | f | 0 | f + regression_slot_stats2 | t | t | t | t | 1 | t + regression_slot_stats3 | t | t | t | t | 1 | t (3 rows) -- reset stats for all slots @@ -68,27 +74,27 @@ SELECT pg_stat_reset_replication_slot(NULL); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes -------------------------+------------+-------------+------------+------------- - regression_slot_stats1 | t | t | f | f - regression_slot_stats2 | t | t | f | f - regression_slot_stats3 | t | t | f | f +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | 
spill_txns | spill_count | total_txns | total_bytes | sent_txns | sent_bytes +------------------------+------------+-------------+------------+-------------+-----------+------------ + regression_slot_stats1 | t | t | f | f | 0 | f + regression_slot_stats2 | t | t | f | f | 0 | f + regression_slot_stats3 | t | t | f | f | 0 | f (3 rows) -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | sent_txns | sent_bytes | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------+------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | sent_txns | sent_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-----------+------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | (1 row) -- spilling the xact diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index a022fe1bf07..8581387d867 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1'); SELECT pg_stat_force_next_flush(); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +-- total_txns may vary based on the background activity but sent_txns should +-- always be 1 since the background transactions are always skipped. +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name; RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected +-- total_txns may vary based on the background activity but sent_txns should +-- always be 0 for the slot whose stats were reset since the background +-- transactions are always skipped and no transaction, which would be sent +-- downstream, has happened since the reset. 
SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- reset stats for all slots SELECT pg_stat_reset_replication_slot(NULL); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_txns, sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name; -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 0de62edb7d8..ba887d752eb 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -26,7 +26,9 @@ sub test_slot_stats my $result = $node->safe_psql( 'postgres', qq[ SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes + total_bytes > 0 AS total_bytes, + sent_txns > 0 AS sent_txn, + sent_bytes > 0 AS sent_bytes FROM pg_stat_replication_slots ORDER BY slot_name]); is($result, $expected, $msg); @@ -80,9 +82,9 @@ $node->start; # restart. 
test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t -regression_slot3|t|t), + qq(regression_slot1|t|t|t|t +regression_slot2|t|t|t|t +regression_slot3|t|t|t|t), 'check replication statistics are updated'); # Test to remove one of the replication slots and adjust @@ -104,8 +106,8 @@ $node->start; # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t), + qq(regression_slot1|t|t|t|t +regression_slot2|t|t|t|t), 'check replication statistics after removing the slot file'); # cleanup diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index bb495563200..4a7e918efbe 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -310,6 +310,7 @@ static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) { OutputPluginPrepareWrite(ctx, last_write); + ctx->reorder->sentTxns++; if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); else diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 4265a22d4de..e5af284df90 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1644,6 +1644,33 @@ description | Waiting for a newly initialized WAL file to reach durable storage </entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>sent_txns</structfield> <type>bigint</type> + </para> + <para> + Number of decoded transactions sent downstream for + this slot. This counts top-level transactions only, and is not incremented + for subtransactions. These transactions are a subset of the transactions + sent to the decoding plugin. Hence this count is expected to be less than or equal to <structfield>total_txns</structfield>. 
+ </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>sent_bytes</structfield> <type>bigint</type> + </para> + <para> + Amount of decoded transaction changes sent downstream for this slot. The + amount of WAL corresponding to the changes sent downstream is a subset of + the total WAL sent to the decoding plugin. But the amount of data sent + downstream for a given decoded WAL record may not match the size of the + WAL record. Thus values in this column do not have a strong correlation + with the values in <structfield>total_bytes</structfield>. + </para> + </entry> + </row> + <row> <entry role="catalog_table_entry"><para role="column_definition"> <structfield>stats_reset</structfield> <type>timestamp with time zone</type> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 08f780a2e63..bcc210dd754 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1053,6 +1053,8 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_bytes, s.total_txns, s.total_bytes, + s.sent_txns, + s.sent_bytes, s.stats_reset FROM pg_replication_slots as r, LATERAL pg_stat_get_replication_slot(slot_name) as s diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f1eb798f3e9..14615fa70d7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1958,7 +1958,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1967,7 
+1967,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamCount, rb->streamBytes, rb->totalTxns, - rb->totalBytes); + rb->totalBytes, + rb->sentTxns, + rb->sentBytes); repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; @@ -1977,6 +1979,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_bytes = rb->streamBytes; repSlotStat.total_txns = rb->totalTxns; repSlotStat.total_bytes = rb->totalBytes; + repSlotStat.sent_txns = rb->sentTxns; + repSlotStat.sent_bytes = rb->sentBytes; pgstat_report_replslot(ctx->slot, &repSlotStat); @@ -1988,6 +1992,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamBytes = 0; rb->totalTxns = 0; rb->totalBytes = 0; + rb->sentTxns = 0; + rb->sentBytes = 0; } /* diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index ca53caac2f2..f85e21b1a50 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -89,6 +89,8 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len)); tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + + ctx->reorder->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId); p->returned_rows++; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c4299c76fb1..fa4c61192e8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -392,6 +392,8 @@ ReorderBufferAllocate(void) buffer->streamBytes = 0; buffer->totalTxns = 0; buffer->totalBytes = 0; + buffer->sentTxns = 0; + buffer->sentBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 082b4d9d327..daafac7049c 
100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -589,6 +589,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); txndata->sent_begin_txn = true; + ctx->reorder->sentTxns++; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f2c33250e8b..f097aa92c64 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1569,6 +1569,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); + ctx->reorder->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */ + CHECK_FOR_INTERRUPTS(); /* Try to flush pending output to the client */ diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index ccfb11c49bf..583ac090cdd 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -96,6 +96,8 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_bytes); REPLSLOT_ACC(total_txns); REPLSLOT_ACC(total_bytes); + REPLSLOT_ACC(sent_txns); + REPLSLOT_ACC(sent_bytes); #undef REPLSLOT_ACC pgstat_unlock_entry(entry_ref); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 1c12ddbae49..4ed405c6ad5 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2100,7 +2100,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 10 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 12 text *slotname_text = PG_GETARG_TEXT_P(0); 
NameData slotname; TupleDesc tupdesc; @@ -2129,7 +2129,11 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "sent_txns", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "sent_bytes", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2154,11 +2158,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[6] = Int64GetDatum(slotent->stream_bytes); values[7] = Int64GetDatum(slotent->total_txns); values[8] = Int64GetDatum(slotent->total_bytes); + values[9] = Int64GetDatum(slotent->sent_txns); + values[10] = Int64GetDatum(slotent->sent_bytes); if (slotent->stat_reset_timestamp == 0) - nulls[9] = true; + nulls[11] = true; else - values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[11] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fb4f7f50350..49990bfe42e 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5675,9 +5675,9 @@ { oid => '6169', descr => 'statistics: information about replication slot', proname => 'pg_stat_get_replication_slot', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => 
'{i,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,sent_txns,sent_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 378f2f2c2ba..814c6024ba1 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -395,6 +395,8 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter stream_bytes; PgStat_Counter total_txns; PgStat_Counter total_bytes; + PgStat_Counter sent_txns; + PgStat_Counter sent_bytes; TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index fa0745552f8..b22e046a9ad 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -696,6 +696,14 @@ struct ReorderBuffer */ int64 totalTxns; /* total number of transactions sent */ int64 totalBytes; /* total amount of data decoded */ + + /* + * Statistics about the transactions decoded by the output plugin and sent + * downstream. 
+ */ + int64 sentTxns; /* number of transactions decoded and sent + * downstream */ + int64 sentBytes; /* amount of data decoded and sent downstream */ }; diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 2137c4e5e30..03b688f54eb 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -212,10 +212,10 @@ my $stats_test_slot2 = 'logical_slot'; # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_bytes > 0, sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) + qq(t|t|t), + qq(Total bytes and sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) ); # Do reset of stats for stats test slot 1 @@ -233,10 +233,10 @@ $node_primary->safe_psql('postgres', is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, sent_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.) + qq(t|t|t), + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and sent_bytes were set to 0.) 
); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6cf828ca8d0..9fcf73152cd 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2131,9 +2131,11 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_bytes, s.total_txns, s.total_bytes, + s.sent_txns, + s.sent_bytes, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, sent_txns, sent_bytes, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, base-commit: 3431e3e4aa3a33e8411f15e76c284cdd4c54ca28 -- 2.34.1