On Tue, Apr 27, 2021 at 9:48 AM vignesh C <vignes...@gmail.com> wrote:
>
> On Tue, Apr 27, 2021 at 9:43 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Tue, Apr 27, 2021 at 9:17 AM Masahiko Sawada <sawada.m...@gmail.com> 
> > wrote:
> > >
> > > On Tue, Apr 27, 2021 at 11:31 AM vignesh C <vignes...@gmail.com> wrote:
> > > >
> > > > > > And I think there is
> > > > > > also a risk to increase shared memory when we want to add other
> > > > > > statistics in the future.
> > > > > >
> > > > >
> > > > > Yeah, so do you think it is not a good idea to store stats in
> > > > > ReplicationSlot? Actually storing them in a slot makes it easier to
> > > > > send them during ReplicationSlotRelease which is quite helpful if the
> > > > > replication is interrupted due to some reason. Or the other idea was
> > > > > that we send stats every time we stream or spill changes.
> > > >
> > > > We use around 64 bytes of shared memory to store the statistics
> > > > information per slot, I'm not sure if this is a lot of memory. If this
> > > > memory is fine, then I felt the approach to store stats seems fine. If
> > > > that memory is too much then we could use the other approach to update
> > > > stats when we stream or spill the changes as suggested by Amit.
> > >
> > > I agree that makes it easier to send slot stats during
> > > ReplicationSlotRelease() but I'd prefer to avoid storing data that
> > > doesn't need to be shared in the shared buffer if possible.
> > >
> >
> > Sounds reasonable and we might add some stats in the future so that
> > will further increase the usage of shared memory.
> >
> > > And those
> > > counters are not used by physical slots at all. If sending slot stats
> > > every time we stream or spill changes doesn't affect the system much,
> > > I think it's better than having slot stats in the shared memory.
> > >
> >
> > As the minimum size of logical_decoding_work_mem is 64KB, so in the
> > worst case, we will send stats after decoding that many changes. I
> > don't think it would impact too much considering that we need to spill
> > or stream those many changes.  If it concerns any users they can
> > always increase logical_decoding_work_mem. The default value is 64MB
> > at which point, I don't think it will matter sending the stats.
>
> Sounds good to me, I will rebase my previous patch and send a patch for this.
>

Attached patch has the changes to update statistics during
spill/stream which prevents the statistics from being lost during
interrupt.
Thoughts?

Regards,
Vignesh
From a1fd6f76c955482efa7e63ea6c25ae6350160b4d Mon Sep 17 00:00:00 2001
From: vignesh <vignes...@gmail.com>
Date: Tue, 27 Apr 2021 10:56:02 +0530
Subject: [PATCH v3] Update replication statistics after every stream/spill.

Currently, replication slot statistics are updated at prepare, commit, and
rollback. Now, if the transaction is interrupted the stats might not get
updated. Fixed this by updating replication statistics after every
stream/spill.
---
 src/backend/replication/logical/decode.c        | 6 +++---
 src/backend/replication/logical/logical.c       | 6 +++---
 src/backend/replication/logical/reorderbuffer.c | 3 +++
 src/include/replication/logical.h               | 2 +-
 4 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7924581cdc..2c009d51ec 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -750,7 +750,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * not clear that sending more or less frequently than this would be
 	 * better.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats(ctx->reorder);
 }
 
 /*
@@ -832,7 +832,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * not clear that sending more or less frequently than this would be
 	 * better.
 	 */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats(ctx->reorder);
 }
 
 
@@ -889,7 +889,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 
 	/* update the decoding stats */
-	UpdateDecodingStats(ctx);
+	UpdateDecodingStats(ctx->reorder);
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 00543ede45..0c3ef0f93f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1770,10 +1770,10 @@ ResetLogicalStreamingState(void)
  * Report stats for a slot.
  */
 void
-UpdateDecodingStats(LogicalDecodingContext *ctx)
+UpdateDecodingStats(ReorderBuffer *rb)
 {
-	ReorderBuffer *rb = ctx->reorder;
 	PgStat_StatReplSlotEntry repSlotStat;
+	ReplicationSlot *slot = MyReplicationSlot;
 
 	/* Nothing to do if we don't have any replication stats to be sent. */
 	if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
@@ -1790,7 +1790,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 (long long) rb->totalTxns,
 		 (long long) rb->totalBytes);
 
-	namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
+	namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
 	repSlotStat.spill_txns = rb->spillTxns;
 	repSlotStat.spill_count = rb->spillCount;
 	repSlotStat.spill_bytes = rb->spillBytes;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c27f710053..f4d97c1de3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3551,6 +3551,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		/* don't consider already serialized transactions */
 		rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+		UpdateDecodingStats(rb);
 	}
 
 	Assert(spilled == txn->nentries_mem);
@@ -3920,6 +3921,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* Don't consider already streamed transaction. */
 	rb->streamTxns += (txn_is_streamed) ? 0 : 1;
 
+	UpdateDecodingStats(rb);
+
 	Assert(dlist_is_empty(&txn->changes));
 	Assert(txn->nentries == 0);
 	Assert(txn->nentries_mem == 0);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7dfcb7be18..8e03e055f6 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -134,6 +134,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
-extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateDecodingStats(ReorderBuffer *rb);
 
 #endif
-- 
2.25.1

Reply via email to