On Mon, 2021-11-08 at 15:36 -0800, Jeff Davis wrote:
> The attached patch (against v14, so it's easier to test columnar) is
> somewhat like a simplified version of [3] combined with refactoring
> to
> make decoding a part of the rmgr.

New patches attached (v3). Essentially the same content as v2, but split
into 3 patches and rebased.

Review on patch 0001 would be helpful. It makes decoding just another
method of rmgr, which makes a lot of sense to me from a code
organization standpoint regardless of the other patches. Is there any
reason not to do that?

The other patches then make rmgr extensible, which in turn makes
decoding extensible and solves the logical replication problem for
custom table AMs. The most obvious way to make rmgr extensible would be
to expand the rmgr table, but I was concerned about making that dynamic
(right now the structure is entirely constant and perhaps that's
important for some optimizations?). So, at the cost of some complexity, I
made a separate, dynamic rmgr table to hold the custom entries.

The custom rmgr API would probably be marked "experimental" for a
while, and I don't expect lots of people to use it given the challenges
of a production-quality table AM. But it's still important, because
without it a table AM has no chance to participate in logical
replication.

Regards,
        Jeff Davis

From b6b6c93513d42223ac2bdcb1cd1522fd845d0af7 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 6 Nov 2021 12:58:04 -0700
Subject: [PATCH 1/3] Make logical decoding a part of the rmgr.

Add a new rmgr method, rm_decode, and use that rather than a switch
statement.

In preparation for extensible rmgr.

Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c        |   5 +-
 src/backend/replication/logical/decode.c | 105 +++++------------------
 src/bin/pg_rewind/parsexlog.c            |   2 +-
 src/bin/pg_waldump/rmgrdesc.c            |   2 +-
 src/include/access/rmgr.h                |   2 +-
 src/include/access/rmgrlist.h            |  44 +++++-----
 src/include/access/xlog_internal.h       |   5 ++
 src/include/replication/decode.h         |  16 +++-
 8 files changed, 69 insertions(+), 112 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 58091f6b520..f8847d5aebf 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,14 +24,15 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
-	{ name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
+	{ name, redo, desc, identify, startup, cleanup, mask, decode },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1d22208c1ad..9b450c9f90e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -43,21 +43,6 @@
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
 
-typedef struct XLogRecordBuffer
-{
-	XLogRecPtr	origptr;
-	XLogRecPtr	endptr;
-	XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 {
 	XLogRecordBuffer buf;
 	TransactionId txid;
+	RmgrId rmid;
 
 	buf.origptr = ctx->reader->ReadRecPtr;
 	buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 								 buf.origptr);
 	}
 
-	/* cast so we get a warning when new rmgrs are added */
-	switch ((RmgrId) XLogRecGetRmid(record))
-	{
-			/*
-			 * Rmgrs we care about for logical decoding. Add new rmgrs in
-			 * rmgrlist.h's order.
-			 */
-		case RM_XLOG_ID:
-			DecodeXLogOp(ctx, &buf);
-			break;
-
-		case RM_XACT_ID:
-			DecodeXactOp(ctx, &buf);
-			break;
+	rmid = XLogRecGetRmid(record);
 
-		case RM_STANDBY_ID:
-			DecodeStandbyOp(ctx, &buf);
-			break;
-
-		case RM_HEAP2_ID:
-			DecodeHeap2Op(ctx, &buf);
-			break;
-
-		case RM_HEAP_ID:
-			DecodeHeapOp(ctx, &buf);
-			break;
-
-		case RM_LOGICALMSG_ID:
-			DecodeLogicalMsgOp(ctx, &buf);
-			break;
-
-			/*
-			 * Rmgrs irrelevant for logical decoding; they describe stuff not
-			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
-			 * order.
-			 */
-		case RM_SMGR_ID:
-		case RM_CLOG_ID:
-		case RM_DBASE_ID:
-		case RM_TBLSPC_ID:
-		case RM_MULTIXACT_ID:
-		case RM_RELMAP_ID:
-		case RM_BTREE_ID:
-		case RM_HASH_ID:
-		case RM_GIN_ID:
-		case RM_GIST_ID:
-		case RM_SEQ_ID:
-		case RM_SPGIST_ID:
-		case RM_BRIN_ID:
-		case RM_COMMIT_TS_ID:
-		case RM_REPLORIGIN_ID:
-		case RM_GENERIC_ID:
-			/* just deal with xid, and done */
-			ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
-									buf.origptr);
-			break;
-		case RM_NEXT_ID:
-			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+	if (RmgrTable[rmid].rm_decode != NULL)
+		RmgrTable[rmid].rm_decode(ctx, &buf);
+	else
+	{
+		/* just deal with xid, and done */
+		ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+								buf.origptr);
 	}
 }
 
 /*
  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	uint8		info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	ReorderBuffer *reorder = ctx->reorder;
@@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
@@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 /*
  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
@@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 91437974584..f6cfee4ce8c 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -28,7 +28,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 852d8ca4b1c..6a4ebd1310b 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -32,7 +32,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index c9b5c56a4c6..d9b512630ca 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	symname,
 
 typedef enum RmgrIds
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index ed751aaf039..9a74721c97c 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -25,25 +25,25 @@
  */
 
-/* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
+/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index e27fca0cc0e..849954a8e5a 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -287,6 +287,9 @@ typedef enum
 	RECOVERY_TARGET_ACTION_SHUTDOWN
 }			RecoveryTargetAction;
 
+struct LogicalDecodingContext;
+struct XLogRecordBuffer;
+
 /*
  * Method table for resource managers.
  *
@@ -312,6 +315,8 @@ typedef struct RmgrData
 	void		(*rm_startup) (void);
 	void		(*rm_cleanup) (void);
 	void		(*rm_mask) (char *pagedata, BlockNumber blkno);
+	void		(*rm_decode) (struct LogicalDecodingContext *ctx,
+							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
 extern const RmgrData RmgrTable[];
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 1db73f35549..a33c2a718a7 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -14,7 +14,21 @@
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 
-void		LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+typedef struct XLogRecordBuffer
+{
+	XLogRecPtr	origptr;		/* copied from the reader's ReadRecPtr */
+	XLogRecPtr	endptr;			/* copied from the reader's EndRecPtr */
+	XLogReaderState *record;	/* reader state of the record being decoded */
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
										 XLogReaderState *record);
 
 #endif
-- 
2.17.1

From cb723a9f77c848784e952a6eeeeaf2d7f939d312 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH 2/3] Add macro GetRmgr in preparation for extensible rmgr.

---
 src/backend/access/transam/xlog.c        | 22 +++++++++++-----------
 src/backend/access/transam/xlogreader.c  |  2 +-
 src/backend/replication/logical/decode.c |  4 ++--
 src/backend/utils/misc/guc.c             |  6 +++---
 src/include/access/xlog_internal.h       |  2 ++
 5 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c9d4cbf3ff5..84a0d485e99 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1531,10 +1531,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7494,8 +7494,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7717,7 +7717,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7827,8 +7827,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10740,16 +10740,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 35029cf97d6..612b1b37233 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9b450c9f90e..4093b650635 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -115,8 +115,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 
 	rmid = XLogRecGetRmid(record);
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4c94f09c645..f8bb0fabc11 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11676,7 +11676,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11688,8 +11688,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 849954a8e5a..9d4e122e946 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -338,4 +338,6 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetRmgr(rmid) RmgrTable[(rmid)]	/* single lookup point for rmgr methods */
+
 #endif							/* XLOG_INTERNAL_H */
-- 
2.17.1

From 22c2f6895039ce39b0aca300a8051eb51a006071 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 6 Nov 2021 14:39:50 -0700
Subject: [PATCH 3/3] Custom Rmgr

---
 src/backend/access/transam/rmgr.c  | 86 ++++++++++++++++++++++++++++++
 src/include/access/rmgr.h          |  9 ++++
 src/include/access/xlog_internal.h |  7 ++-
 3 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..354f1033bf7 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,12 +24,24 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+/*
+ * A registered custom rmgr.  Only a pointer to the caller's RmgrData is
+ * stored; the RmgrData itself is not copied.
+ */
+typedef struct CustomRmgrEntry
+{
+	RmgrId		rmid;
+	RmgrData   *rmgr;
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
	{ name, redo, desc, identify, startup, cleanup, mask, decode },
@@ -37,3 +49,102 @@
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;
+static int	NumCustomRmgrs = 0;
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Must be called while shared_preload_libraries are being loaded (typically
+ * from an extension's _PG_init()).  Only a pointer to *rmgr is kept, so it
+ * must remain valid for the life of the process; a static variable in the
+ * extension is the natural choice.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (!process_shared_preload_libraries_in_progress)
+		elog(ERROR, "custom rmgr must be registered while initializing extensions in shared_preload_libraries");
+
+	/* validate the request before announcing it */
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	if (rmgr == NULL || rmgr->rm_name == NULL || rmgr->rm_name[0] == '\0')
+		elog(PANIC, "custom rmgr with ID %d has no name", rmid);
+
+	elog(LOG, "registering custom rmgr \"%s\" with ID %d",
+		 rmgr->rm_name, rmid);
+
+	/*
+	 * Check for an existing builtin rmgr with the same name.  Names are
+	 * compared case-insensitively because wal_consistency_checking matches
+	 * rmgr names that way.
+	 */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (pg_strcasecmp(existing_rmgr->rm_name, rmgr->rm_name) == 0)
+			elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr",
+				 existing_rmgr->rm_name);
+	}
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		const CustomRmgrEntry *entry = &CustomRmgrTable[i];
+
+		if (entry->rmid == rmid)
+			elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"",
+				 rmid, entry->rmgr->rm_name);
+
+		if (pg_strcasecmp(entry->rmgr->rm_name, rmgr->rm_name) == 0)
+			elog(PANIC, "custom rmgr \"%s\" already registered with ID %d",
+				 rmgr->rm_name, entry->rmid);
+	}
+
+	/* Grow the table by one entry (allocated on first use). */
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = (CustomRmgrEntry *)
+			MemoryContextAlloc(TopMemoryContext, sizeof(CustomRmgrEntry));
+	else
+		CustomRmgrTable = (CustomRmgrEntry *)
+			repalloc(CustomRmgrTable,
+					 sizeof(CustomRmgrEntry) * (NumCustomRmgrs + 1));
+
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * Return a copy of the RmgrData registered for custom rmgr ID rmid.
+ * PANICs if rmid is outside the custom range or no rmgr was registered
+ * with that ID.
+ *
+ * This is an O(N) list traversal because the expected size is very small.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		const CustomRmgrEntry *entry = &CustomRmgrTable[i];
+
+		if (entry->rmid == rmid)
+			return *entry->rmgr;
+	}
+
+	elog(PANIC, "custom rmgr with ID %d not found!", rmid);
+}
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..9ad790a5cd0 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -31,5 +31,20 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+
+/*
+ * IDs from RM_CUSTOM_MIN_ID through RM_CUSTOM_MAX_ID are reserved for custom
+ * rmgrs registered by extensions; builtin IDs must stay below this range.
+ * The upper bound is the largest value an RmgrId (uint8) can hold.
+ */
+#define RM_CUSTOM_MIN_ID		128
+#define RM_CUSTOM_MAX_ID		UINT8_MAX
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 9d4e122e946..7cb1f6c3893 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -321,6 +321,9 @@ typedef struct RmgrData
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -338,6 +341,13 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
-#define GetRmgr(rmid) RmgrTable[(rmid)]
+/*
+ * Look up the RmgrData for rmid: builtin rmgrs come from the constant
+ * RmgrTable, custom ones from GetCustomRmgr().  Note that GetRmgr evaluates
+ * rmid more than once, so the argument must not have side effects.
+ */
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
+#define GetRmgr(rmid) (((rmid) < RM_CUSTOM_MIN_ID) ? \
+					   GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))
 
 #endif							/* XLOG_INTERNAL_H */
-- 
2.17.1

Reply via email to