On Fri, Apr 16, 2021 at 3:16 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Apr 12, 2021 at 2:57 PM vignesh C <vignes...@gmail.com> wrote:
> >
> > On Sat, Mar 20, 2021 at 9:26 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Sat, Mar 20, 2021 at 12:22 AM Andres Freund <and...@anarazel.de> wrote:
> > > >
> > > > And then more generally about the feature:
> > > > - If a slot was used to stream out a large amount of changes (say an
> > > >   initial data load), but then replication is interrupted before the
> > > >   transaction is committed/aborted, stream_bytes will not reflect the
> > > >   many gigabytes of data we may have sent.
> > > >
> > >
> > > We can probably update the stats each time we spilled or streamed the
> > > transaction data but it was not clear at that stage whether or how
> > > much it will be useful.
> > >
> >
> > I felt we can update the replication slot statistics data each time we
> > spill/stream the transaction data instead of accumulating the
> > statistics and updating at the end. I have tried this in the attached
> > patch and the statistics data were getting updated.
> > Thoughts?
> >
>
> Did you check if we can update the stats when we release the slot as
> discussed above? I am not sure if it is easy to do at the time of slot
> release because this information might not be accessible there and in
> some cases, we might have already released the decoding
> context/reorderbuffer where this information is stored. It might be
> okay to update this when we stream or spill but let's see if we can do
> it easily at the time of slot release.
>

I have changed the patch to update the replication slot statistics when
the replication slot is released. Please find the updated patch attached.
Thoughts?

Regards,
Vignesh
From 2b92c8f2824e1ef731b3c79fa50b6afb75314f76 Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Mon, 19 Apr 2021 14:54:27 +0530
Subject: [PATCH] Update decoding stats while releasing replication slot.

Currently, replication slot statistics are updated at commit/rollback, so
if the transaction is interrupted the stats might not get updated. Fix
this by also updating the statistics during replication slot release. As
part of the fix, move the statistics variables from the ReorderBuffer
structure to the ReplicationSlot structure so that they are accessible in
ReplicationSlotRelease.
---
 src/backend/replication/logical/decode.c      |  6 +-
 src/backend/replication/logical/logical.c     | 71 +++++++++++--------
 .../replication/logical/reorderbuffer.c       | 39 +++++-----
 src/backend/replication/slot.c                |  6 ++
 src/include/replication/logical.h             |  1 -
 src/include/replication/reorderbuffer.h       | 23 ------
 src/include/replication/slot.h                | 24 +++++++
 7 files changed, 95 insertions(+), 75 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7924581cdc..6fbf22f345 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -750,7 +750,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * not clear that sending more or less frequently than this would be
 	 * better.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats();
 }
 
 /*
@@ -832,7 +832,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * not clear that sending more or less frequently than this would be
 	 * better.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats();
 }
 
 
@@ -889,7 +889,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 
 	/* update the decoding stats */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats();
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 35b0c67641..22b30ffdef 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -197,6 +197,15 @@ StartupDecodingContext(List *output_plugin_options,
 		LWLockRelease(ProcArrayLock);
 	}
 
+	slot->spillTxns = 0;
+	slot->spillCount = 0;
+	slot->spillBytes = 0;
+	slot->streamTxns = 0;
+	slot->streamCount = 0;
+	slot->streamBytes = 0;
+	slot->totalTxns = 0;
+	slot->totalBytes = 0;
+
 	ctx->slot = slot;
 
 	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb);
@@ -1770,44 +1779,46 @@ ResetLogicalStreamingState(void)
  * Report stats for a slot.
  */
 void
