On Thu, 16 Jul 2020 at 20:01, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Thu, Jul 16, 2020 at 4:04 PM Masahiko Sawada
> <masahiko.saw...@2ndquadrant.com> wrote:
> >
> > On Thu, 16 Jul 2020 at 18:16, Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Thu, Jul 16, 2020 at 1:45 PM Masahiko Sawada
> > > <masahiko.saw...@2ndquadrant.com> wrote:
> > > >
> > > > A possible solution would be to add an in-use flag to
> > > > PgStat_ReplSlotStats indicating whether the stats for slot is used or
> > > > not. When receiving a drop message for a slot, the stats collector
> > > > just marks the corresponding stats as unused. When receiving the stats
> > > > report for a new slot but there is no unused stats slot, ignore it.
> > > > What do you think?
> > > >
> > >
> > > As of now, you have a boolean flag msg.m_drop to distinguish the drop
> > > message but we don't have a similar way to distinguish the 'create'
> > > message. What if have a way to distinguish 'create' message (we can
> > > probably keep some sort of flag to indicate the type of message
> > > (create, drop, update)) and then if the slot with the same name
> > > already exists, we ignore such a message. Now, we also need a way to
> > > create the entry for a slot for a normal stats update message as well
> > > to accommodate for the lost 'create' message. Does that make sense?
> >
> > I might be missing your point, but even if we have 'create' message,
> > the problem can happen if when slots are full the user drops slot
> > 'slot_a', creates slot 'slot_b', and messages arrive in the reverse
> > order?
> >
>
> In that case, also, we should drop the 'create' message of 'slot_b' as
> we don't have space but later when an 'update' message arrives with
> stats for the 'slot_b', we will create the entry.

Agreed.

> I am also thinking
> what if send only 'update' and 'drop' message, the message ordering
> problem can still happen but we will lose one 'update' message in that
> case?

Yes, I think so too. We will lose at most one 'update' message.

I've updated the patch so that the stats collector ignores the
'update' message if the slot stats array is already full.

Regards,

-- 
Masahiko Sawada            http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
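
To make the reverse-order case concrete, here is a small standalone sketch of the collector-side behavior described above. It is not the patch code: MAX_SLOTS, NAME_LEN, SlotStats, recv_update(), recv_drop() and run_reverse_order_scenario() are names made up for this illustration, with MAX_SLOTS standing in for max_replication_slots and NAME_LEN for NAMEDATALEN. It only assumes what was discussed: an 'update' for an unknown slot creates the entry unless the array is full, and a 'drop' removes the entry.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_SLOTS 2		/* hypothetical stand-in for max_replication_slots */
#define NAME_LEN 64		/* hypothetical stand-in for NAMEDATALEN */

typedef struct SlotStats
{
	char		name[NAME_LEN];
	long		spill_txns;
} SlotStats;

static SlotStats slots[MAX_SLOTS];
static int	nslots = 0;

/* Return the index of the named entry, or -1; optionally create it. */
static int
slot_index(const char *name, bool create_it)
{
	int			i;

	assert(nslots <= MAX_SLOTS);
	for (i = 0; i < nslots; i++)
	{
		if (strcmp(slots[i].name, name) == 0)
			return i;
	}

	/* Not found: give up if the array is full or creation was not asked for */
	if (!create_it || nslots == MAX_SLOTS)
		return -1;

	memset(&slots[nslots], 0, sizeof(SlotStats));
	strncpy(slots[nslots].name, name, NAME_LEN - 1);
	return nslots++;
}

/* 'update' message: returns false when the message had to be ignored */
static bool
recv_update(const char *name, long spill_txns)
{
	int			idx = slot_index(name, true);

	if (idx < 0)
		return false;
	slots[idx].spill_txns += spill_txns;
	return true;
}

/* 'drop' message: overwrite the entry with the last one and shrink */
static void
recv_drop(const char *name)
{
	int			idx = slot_index(name, false);

	if (idx < 0)
		return;
	if (idx != nslots - 1)
		slots[idx] = slots[nslots - 1];
	nslots--;
}

/*
 * Slots are full, the user drops slot_a and creates slot_b, but the first
 * 'update' for slot_b arrives before the 'drop' of slot_a.
 */
static long
run_reverse_order_scenario(void)
{
	bool		accepted;

	nslots = 0;
	recv_update("slot_a", 0);
	recv_update("slot_x", 0);

	accepted = recv_update("slot_b", 5);	/* array full: ignored */
	assert(!accepted);
	recv_drop("slot_a");					/* frees one entry */
	recv_update("slot_b", 7);				/* creates the entry now */

	return slots[slot_index("slot_b", false)].spill_txns;
}
```

In this sketch the counter for slot_b ends up reflecting only the second 'update'; the first one is the single message that gets lost, and the entry is created as soon as a later 'update' finds free space.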
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 048ccc0988..1aebe82ef6 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -314,6 +314,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_replication_slots</structname><indexterm><primary>pg_stat_replication_slots</primary></indexterm></entry> + <entry>One row per replication slot, showing statistics about + replication slot usage. + See <link linkend="monitoring-pg-stat-replication-slots-view"> + <structname>pg_stat_replication_slots</structname></link> for details. + </entry> + </row> + <row> <entry><structname>pg_stat_wal_receiver</structname><indexterm><primary>pg_stat_wal_receiver</primary></indexterm></entry> <entry>Only one row, showing statistics about the WAL receiver from @@ -2511,7 +2520,86 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i </sect2> - <sect2 id="monitoring-pg-stat-wal-receiver-view"> + <sect2 id="monitoring-pg-stat-replication-slots-view"> + <title><structname>pg_stat_replication_slots</structname></title> + + <indexterm> + <primary>pg_stat_replication_slots</primary> + </indexterm> + + <para> + The <structname>pg_stat_replication_slots</structname> view will contain + one row per replication slot, showing statistics about replication + slot usage. 
+ </para> + + <table id="pg-stat-replication-slots-view" xreflabel="pg_stat_replication_slots"> + <title><structname>pg_stat_replication_slots</structname> View</title> + <tgroup cols="1"> + <thead> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + Column Type + </para> + <para> + Description + </para></entry> + </row> + </thead> + + <tbody> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>name</structfield> <type>text</type> + </para> + <para> + A unique, cluster-wide identifier for the replication slot + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>spill_txns</structfield> <type>bigint</type> + </para> + <para> + Number of transactions spilled to disk after the memory used by + logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The + counter gets incremented both for toplevel transactions and + subtransactions. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>spill_count</structfield> <type>bigint</type> + </para> + <para> + Number of times transactions were spilled to disk. Transactions + may get spilled repeatedly, and this counter gets incremented on every + such invocation. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>spill_bytes</structfield> <type>bigint</type> + </para> + <para> + Amount of decoded transaction data spilled to disk. + </para></entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + Tracking of spilled transactions works only for logical replication. In + physical replication, the tracking mechanism will display 0 for spilled + statistics. 
+ </para> + </sect2> + + <sect2 id="monitoring-pg-stat-wal-receiver-view"> <title><structname>pg_stat_wal_receiver</structname></title> <indexterm> @@ -4710,6 +4798,26 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i can be granted EXECUTE to run the function. </para></entry> </row> + + <row> + <entry role="func_table_entry"><para role="func_signature"> + <indexterm> + <primary>pg_stat_reset_replication_slot</primary> + </indexterm> + <function>pg_stat_reset_replication_slot</function> ( <type>text</type> ) + <returnvalue>void</returnvalue> + </para> + <para> + Resets statistics to zero for a single replication slot, or for all + replication slots in the cluster. If the argument is NULL, all counters + shown in the <structname>pg_stat_replication_slots</structname> view for + all replication slots are reset. + </para> + <para> + This function is restricted to superusers by default, but other users + can be granted EXECUTE to run the function. + </para></entry> + </row> </tbody> </tgroup> </table> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b6d35c2d11..204312e6d5 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -790,6 +790,14 @@ CREATE VIEW pg_stat_replication AS JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); +CREATE VIEW pg_stat_replication_slots AS + SELECT + s.name, + s.spill_txns, + s.spill_count, + s.spill_bytes + FROM pg_stat_get_replication_slots() AS s; + CREATE VIEW pg_stat_slru AS SELECT s.name, @@ -1441,6 +1449,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_shared(text) FROM public; REVOKE EXECUTE ON FUNCTION pg_stat_reset_slru(text) FROM public; REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public; REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public; +REVOKE EXECUTE ON FUNCTION 
pg_stat_reset_replication_slot(text) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 88992c2da2..3518ee45ad 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -51,6 +51,7 @@ #include "postmaster/fork_process.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/backendid.h" #include "storage/dsm.h" @@ -282,6 +283,8 @@ static int localNumBackends = 0; static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; +static PgStat_ReplSlotStats *replSlotStats; +static int nReplSlotStats; /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, @@ -340,6 +343,8 @@ static const char *pgstat_get_wait_io(WaitEventIO w); static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype); static void pgstat_send(void *msg, int len); +static int pgstat_replslot_index(const char *name, bool create_it); + static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len); static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len); static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len); @@ -348,6 +353,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len); static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len); static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); +static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len); static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); static 
void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); @@ -360,6 +366,7 @@ static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int le static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len); static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1429,6 +1436,36 @@ pgstat_reset_slru_counter(const char *name) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_reset_replslot_counter() - + * + * Tell the statistics collector to reset a single replication slot + * counter, or all replication slot counters (when name is null). + * + * Permission checking for this function is managed through the normal + * GRANT system. + * ---------- + */ +void +pgstat_reset_replslot_counter(const char *name) +{ + PgStat_MsgResetreplslotcounter msg; + + if (pgStatSock == PGINVALID_SOCKET) + return; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER); + if (name) + { + strlcpy(msg.m_slotname, name, NAMEDATALEN); + msg.clearall = false; + } + else + msg.clearall = true; + + pgstat_send(&msg, sizeof(msg)); +} + /* ---------- * pgstat_report_autovac() - * @@ -1629,6 +1666,46 @@ pgstat_report_tempfile(size_t filesize) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_report_replslot() - + * + * Tell the collector about replication slot statistics. 
+ * ---------- + */ +void +pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, + int spillbytes) +{ + PgStat_MsgReplSlot msg; + + /* + * Prepare and send the message + */ + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = false; + msg.m_spill_txns = spilltxns; + msg.m_spill_count = spillcount; + msg.m_spill_bytes = spillbytes; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} + +/* ---------- + * pgstat_report_replslot_drop() - + * + * Tell the collector about dropping the replication slot. + * ---------- + */ +void +pgstat_report_replslot_drop(const char *slotname) +{ + PgStat_MsgReplSlot msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = true; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} /* ---------- * pgstat_ping() - @@ -2691,6 +2768,23 @@ pgstat_fetch_slru(void) return slruStats; } +/* + * --------- + * pgstat_fetch_replslot() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the replication slot statistics struct and set the + * number of entries to nslots_p. 
+ * --------- + */ +PgStat_ReplSlotStats * +pgstat_fetch_replslot(int *nslots_p) +{ + backend_read_statsfile(); + + *nslots_p = nReplSlotStats; + return replSlotStats; +} /* ------------------------------------------------------------ * Functions for management of the shared-memory PgBackendStatus array @@ -4615,6 +4709,11 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER: + pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter, + len); + break; + case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac(&msg.msg_autovacuum_start, len); break; @@ -4665,6 +4764,10 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_REPLSLOT: + pgstat_recv_replslot(&msg.msg_replslot, len); + break; + default: break; } @@ -4868,6 +4971,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; + int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -4914,6 +5018,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) rc = fwrite(slruStats, sizeof(slruStats), 1, fpout); (void) rc; /* we'll check for error with ferror */ + /* + * Write replication slot stats struct + */ + for (i = 0; i < nReplSlotStats; i++) + { + fputc('R', fpout); + rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + /* * Walk through the database table. 
*/ @@ -5166,6 +5280,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* Allocate the space for replication slot statistics */ + replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats)); + nReplSlotStats = 0; + /* * Clear out global and archiver statistics so they start from zero in * case we can't load an existing statsfile. @@ -5187,6 +5305,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) for (i = 0; i < SLRU_NUM_ELEMENTS; i++) slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; + /* + * Set the same reset timestamp for all replication slots too. + */ + for (i = 0; i < max_replication_slots; i++) + replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; + /* * Try to open the stats file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch @@ -5350,6 +5474,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication slot + * follows. + */ + case 'R': + if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + goto done; + } + nReplSlotStats++; + break; + case 'E': goto done; @@ -5559,6 +5700,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_GlobalStats myGlobalStats; PgStat_ArchiverStats myArchiverStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; + PgStat_ReplSlotStats myReplSlotStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? 
PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -5661,6 +5803,21 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication slot + * follows. + */ + case 'R': + if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + break; + case 'E': goto done; @@ -6247,6 +6404,45 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len) } } +/* ---------- + * pgstat_recv_resetreplslotcounter() - + * + * Reset some replication slot statistics of the cluster. + * ---------- + */ +static void +pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, + int len) +{ + int i; + int idx = -1; + TimestampTz ts; + + if (!msg->clearall) + { + /* Get the index of replication slot statistics to reset */ + idx = pgstat_replslot_index(msg->m_slotname, false); + + if (idx < 0) + return; /* not found */ + } + + ts = GetCurrentTimestamp(); + for (i = 0; i < nReplSlotStats; i++) + { + /* reset entry with the given index, or all entries (index is -1) */ + if (msg->clearall || idx == i) + { + /* reset only counters. Don't clear slot name */ + replSlotStats[i].spill_txns = 0; + replSlotStats[i].spill_count = 0; + replSlotStats[i].spill_bytes = 0; + replSlotStats[i].stat_reset_timestamp = ts; + } + } +} + + /* ---------- * pgstat_recv_autovac() - * @@ -6494,6 +6690,77 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) dbentry->last_checksum_failure = msg->m_failure_time; } +/* + * pgstat_replslot_index + * + * Return the index of the entry for a replication slot with the given name, or + * -1 if the slot is not found. If create_it is true, this function creates + * the statistics of the replication slot if it does not exist. 
+ */ +static int +pgstat_replslot_index(const char *name, bool create_it) +{ + int i; + + Assert(nReplSlotStats <= max_replication_slots); + for (i = 0; i < nReplSlotStats; i++) + { + if (strcmp(replSlotStats[i].slotname, name) == 0) + return i; /* found */ + } + + /* + * The slot is not found. We don't want to register the new statistics + * if the list is already full or the caller didn't request it. + */ + if (i == max_replication_slots || !create_it) + return -1; + + /* Register new slot */ + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN); + return nReplSlotStats++; +} + +/* ---------- + * pgstat_recv_replslot() - + * + * Process a REPLSLOT message. + * ---------- + */ +static void +pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) +{ + int idx; + + /* + * Get the index of replication slot statistics. On dropping, we + * don't create the new statistics. + */ + idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop); + + /* the entry was not found, or the array is already full */ + if (idx < 0) + return; + + Assert(idx >= 0 && idx < max_replication_slots); + if (msg->m_drop) + { + /* Remove the replication slot statistics with the given name */ + memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], + sizeof(PgStat_ReplSlotStats)); + nReplSlotStats--; + Assert(nReplSlotStats >= 0); + } + else + { + /* Update the replication slot statistics */ + replSlotStats[idx].spill_txns += msg->m_spill_txns; + replSlotStats[idx].spill_count += msg->m_spill_count; + replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + } +} + /* ---------- * pgstat_recv_tempfile() - * diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 61902be3b0..378c78f896 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -32,6 +32,7 @@ #include "access/xlog_internal.h" #include "fmgr.h" #include 
"miscadmin.h" +#include "pgstat.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/origin.h" @@ -66,6 +67,7 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, const char *prefix, Size message_size, const char *message); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); +static void UpdateSpillStats(LogicalDecodingContext *ctx); /* * Make sure the current settings & environment are capable of doing logical @@ -721,6 +723,11 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* + * Update statistics about transactions that spilled to disk. + */ + UpdateSpillStats(ctx); } static void @@ -1091,3 +1098,21 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); } } + +static void +UpdateSpillStats(LogicalDecodingContext *ctx) +{ + ReorderBuffer *rb = ctx->reorder; + + elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld", + rb, + (long long) rb->spillTxns, + (long long) rb->spillCount, + (long long) rb->spillBytes); + + pgstat_report_replslot(NameStr(ctx->slot->data.name), + rb->spillTxns, rb->spillCount, rb->spillBytes); + rb->spillTxns = 0; + rb->spillCount = 0; + rb->spillBytes = 0; +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 5251932669..7afa2271bd 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -317,6 +317,10 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; + buffer->spillCount = 0; + buffer->spillTxns = 0; + buffer->spillBytes = 0; + buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); @@ -2414,6 +2418,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) int fd = -1; XLogSegNo curOpenSegNo = 0; Size 
spilled = 0; + Size size = txn->size; elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -2472,6 +2477,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) spilled++; } + /* update the statistics */ + rb->spillCount += 1; + rb->spillBytes += size; + + /* Don't consider already serialized transactions. */ + rb->spillTxns += rbtxn_is_serialized(txn) ? 0 : 1; + Assert(spilled == txn->nentries_mem); Assert(dlist_is_empty(&txn->changes)); txn->nentries_mem = 0; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 57bbb6288c..f6119c6d95 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -322,6 +322,9 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* Let everybody know we've modified this slot */ ConditionVariableBroadcast(&slot->active_cv); + + /* Create statistics entry for the new slot */ + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0); } /* @@ -682,6 +685,17 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) ereport(WARNING, (errmsg("could not remove directory \"%s\"", tmppath))); + /* + * Report the drop of the replication slot to the stats collector. Since + * there is no guarantee of message ordering on a UDP connection, it is + * possible that a message for creating a new slot arrives before a + * message for removing the old slot. We send the drop message while + * holding ReplicationSlotAllocationLock to reduce that possibility. + * If the messages arrive in reverse order, we would lose one statistics + * update message. + */ + pgstat_report_replslot_drop(NameStr(slot->data.name)); + /* * We release this at the very end, so that nobody starts trying to create * a slot while we're still cleaning up the detritus of the old one. 
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 2aff739466..820311fa8f 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2027,6 +2027,20 @@ pg_stat_reset_slru(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* Reset replication slots counters (a specific one or all of them). */ +Datum +pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) +{ + char *target = NULL; + + if (!PG_ARGISNULL(0)) + target = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + pgstat_reset_replslot_counter(target); + + PG_RETURN_VOID(); +} + Datum pg_stat_get_archiver(PG_FUNCTION_ARGS) { @@ -2092,3 +2106,63 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +Datum +pg_stat_get_replication_slots(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_REPLICATION_SLOT_CLOS 4 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + PgStat_ReplSlotStats *stats; + int nstats; + int i; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + 
rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + stats = pgstat_fetch_replslot(&nstats); + for (i = 0; i < nstats; i++) + { + Datum values[PG_STAT_GET_REPLICATION_SLOT_CLOS]; + bool nulls[PG_STAT_GET_REPLICATION_SLOT_CLOS]; + PgStat_ReplSlotStats stat = stats[i]; + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = PointerGetDatum(cstring_to_text(stat.slotname)); + values[1] = Int64GetDatum(stat.spill_txns); + values[2] = Int64GetDatum(stat.spill_count); + values[3] = Int64GetDatum(stat.spill_bytes); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 95604e988a..cfab3a0306 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5251,6 +5251,14 @@ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}', prosrc => 'pg_stat_get_wal_receiver' }, +{ oid => '8595', descr => 'statistics: information about replication slots', + proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => '', + proallargtypes => '{text,int8,int8,int8}', + proargmodes => '{o,o,o,o}', + proargnames => '{name,spill_txns,spill_count,spill_bytes}', + prosrc => 'pg_stat_get_replication_slots' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', @@ -5592,6 +5600,10 @@ descr => 'statistics: reset collected statistics for a single 
SLRU', proname => 'pg_stat_reset_slru', proisstrict => 'f', provolatile => 'v', prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_slru' }, +{ oid => '8596', + descr => 'statistics: reset collected statistics for a single replication slot', + proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v', + prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' }, { oid => '3163', descr => 'current trigger depth', proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 1387201382..db51ac6b34 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -56,6 +56,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_RESETSHAREDCOUNTER, PGSTAT_MTYPE_RESETSINGLECOUNTER, PGSTAT_MTYPE_RESETSLRUCOUNTER, + PGSTAT_MTYPE_RESETREPLSLOTCOUNTER, PGSTAT_MTYPE_AUTOVAC_START, PGSTAT_MTYPE_VACUUM, PGSTAT_MTYPE_ANALYZE, @@ -67,7 +68,8 @@ typedef enum StatMsgType PGSTAT_MTYPE_RECOVERYCONFLICT, PGSTAT_MTYPE_TEMPFILE, PGSTAT_MTYPE_DEADLOCK, - PGSTAT_MTYPE_CHECKSUMFAILURE + PGSTAT_MTYPE_CHECKSUMFAILURE, + PGSTAT_MTYPE_REPLSLOT, } StatMsgType; /* ---------- @@ -356,6 +358,18 @@ typedef struct PgStat_MsgResetslrucounter int m_index; } PgStat_MsgResetslrucounter; +/* ---------- + * PgStat_MsgResetreplslotcounter Sent by the backend to tell the collector + * to reset replication slot counter(s) + * ---------- + */ +typedef struct PgStat_MsgResetreplslotcounter +{ + PgStat_MsgHdr m_hdr; + char m_slotname[NAMEDATALEN]; + bool clearall; +} PgStat_MsgResetreplslotcounter; + /* ---------- * PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal * that a database is going to be processed @@ -453,6 +467,22 @@ typedef struct PgStat_MsgSLRU PgStat_Counter m_truncate; } PgStat_MsgSLRU; +/* ---------- + * PgStat_MsgReplSlot Sent by a backend or a wal sender to update replication + * slot statistics. 
+ * ---------- + */ +typedef struct PgStat_MsgReplSlot +{ + PgStat_MsgHdr m_hdr; + char m_slotname[NAMEDATALEN]; + bool m_drop; + PgStat_Counter m_spill_txns; + PgStat_Counter m_spill_count; + PgStat_Counter m_spill_bytes; +} PgStat_MsgReplSlot; + + /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * ---------- @@ -591,6 +621,7 @@ typedef union PgStat_Msg PgStat_MsgResetsharedcounter msg_resetsharedcounter; PgStat_MsgResetsinglecounter msg_resetsinglecounter; PgStat_MsgResetslrucounter msg_resetslrucounter; + PgStat_MsgResetreplslotcounter msg_resetreplslotcounter; PgStat_MsgAutovacStart msg_autovacuum_start; PgStat_MsgVacuum msg_vacuum; PgStat_MsgAnalyze msg_analyze; @@ -603,6 +634,7 @@ typedef union PgStat_Msg PgStat_MsgDeadlock msg_deadlock; PgStat_MsgTempFile msg_tempfile; PgStat_MsgChecksumFailure msg_checksumfailure; + PgStat_MsgReplSlot msg_replslot; } PgStat_Msg; @@ -760,6 +792,17 @@ typedef struct PgStat_SLRUStats TimestampTz stat_reset_timestamp; } PgStat_SLRUStats; +/* + * Replication slot statistics kept in the stats collector + */ +typedef struct PgStat_ReplSlotStats +{ + char slotname[NAMEDATALEN]; + PgStat_Counter spill_txns; + PgStat_Counter spill_count; + PgStat_Counter spill_bytes; + TimestampTz stat_reset_timestamp; +} PgStat_ReplSlotStats; /* ---------- * Backend states @@ -1298,6 +1341,7 @@ extern void pgstat_reset_counters(void); extern void pgstat_reset_shared_counters(const char *); extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type); extern void pgstat_reset_slru_counter(const char *); +extern void pgstat_reset_replslot_counter(const char *name); extern void pgstat_report_autovac(Oid dboid); extern void pgstat_report_vacuum(Oid tableoid, bool shared, @@ -1310,6 +1354,9 @@ extern void pgstat_report_recovery_conflict(int reason); extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void 
pgstat_report_checksum_failure(void); +extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, + int spillbytes); +extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); extern void pgstat_bestart(void); @@ -1474,6 +1521,7 @@ extern int pgstat_fetch_stat_numbackends(void); extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); +extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p); extern void pgstat_count_slru_page_zeroed(int slru_idx); extern void pgstat_count_slru_page_hit(int slru_idx); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 019bd382de..626ecf4dc9 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -413,6 +413,17 @@ struct ReorderBuffer /* memory accounting */ Size size; + + /* + * Statistics about transactions spilled to disk. + * + * A single transaction may be spilled repeatedly, which is why we keep + * two different counters. For spilling, the transaction counter includes + * both toplevel transactions and subtransactions. 
+ */ + int64 spillCount; /* spill-to-disk invocation counter */ + int64 spillTxns; /* number of transactions spilled to disk */ + int64 spillBytes; /* amount of data spilled to disk */ }; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index fa436f2caa..d082127111 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2006,6 +2006,11 @@ pg_stat_replication| SELECT s.pid, FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); +pg_stat_replication_slots| SELECT s.name, + s.spill_txns, + s.spill_count, + s.spill_bytes + FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit,