Sybase has a feature to turn off replication at the session level: set replication = off, which can be temporarily turned off when there is a maintenance action on the table. Our users also want this feature. I add a new flag bit in xinfo, control it with a session-level variable, when set to true, this flag is written when the transaction is committed, and when the logic is decoded it abandons the transaction like aborted transactions. Since PostgreSQL has two types of replication, I call the variable "logical_replication" to avoid confusion and default value is true.

Sample SQL

insert into a values(100);
set logical_replication to off;
insert into a values(200);
reset logical_replication;
insert into a values(300);

pg_recvlogical output(the second is not output.)
BEGIN 492
table public.a: INSERT: col1[integer]:100
COMMIT 492
BEGIN 494
table public.a: INSERT: col1[integer]:300
COMMIT 494

I'm not sure this is the most appropriate way. What do you think?

Regards,
Quan Zongliang
diff --git a/src/backend/access/transam/xact.c 
b/src/backend/access/transam/xact.c
index 9162286c98..d7a04539d6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -82,6 +82,8 @@ bool          XactDeferrable;
 
 int                    synchronous_commit = SYNCHRONOUS_COMMIT_ON;
 
+bool           logical_replication = true;
+
 /*
  * When running as a parallel worker, we place only a single
  * TransactionStateData on the parallel worker's state stack, and the XID
@@ -5535,6 +5537,9 @@ XactLogCommitRecord(TimestampTz commit_time,
                xl_origin.origin_timestamp = 
replorigin_session_origin_timestamp;
        }
 
+       if (!logical_replication)
+               xl_xinfo.xinfo |= XACT_XINFO_LOGIREPL_OFF;
+
        if (xl_xinfo.xinfo != 0)
                info |= XLOG_XACT_HAS_INFO;
 
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index c53e7e2279..b11c89ff74 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -618,6 +618,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf,
         */
        if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
                (parsed->dbId != InvalidOid && parsed->dbId != 
ctx->slot->data.database) ||
+               (parsed->xinfo & XACT_XINFO_LOGIREPL_OFF) ||
                ctx->fast_forward || FilterByOrigin(ctx, origin_id))
        {
                for (i = 0; i < parsed->nsubxacts; i++)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 90ffd89339..0880a2765f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1952,6 +1952,16 @@ static struct config_bool ConfigureNamesBool[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"logical_replication", PGC_USERSET, REPLICATION_MASTER,
+                       gettext_noop("Close logical replication of transactions 
in the session."),
+                       NULL
+               },
+               &logical_replication,
+               true,
+               NULL, NULL, NULL
+       },
+
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index d714551704..1a7f52ac50 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -81,6 +81,9 @@ typedef enum
 /* Synchronous commit level */
 extern int     synchronous_commit;
 
+/* turn on/off logical replication */
+extern bool    logical_replication;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
@@ -168,6 +171,12 @@ typedef void (*SubXactCallback) (SubXactEvent event, 
SubTransactionId mySubid,
 #define XACT_XINFO_HAS_AE_LOCKS                        (1U << 6)
 #define XACT_XINFO_HAS_GID                             (1U << 7)
 
+/*
+ * It indicates that the data affected by this transaction does not need
+ * to be included in the logical replication.
+ */
+#define XACT_XINFO_LOGIREPL_OFF                        (1U << 28)
+
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
  * need to occur when emulating transaction effects during recovery.

Reply via email to