-UpdateDecodingStats(LogicalDecodingContext *ctx)
+UpdateDecodingStats()
 {
-	ReorderBuffer *rb = ctx->reorder;
 	PgStat_ReplSlotStats repSlotStat;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* Nothing to do if we don't have any replication stats to be sent. */
-	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
+	if (slot->spillBytes <= 0 && slot->streamBytes <= 0 && slot->totalBytes <= 0)
 		return;
 
 	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
-		 rb,
-		 (long long) rb->spillTxns,
-		 (long long) rb->spillCount,
-		 (long long) rb->spillBytes,
-		 (long long) rb->streamTxns,
-		 (long long) rb->streamCount,
-		 (long long) rb->streamBytes,
-		 (long long) rb->totalTxns,
-		 (long long) rb->totalBytes);
-
-	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
-	repSlotStat.spill_txns = rb->spillTxns;
-	repSlotStat.spill_count = rb->spillCount;
-	repSlotStat.spill_bytes = rb->spillBytes;
-	repSlotStat.stream_txns = rb->streamTxns;
-	repSlotStat.stream_count = rb->streamCount;
-	repSlotStat.stream_bytes = rb->streamBytes;
-	repSlotStat.total_txns = rb->totalTxns;
-	repSlotStat.total_bytes = rb->totalBytes;
+		 slot,
+		 (long long) slot->spillTxns,
+		 (long long) slot->spillCount,
+		 (long long) slot->spillBytes,
+		 (long long) slot->streamTxns,
+		 (long long) slot->streamCount,
+		 (long long) slot->streamBytes,
+		 (long long) slot->totalTxns,
+		 (long long) slot->totalBytes);
+
+	namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
+	repSlotStat.spill_txns = slot->spillTxns;
+	repSlotStat.spill_count = slot->spillCount;
+	repSlotStat.spill_bytes = slot->spillBytes;
+	repSlotStat.stream_txns = slot->streamTxns;
+	repSlotStat.stream_count = slot->streamCount;
+	repSlotStat.stream_bytes = slot->streamBytes;
+	repSlotStat.total_txns = slot->totalTxns;
+	repSlotStat.total_bytes = slot->totalBytes;
 
 	pgstat_report_replslot(&repSlotStat);
 
-	rb->spillTxns = 0;
-	rb->spillCount = 0;
-	rb->spillBytes = 0;
-	rb->streamTxns = 0;
-	rb->streamCount = 0;
-	rb->streamBytes = 0;
-	rb->totalTxns = 0;
-	rb->totalBytes = 0;
+	slot->spillTxns = 0;
+	slot->spillCount = 0;
+	slot->spillBytes = 0;
+	slot->streamTxns = 0;
+	slot->streamCount = 0;
+	slot->streamBytes = 0;
+	slot->totalTxns = 0;
+	slot->totalBytes = 0;
 }
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5cb484f032..700ccf542c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -344,15 +344,6 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
-	buffer->spillTxns = 0;
-	buffer->spillCount = 0;
-	buffer->spillBytes = 0;
-	buffer->streamTxns = 0;
-	buffer->streamCount = 0;
-	buffer->streamBytes = 0;
-	buffer->totalTxns = 0;
-	buffer->totalBytes = 0;
-
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
@@ -1316,6 +1307,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 	ReorderBufferChange *change;
 	ReorderBufferIterTXNEntry *entry;
 	int32		off;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* nothing there anymore */
 	if (state->heap->bh_size == 0)
@@ -1369,7 +1363,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 * Update the total bytes processed before releasing the current set
 		 * of changes and restoring the new set of changes.
 		 */
-		rb->totalBytes += rb->size;
+		slot->totalBytes += rb->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
@@ -2018,6 +2012,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	ReorderBufferChange *volatile specinsert = NULL;
 	volatile bool stream_started = false;
 	ReorderBufferTXN *volatile curtxn = NULL;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* build data to be able to lookup the CommandIds of catalog tuples */
 	ReorderBufferBuildTupleCidHash(rb, txn);
@@ -2380,9 +2377,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * which we have already accounted in ReorderBufferIterTXNNext.
 		 */
 		if (!rbtxn_is_streamed(txn))
-			rb->totalTxns++;
+			slot->totalTxns++;
 
-		rb->totalBytes += rb->size;
+		slot->totalBytes += rb->size;
 
 		/*
 		 * Done with current changes, send the last message for this set of
@@ -3482,6 +3479,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3543,11 +3543,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
 	{
-		rb->spillCount += 1;
-		rb->spillBytes += size;
+		slot->spillCount += 1;
+		slot->spillBytes += size;
 
 		/* don't consider already serialized transactions */
-		rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+		slot->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
 	}
 
 	Assert(spilled == txn->nentries_mem);
@@ -3818,6 +3818,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	CommandId	command_id;
 	Size		stream_bytes;
 	bool		txn_is_streamed;
+	ReplicationSlot *slot = MyReplicationSlot;
+
+	Assert(MyReplicationSlot != NULL);
 
 	/* We can never reach here for a subtransaction. */
 	Assert(txn->toptxn == NULL);
@@ -3911,11 +3914,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
 							command_id, true);
 
-	rb->streamCount += 1;
-	rb->streamBytes += stream_bytes;
+	slot->streamCount += 1;
+	slot->streamBytes += stream_bytes;
 
 	/* Don't consider already streamed transaction. */
-	rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+	slot->streamTxns += (txn_is_streamed) ? 0 : 1;
 
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f61b163f78..07e7901fa6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -500,6 +500,12 @@ ReplicationSlotRelease(void)
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
+	if (SlotIsLogical(slot))
+	{
+		/* update the decoding stats that are present in the slot */
+		UpdateDecodingStats();
+	}
+
 	if (slot->data.persistency == RS_EPHEMERAL)
 	{
 		/*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7dfcb7be18..f109e90692 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -134,6 +134,5 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
-extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
 #endif
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bfab8303ee..c8295fc9ed 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -602,29 +602,6 @@ struct ReorderBuffer
 
 	/* memory accounting */
 	Size		size;
-
-	/*
-	 * Statistics about transactions spilled to disk.
-	 *
-	 * A single transaction may be spilled repeatedly, which is why we keep
-	 * two different counters. For spilling, the transaction counter includes
-	 * both toplevel transactions and subtransactions.
-	 */
-	int64		spillTxns;		/* number of transactions spilled to disk */
-	int64		spillCount;		/* spill-to-disk invocation counter */
-	int64		spillBytes;		/* amount of data spilled to disk */
-
-	/* Statistics about transactions streamed to the decoding output plugin */
-	int64		streamTxns;		/* number of transactions streamed */
-	int64		streamCount;	/* streaming invocation counter */
-	int64		streamBytes;	/* amount of data streamed */
-
-	/*
-	 * Statistics about all the transactions sent to the decoding output
-	 * plugin
-	 */
-	int64		totalTxns;		/* total number of transactions sent */
-	int64		totalBytes;		/* total amount of data sent */
 };
 
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 1ad5e6c50d..d1f30e095c 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -172,6 +172,29 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/*
+	 * Statistics about transactions spilled to disk.
+	 *
+	 * A single transaction may be spilled repeatedly, which is why we keep
+	 * two different counters. For spilling, the transaction counter includes
+	 * both toplevel transactions and subtransactions.
+	 */
+	int64		spillTxns;		/* number of transactions spilled to disk */
+	int64		spillCount;		/* spill-to-disk invocation counter */
+	int64		spillBytes;		/* amount of data spilled to disk */
+
+	/* Statistics about transactions streamed to the decoding output plugin */
+	int64		streamTxns;		/* number of transactions streamed */
+	int64		streamCount;	/* streaming invocation counter */
+	int64		streamBytes;	/* amount of data streamed */
+
+	/*
+	 * Statistics about all the transactions sent to the decoding output
+	 * plugin
+	 */
+	int64		totalTxns;		/* total number of transactions sent */
+	int64		totalBytes;		/* total amount of data sent */
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -231,5 +254,6 @@ extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
+extern void UpdateDecodingStats(void);
 
 #endif							/* SLOT_H */
-- 
2.25.1

Reply via email to