On 22.03.21 09:50, Markus Wanner wrote:
thank you for reconsidering this patch.  I updated it to include the required adjustments to the documentation.  Please review.

I tweaked the wording in the docs a bit, resulting in a v3 of this patch.

Regards

Markus
From: Markus Wanner <mar...@bluegap.ch>
Date: Tue, 2 Mar 2021 11:33:54 +0100
Subject: [PATCH] Add an xid argument to the filter_prepare callback for output plugins

---
 contrib/test_decoding/test_decoding.c     |  4 ++-
 doc/src/sgml/logicaldecoding.sgml         | 34 +++++++++++++++--------
 src/backend/replication/logical/decode.c  | 17 ++++++++----
 src/backend/replication/logical/logical.c |  5 ++--
 src/include/replication/logical.h         |  3 +-
 src/include/replication/output_plugin.h   |  1 +
 6 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ae5f397f351..de1b6926581 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -77,6 +77,7 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  bool transactional, const char *prefix,
 							  Size sz, const char *message);
 static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 TransactionId xid,
 									 const char *gid);
 static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn);
@@ -440,7 +441,8 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
  * substring, then we filter it out.
  */
 static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+						 const char *gid)
 {
 	if (strstr(gid, "_nodecode") != NULL)
 		return true;
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 80eb96d609a..f3ac84aa85a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -794,20 +794,30 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
        <command>COMMIT PREPARED</command> time. To signal that
        decoding should be skipped, return <literal>true</literal>;
        <literal>false</literal> otherwise. When the callback is not
-       defined, <literal>false</literal> is assumed (i.e. nothing is
-       filtered).
+       defined, <literal>false</literal> is assumed (i.e. no filtering, all
+       transactions using two-phase commit are decoded in two phases as well).
 <programlisting>
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              TransactionId xid,
                                               const char *gid);
 </programlisting>
-      The <parameter>ctx</parameter> parameter has the same contents as for the
-      other callbacks. The <parameter>gid</parameter> is the identifier that later
-      identifies this transaction for <command>COMMIT PREPARED</command> or
-      <command>ROLLBACK PREPARED</command>.
+       The <parameter>ctx</parameter> parameter has the same contents as for
+       the other callbacks. The parameters <parameter>xid</parameter>
+       and <parameter>gid</parameter> provide two different ways to identify
+       the transaction.  For some systems, the <parameter>gid</parameter> may
+       be sufficient.  However, reuse of the same <parameter>gid</parameter>,
+       for example by a downstream node using multiple subscriptions, may lead
+       to it not being a unique identifier.  Therefore, other systems combine
+       the <parameter>xid</parameter> with a node identifier to form a
+       globally unique transaction identifier.  The later <command>COMMIT
+       PREPARED</command> or <command>ROLLBACK PREPARED</command> carries both
+       identifiers, providing an output plugin the choice of what to use.
      </para>
      <para>
-      The callback has to provide the same static answer for a given
-      <parameter>gid</parameter> every time it is called.
+       The callback may be invoked several times for each transaction being
+       decoded and must provide the same static answer for a given pair of
+       <parameter>xid</parameter> and <parameter>gid</parameter> every time
+       it is called.
      </para>
      </sect3>
 
@@ -1219,9 +1229,11 @@ stream_commit_cb(...);  &lt;-- commit of the streamed transaction
    </para>
 
    <para>
-    Optionally the output plugin can specify a name pattern in the
-    <function>filter_prepare_cb</function> and transactions with gid containing
-    that name pattern will not be decoded as a two-phase commit transaction.
+    Optionally the output plugin can define filtering rules via
+    <function>filter_prepare_cb</function> to decode only specific transactions
+    in two phases.  This can be achieved by pattern matching on the
+    <parameter>gid</parameter> or via lookups using the
+    <parameter>xid</parameter>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
 /* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+								 TransactionId xid, const char *gid);
 static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
 							  XLogRecordBuffer *buf, Oid dbId,
 							  RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_COMMIT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeCommit(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * doesn't filter the transaction at prepare time.
 				 */
 				if (info == XLOG_XACT_ABORT_PREPARED)
-					two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+					two_phase = !(FilterPrepare(ctx, xid,
+												parsed.twophase_gid));
 
 				DecodeAbort(ctx, buf, &parsed, xid, two_phase);
 				break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 * manner iff output plugin supports two-phase commits and
 				 * doesn't filter the transaction at prepare time.
 				 */
-				if (FilterPrepare(ctx, parsed.twophase_gid))
+				if (FilterPrepare(ctx, parsed.twophase_xid,
+								  parsed.twophase_gid))
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * this transaction as a regular commit later.
  */
 static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+			  const char *gid)
 {
 	/*
 	 * Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
 	if (ctx->callbacks.filter_prepare_cb == NULL)
 		return false;
 
-	return filter_prepare_cb_wrapper(ctx, gid);
+	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
 static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+						  const char *gid)
 {
 	LogicalErrorCallbackState state;
 	ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
 	ctx->accept_writes = false;
 
 	/* do the actual work: call callback */
-	ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
  * and sent as usual transaction.
  */
 typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  TransactionId xid,
 											  const char *gid);
 
 /*
-- 
2.30.2

Reply via email to