On Mon, May 3, 2021 at 10:21 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, May 3, 2021 at 5:48 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> > On Mon, May 3, 2021 at 2:27 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Thu, Apr 29, 2021 at 10:37 AM Amit Kapila <amit.kapil...@gmail.com> 
> > > wrote:
> > > >
> > > > On Wed, Apr 28, 2021 at 7:43 PM Masahiko Sawada <sawada.m...@gmail.com> 
> > > > wrote:
> > > > >
> > > > > On Wed, Apr 28, 2021 at 3:25 PM Amit Kapila <amit.kapil...@gmail.com> 
> > > > > wrote:
> > > > > >
> > > > >
> > > > > > I am not sure if any of these alternatives are a good idea. What do
> > > > > > you think? Do you have any other ideas for this?
> > > > >
> > > > > > I've been considering some ideas but haven't come up with a good one
> > > > > > yet. This is just an untested idea, but how about having
> > > > > > CreateDecodingContext() register a before_shmem_exit() callback with the
> > > > > > decoding context to ensure that we send slot stats even on
> > > > > > interruption, and having FreeDecodingContext() cancel the callback?
> > > > >
> > > >
> > > > Is it a good idea to send stats while exiting and rely on the same? I
> > > > think before_shmem_exit is mostly used for the cleanup purpose so not
> > > > sure if we can rely on it for this purpose. I think we can't be sure
> > > > that in all cases we will send all stats, so maybe Vignesh's patch is
> > > > sufficient to cover the cases where we avoid losing it in cases where
> > > > we would have sent a large amount of data.
> > > >
> > >
> > > Sawada-San, any thoughts on this point?
> >
> > before_shmem_exit is mostly used for cleanup purposes, but how about
> > on_shmem_exit()? pgstats relies on that to send stats on
> > interruption. See pgstat_shutdown_hook().
> >
>
> Yeah, that is worth trying. Would you like to give it a try?

Yes.

In this approach, I think we will need a static pointer in logical.c
that points to the LogicalDecodingContext currently in use. In
StartupDecodingContext(), we set the pointer to the newly created
LogicalDecodingContext and register the callback, so that the callback
can refer to the context. In FreeDecodingContext(), we reset the
pointer to NULL (however, since FreeDecodingContext() is not called
when an error happens, we would need to ensure it gets reset somehow).
But, after more thought, if we have the static pointer in logical.c
anyway, it would be better to have a global function that sends slot
stats based on the LogicalDecodingContext the static pointer refers
to, and to call that function from ReplicationSlotRelease(). That way,
we don’t need to worry about error cases or interruption cases,
although we do need the new static pointer.
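
As a rough illustration of that second idea, here is a toy, standalone
model of the bookkeeping. It is not PostgreSQL code: ToyDecodingContext,
the toy_* functions and the reported_* counters are made-up stand-ins
for LogicalDecodingContext, StartupDecodingContext(),
FreeDecodingContext(), ReplicationSlotRelease() and the stats collector.
It also assumes the context memory is still valid when the slot is
released.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for LogicalDecodingContext: only pending counters. */
typedef struct ToyDecodingContext
{
	long		pending_txns;	/* accumulated but not yet reported */
	long		pending_bytes;
} ToyDecodingContext;

/* Hypothetical stand-in for the stats collector: totals reported so far. */
static long reported_txns = 0;
static long reported_bytes = 0;

/* Points to the context currently in use, or NULL if there is none. */
static ToyDecodingContext *current_ctx = NULL;

/* Report the pending counters and reset them (models UpdateDecodingStats). */
static void
toy_update_stats(ToyDecodingContext *ctx)
{
	reported_txns += ctx->pending_txns;
	reported_bytes += ctx->pending_bytes;
	ctx->pending_txns = 0;
	ctx->pending_bytes = 0;
}

/* Models StartupDecodingContext(): remember the context being used. */
void
toy_startup(ToyDecodingContext *ctx)
{
	assert(current_ctx == NULL);
	ctx->pending_txns = 0;
	ctx->pending_bytes = 0;
	current_ctx = ctx;
}

/* Models FreeDecodingContext(): normal path, report and forget. */
void
toy_free(ToyDecodingContext *ctx)
{
	toy_update_stats(ctx);
	current_ctx = NULL;
}

/* Models the hook called at slot release: covers paths that skipped toy_free(). */
void
toy_slot_release(void)
{
	if (current_ctx != NULL)
	{
		toy_update_stats(current_ctx);
		current_ctx = NULL;
	}
}
```

On an error path where toy_free() is never reached, calling
toy_slot_release() still reports the pending counters, and a second
call is a no-op because the pointer has already been cleared.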

I've attached a quick-hacked patch. I also incorporated the change
that calls UpdateDecodingStats() in FreeDecodingContext(), so that we
also send slot stats in the case where we spilled/streamed changes
but finished without a commit/abort/prepare record.

>  I think
> it still might not cover the cases where we error out in the backend
> while decoding via APIs because at that time we won't exit, maybe for
> that we can consider Vignesh's patch.

Agreed. It seems to me that the approach of the attached patch is
better than the approach using on_shmem_exit(). So if we want to avoid
adding a new static pointer and function for this purpose, we can
consider Vignesh’s patch instead.

Regards,

-- 
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 00543ede45..f32a2da565 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -51,6 +51,12 @@ typedef struct LogicalErrorCallbackState
 	XLogRecPtr	report_location;
 } LogicalErrorCallbackState;
 
+/*
+ * Points to the logical decoding context currently in use; used to send
+ * slot statistics when the slot is released.
+ */
+static LogicalDecodingContext *MyLogicalDecodingContext = NULL;
+
 /* wrappers around output plugin callbacks */
 static void output_plugin_error_callback(void *arg);
 static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -290,6 +296,13 @@ StartupDecodingContext(List *output_plugin_options,
 
 	MemoryContextSwitchTo(old_context);
 
+	/*
+	 * Remember the decoding context until it is freed or the logical slot
+	 * is released.
+	 */
+	Assert(MyLogicalDecodingContext == NULL);
+	MyLogicalDecodingContext = ctx;
+
 	return ctx;
 }
 
@@ -626,10 +639,12 @@ FreeDecodingContext(LogicalDecodingContext *ctx)
 	if (ctx->callbacks.shutdown_cb != NULL)
 		shutdown_cb_wrapper(ctx);
 
+	UpdateDecodingStats(ctx);
 	ReorderBufferFree(ctx->reorder);
 	FreeSnapshotBuilder(ctx->snapshot_builder);
 	XLogReaderFree(ctx->reader);
 	MemoryContextDelete(ctx->context);
+	MyLogicalDecodingContext = NULL;
 }
 
 /*
@@ -1811,3 +1826,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * Called when a logical replication slot is released; sends any slot
+ * statistics that have not been reported yet.
+ */
+void
+DecodingContextAtSlotRelease(void)
+{
+	if (MyLogicalDecodingContext)
+	{
+		UpdateDecodingStats(MyLogicalDecodingContext);
+		MyLogicalDecodingContext = NULL;
+	}
+}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cf261e200e..4e9b45c84b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -45,6 +45,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "replication/slot.h"
+#include "replication/logical.h"
 #include "storage/fd.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -496,6 +497,9 @@ ReplicationSlotRelease(void)
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
+	if (SlotIsLogical(slot))
+		DecodingContextAtSlotRelease();
+
 	if (slot->data.persistency == RS_EPHEMERAL)
 	{
 		/*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7dfcb7be18..c156045020 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -135,5 +135,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void DecodingContextAtSlotRelease(void);
 
 #endif
