Sybase has a feature to turn off replication at the session level: "set replication = off". It can be used to temporarily disable replication while a maintenance action is performed on a table. Our users want this feature as well. I added a new flag bit in xinfo, controlled by a session-level variable. When the variable is set to off, the flag is written into the commit record when the transaction commits, and logical decoding then discards the transaction in the same way it discards aborted transactions. Since PostgreSQL has two types of replication, I call the variable "logical_replication" to avoid confusion; its default value is true.
Sample SQL:

insert into a values(100);
set logical_replication to off;
insert into a values(200);
reset logical_replication;
insert into a values(300);

pg_recvlogical output (the second transaction is not output):

BEGIN 492
table public.a: INSERT: col1[integer]:100
COMMIT 492
BEGIN 494
table public.a: INSERT: col1[integer]:300
COMMIT 494

I'm not sure this is the most appropriate approach. What do you think?

Regards,
Quan Zongliang
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 9162286c98..d7a04539d6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -82,6 +82,8 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; +bool logical_replication = true; + /* * When running as a parallel worker, we place only a single * TransactionStateData on the parallel worker's state stack, and the XID @@ -5535,6 +5537,9 @@ XactLogCommitRecord(TimestampTz commit_time, xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } + if (!logical_replication) + xl_xinfo.xinfo |= XACT_XINFO_LOGIREPL_OFF; + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c53e7e2279..b11c89ff74 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -618,6 +618,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + (parsed->xinfo & XACT_XINFO_LOGIREPL_OFF) || ctx->fast_forward || FilterByOrigin(ctx, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 90ffd89339..0880a2765f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1952,6 +1952,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"logical_replication", PGC_USERSET, REPLICATION_MASTER, + gettext_noop("Close logical replication of transactions in the session."), + NULL + }, + &logical_replication, + true, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/include/access/xact.h b/src/include/access/xact.h index d714551704..1a7f52ac50 100644 --- 
a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -81,6 +81,9 @@ typedef enum /* Synchronous commit level */ extern int synchronous_commit; +/* turn on/off logical replication */ +extern bool logical_replication; + /* * Miscellaneous flag bits to record events which occur on the top level * transaction. These flags are only persisted in MyXactFlags and are intended @@ -168,6 +171,12 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) #define XACT_XINFO_HAS_GID (1U << 7) +/* + * It indicates that the data affected by this transaction does not need + * to be included in the logical replication. + */ +#define XACT_XINFO_LOGIREPL_OFF (1U << 28) + /* * Also stored in xinfo, these indicating a variety of additional actions that * need to occur when emulating transaction effects during recovery.