Sorry, git tricked me. Here's the patch including actual changes.
Regards Markus
From: Markus Wanner <mar...@bluegap.ch> Date: Tue, 2 Mar 2021 11:33:54 +0100 Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins --- contrib/test_decoding/test_decoding.c | 4 ++- doc/src/sgml/logicaldecoding.sgml | 35 ++++++++++++++++------- src/backend/replication/logical/decode.c | 17 +++++++---- src/backend/replication/logical/logical.c | 5 ++-- src/include/replication/logical.h | 3 +- src/include/replication/output_plugin.h | 1 + 6 files changed, 44 insertions(+), 21 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index ae5f397f351..de1b6926581 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx, bool transactional, const char *prefix, Size sz, const char *message); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, + TransactionId xid, const char *gid); static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); @@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, * substring, then we filter it out. */ static bool -pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid) +pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, + const char *gid) { if (strstr(gid, "_nodecode") != NULL) return true; diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 80eb96d609a..84717ae93e5 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -794,20 +794,31 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, <command>COMMIT PREPARED</command> time. To signal that decoding should be skipped, return <literal>true</literal>; <literal>false</literal> otherwise. When the callback is not - defined, <literal>false</literal> is assumed (i.e. nothing is - filtered). + defined, <literal>false</literal> is assumed (i.e. no filtering, all + transactions using two-phase commit are decoded in two phases as well). <programlisting> typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + TransactionId xid, const char *gid); </programlisting> - The <parameter>ctx</parameter> parameter has the same contents as for the - other callbacks. The <parameter>gid</parameter> is the identifier that later - identifies this transaction for <command>COMMIT PREPARED</command> or - <command>ROLLBACK PREPARED</command>. + The <parameter>ctx</parameter> parameter has the same contents as for + the other callbacks. The parameters <parameter>xid</parameter> + and <parameter>gid</parameter> provide two different ways to identify + the transaction. For some systems, the <parameter>gid</parameter> may + be sufficient. However, reuse of the same <parameter>gid</parameter> + for example by a downstream node using multiple subscriptions may lead + to it not being a unique identifier. Therefore, other systems combine + the <parameter>xid</parameter> with an identifier of the origin node to + form a globally unique transaction identifier. The later + <command>COMMIT PREPARED</command> or <command>ROLLBACK + PREPARED</command> carries both identifiers, providing an output plugin + the choice of what to use. </para> <para> - The callback has to provide the same static answer for a given - <parameter>gid</parameter> every time it is called. + The callback may be invoked multiple times per transaction to decode + and must provide the same static answer for a given pair of + <parameter>xid</parameter> and <parameter>gid</parameter> every time + it is called. </para> </sect3> @@ -1219,9 +1230,11 @@ stream_commit_cb(...); <-- commit of the streamed transaction </para> <para> - Optionally the output plugin can specify a name pattern in the - <function>filter_prepare_cb</function> and transactions with gid containing - that name pattern will not be decoded as a two-phase commit transaction. + Optionally the output plugin can define filtering rules via + <function>filter_prepare_cb</function> to decode only specific transaction + in two phases. This can be achieved by pattern matching on the + <parameter>gid</parameter> or via lookups using the + <parameter>xid</parameter>. </para> <para> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5f596135b15..97be4b0f23f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); /* helper functions for decoding transactions */ -static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid); +static inline bool FilterPrepare(LogicalDecodingContext *ctx, + TransactionId xid, const char *gid); static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid dbId, RepOriginId origin_id); @@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * doesn't filter the transaction at prepare time. */ if (info == XLOG_XACT_COMMIT_PREPARED) - two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + two_phase = !(FilterPrepare(ctx, xid, + parsed.twophase_gid)); DecodeCommit(ctx, buf, &parsed, xid, two_phase); break; @@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * doesn't filter the transaction at prepare time. */ if (info == XLOG_XACT_ABORT_PREPARED) - two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + two_phase = !(FilterPrepare(ctx, xid, + parsed.twophase_gid)); DecodeAbort(ctx, buf, &parsed, xid, two_phase); break; @@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * manner iff output plugin supports two-phase commits and * doesn't filter the transaction at prepare time. */ - if (FilterPrepare(ctx, parsed.twophase_gid)) + if (FilterPrepare(ctx, parsed.twophase_xid, + parsed.twophase_gid)) { ReorderBufferProcessXid(reorder, parsed.twophase_xid, buf->origptr); @@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * this transaction as a regular commit later. */ static inline bool -FilterPrepare(LogicalDecodingContext *ctx, const char *gid) +FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid, + const char *gid) { /* * Skip if decoding of two-phase transactions at PREPARE time is not @@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid) if (ctx->callbacks.filter_prepare_cb == NULL) return false; - return filter_prepare_cb_wrapper(ctx, gid); + return filter_prepare_cb_wrapper(ctx, xid, gid); } static inline bool diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 37b75deb728..2f6803637bf 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } bool -filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) +filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, + const char *gid) { LogicalErrorCallbackState state; ErrorContextCallback errcallback; @@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) ctx->accept_writes = false; /* do the actual work: call callback */ - ret = ctx->callbacks.filter_prepare_cb(ctx, gid); + ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid); /* Pop the error context stack */ error_context_stack = errcallback.previous; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c2534033723..af551d6f4ee 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); -extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid); +extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, + TransactionId xid, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 2c2c964c55f..810495ed0e4 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); * and sent as usual transaction. */ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + TransactionId xid, const char *gid); /* -- 2.30.2