On Fri, 2022-02-04 at 22:56 +0800, Julien Rouhaud wrote:
> > Right, which I guess raises another question: if the maximum is
> > UINT8_MAX, which BTW I find perfectly reasonable, why are we not
> > just
> > defining this as an array of size 256? There's no point in adding
> > code
> > complexity to save a few kB of memory.
> 
> Agreed, especially if combined with your suggested approach 3 (array
> of
> pointers).

Implemented and attached. I also updated pg_waldump and pg_rewind to do
something reasonable.

Additionally, I now have a reasonably complete implementation of a
custom resource manager now:

https://github.com/citusdata/citus/tree/custom-rmgr-15

(Not committed or intended to actually be used right now -- just a
POC.)

Offline, Andres mentioned that I should test recovery performance if we
take your approach, because making the RmgrTable non-const could impact
optimizations. Not sure if that would be a practical concern compared
to all the other work done at REDO time.

Regards,
        Jeff Davis

From faa7eb847fab30c0fbe47e703d1c2d64f84ba51d Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH] Extensible rmgr.

Allow extensions to specify a new custom rmgr, which allows
specialized WAL. This is meant to be used by a custom Table Access
Method, which would not otherwise be able to offer logical
decoding/replication. It may also be used by new Index Access Methods.

Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.

Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c         | 87 ++++++++++++++++++++++-
 src/backend/access/transam/xlogreader.c   |  2 +-
 src/backend/access/transam/xlogrecovery.c | 29 +++-----
 src/backend/replication/logical/decode.c  |  4 +-
 src/backend/utils/misc/guc.c              |  6 +-
 src/bin/pg_rewind/parsexlog.c             | 11 +--
 src/bin/pg_waldump/pg_waldump.c           | 27 +++----
 src/bin/pg_waldump/rmgrdesc.c             | 32 ++++++++-
 src/bin/pg_waldump/rmgrdesc.h             |  2 +-
 src/include/access/rmgr.h                 | 11 ++-
 src/include/access/xlog_internal.h        |  5 +-
 11 files changed, 168 insertions(+), 48 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..1805596071e 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
@@ -32,8 +33,90 @@
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
-	{ name, redo, desc, identify, startup, cleanup, mask, decode },
+	&(struct RmgrData){ name, redo, desc, identify, startup, cleanup, mask, decode },
 
-const RmgrData RmgrTable[RM_MAX_ID + 1] = {
+static RmgrData *RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * Start up all resource managers.
+ */
+void
+StartupResourceManagers()
+{
+	for (int i = 0; i < RM_MAX_ID; i++)
+	{
+		if (RmgrTable[i] == NULL)
+			continue;
+
+		if (RmgrTable[i]->rm_startup != NULL)
+			RmgrTable[i]->rm_startup();
+	}
+}
+
+/*
+ * Clean up all resource managers.
+ */
+void
+CleanupResourceManagers()
+{
+	for (int i = 0; i < RM_MAX_ID; i++)
+	{
+		if (RmgrTable[i] == NULL)
+			continue;
+
+		if (RmgrTable[i]->rm_cleanup != NULL)
+			RmgrTable[i]->rm_cleanup();
+	}
+}
+
+/*
+ * Register a new custom rmgr.
+ *
+ * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a
+ * unique RmgrId for your extension, to avoid conflicts. During development,
+ * use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmid < RM_MIN_CUSTOM_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(ERROR,
+				(errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries")));
+
+	ereport(LOG,
+			(errmsg("registering custom rmgr \"%s\" with ID %d",
+					rmgr->rm_name, rmid)));
+
+	if (RmgrTable[rmid] != NULL)
+		ereport(PANIC,
+				(errmsg("custom rmgr ID %d already registered with name \"%s\"",
+						rmid, RmgrTable[rmid]->rm_name)));
+
+	/* check for existing rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = RmgrTable[i];
+
+		if (existing_rmgr == NULL)
+			continue;
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+							existing_rmgr->rm_name)));
+	}
+
+	/* register it */
+	RmgrTable[rmid] = rmgr;
+}
+
+RmgrData *
+GetRmgr(RmgrId rmid)
+{
+	return RmgrTable[rmid];
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e437c429920..e2c4d97b91f 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_BUILTIN_ID && record->xl_rmid < RM_MIN_CUSTOM_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 9feea3e6ec9..e903af95c0a 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1531,7 +1531,6 @@ ShutdownWalRecovery(void)
 void
 PerformWalRecovery(void)
 {
-	int			rmid;
 	XLogRecord *record;
 	bool		reachedRecoveryTarget = false;
 	TimeLineID	replayTLI;
@@ -1604,12 +1603,7 @@ PerformWalRecovery(void)
 
 		InRedo = true;
 
-		/* Initialize resource managers */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
-		}
+		StartupResourceManagers();
 
 		ereport(LOG,
 				(errmsg("redo starts at %X/%X",
@@ -1746,12 +1740,7 @@ PerformWalRecovery(void)
 			}
 		}
 
-		/* Allow resource managers to do any required cleanup. */
-		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-		{
-			if (RmgrTable[rmid].rm_cleanup != NULL)
-				RmgrTable[rmid].rm_cleanup();
-		}
+		CleanupResourceManagers();
 
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
@@ -1871,7 +1860,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		xlogrecovery_redo(xlogreader, *replayTLI);
 
 	/* Now apply the WAL record itself */
-	RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+	GetRmgr(record->xl_rmid)->rm_redo(xlogreader);
 
 	/*
 	 * After redo, check whether the backup pages associated with the WAL
@@ -2101,16 +2090,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid)->rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid)->rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid)->rm_desc(buf, record);
 }
 
 #ifdef WAL_DEBUG
@@ -2339,10 +2328,10 @@ verifyBackupPageConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid)->rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid)->rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid)->rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..f49e1bbacd1 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -117,8 +117,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 
 	rmid = XLogRecGetRmid(record);
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid)->rm_decode != NULL)
+		GetRmgr(rmid)->rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 932aefc777d..d6e729b51bc 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11753,7 +11753,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid) != NULL && GetRmgr(rmid)->rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11765,8 +11765,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid) != NULL && GetRmgr(rmid)->rm_mask != NULL &&
+					pg_strcasecmp(tok, GetRmgr(rmid)->rm_name) == 0)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 7cfa169e9b9..470134248b9 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -25,8 +25,8 @@
 #include "pg_rewind.h"
 
 /*
- * RmgrNames is an array of resource manager names, to make error messages
- * a bit nicer.
+ * RmgrNames is an array of the built-in resource manager names, to make error
+ * messages a bit nicer.
  */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
   name,
@@ -35,6 +35,9 @@ static const char *RmgrNames[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
 
+#define RmgrName(rmid) (((rmid) <= RM_MAX_BUILTIN_ID) ? \
+						RmgrNames[rmid] : "custom")
+
 static void extractPageInfo(XLogReaderState *record);
 
 static int	xlogreadfd = -1;
@@ -427,9 +430,9 @@ extractPageInfo(XLogReaderState *record)
 		 * track that change.
 		 */
 		pg_fatal("WAL record modifies a relation, but record type is not recognized: "
-				 "lsn: %X/%X, rmgr: %s, info: %02X",
+				 "lsn: %X/%X, rmid: %d, rmgr: %s, info: %02X",
 				 LSN_FORMAT_ARGS(record->ReadRecPtr),
-				 RmgrNames[rmid], info);
+				 rmid, RmgrName(rmid), info);
 	}
 
 	for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index fc081adfb8c..d424a7afca8 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -51,7 +51,7 @@ typedef struct XLogDumpConfig
 	bool		stats_per_record;
 
 	/* filter options */
-	bool		filter_by_rmgr[RM_MAX_ID + 1];
+	bool		filter_by_rmgr[RM_MAX_BUILTIN_ID + 1];
 	bool		filter_by_rmgr_enabled;
 	TransactionId filter_by_xid;
 	bool		filter_by_xid_enabled;
@@ -71,8 +71,8 @@ typedef struct XLogDumpStats
 	uint64		count;
 	XLogRecPtr	startptr;
 	XLogRecPtr	endptr;
-	Stats		rmgr_stats[RM_NEXT_ID];
-	Stats		record_stats[RM_NEXT_ID][MAX_XLINFO_TYPES];
+	Stats		rmgr_stats[RM_MAX_ID + 1];
+	Stats		record_stats[RM_MAX_ID + 1][MAX_XLINFO_TYPES];
 } XLogDumpStats;
 
 #define fatal_error(...) do { pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0)
@@ -95,9 +95,9 @@ print_rmgr_list(void)
 {
 	int			i;
 
-	for (i = 0; i <= RM_MAX_ID; i++)
+	for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
 	{
-		printf("%s\n", RmgrDescTable[i].rm_name);
+		printf("%s\n", GetRmgrDesc(i)->rm_name);
 	}
 }
 
@@ -473,7 +473,7 @@ static void
 XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
 {
 	const char *id;
-	const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)];
+	const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record));
 	uint32		rec_len;
 	uint32		fpi_len;
 	RelFileNode rnode;
@@ -658,7 +658,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 	 * calculate column totals.
 	 */
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri < RM_MAX_ID; ri++)
 	{
 		total_count += stats->rmgr_stats[ri].count;
 		total_rec_len += stats->rmgr_stats[ri].rec_len;
@@ -679,13 +679,13 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 		   "Type", "N", "(%)", "Record size", "(%)", "FPI size", "(%)", "Combined size", "(%)",
 		   "----", "-", "---", "-----------", "---", "--------", "---", "-------------", "---");
 
-	for (ri = 0; ri < RM_NEXT_ID; ri++)
+	for (ri = 0; ri <= RM_MAX_ID; ri++)
 	{
 		uint64		count,
 					rec_len,
 					fpi_len,
 					tot_len;
-		const RmgrDescData *desc = &RmgrDescTable[ri];
+		const RmgrDescData *desc = GetRmgrDesc(ri);
 
 		if (!config->stats_per_record)
 		{
@@ -694,6 +694,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats)
 			fpi_len = stats->rmgr_stats[ri].fpi_len;
 			tot_len = rec_len + fpi_len;
 
+			if (ri > RM_MAX_BUILTIN_ID && count == 0)
+				continue;
+
 			XLogDumpStatsRow(desc->rm_name,
 							 count, total_count, rec_len, total_rec_len,
 							 fpi_len, total_fpi_len, tot_len, total_len);
@@ -913,16 +916,16 @@ main(int argc, char **argv)
 						exit(EXIT_SUCCESS);
 					}
 
-					for (i = 0; i <= RM_MAX_ID; i++)
+					for (i = 0; i <= RM_MAX_BUILTIN_ID; i++)
 					{
-						if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0)
+						if (pg_strcasecmp(optarg, GetRmgrDesc(i)->rm_name) == 0)
 						{
 							config.filter_by_rmgr[i] = true;
 							config.filter_by_rmgr_enabled = true;
 							break;
 						}
 					}
-					if (i > RM_MAX_ID)
+					if (i > RM_MAX_BUILTIN_ID)
 					{
 						pg_log_error("resource manager \"%s\" does not exist",
 									 optarg);
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310b..dd8b1bcd9bf 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -35,6 +35,36 @@
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, desc, identify},
 
-const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
+static const RmgrDescData RmgrDescTable[RM_MAX_BUILTIN_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+/*
+ * No information on custom resource managers; just print the ID.
+ */
+static void
+default_desc(StringInfo buf, XLogReaderState *record)
+{
+	appendStringInfo(buf, "rmid: %d", XLogRecGetRmid(record));
+}
+
+/*
+ * No information on custom resource managers; just return NULL and let the
+ * caller handle it.
+ */
+static const char *
+default_identify(uint8 info)
+{
+	return NULL;
+}
+
+const RmgrDescData *
+GetRmgrDesc(RmgrId rmid)
+{
+	if (rmid <= RM_MAX_BUILTIN_ID)
+		return &RmgrDescTable[rmid];
+	else
+	{
+		return &(RmgrDescData){ "custom", default_desc, default_identify };
+	}
+}
diff --git a/src/bin/pg_waldump/rmgrdesc.h b/src/bin/pg_waldump/rmgrdesc.h
index 42f8483b482..f733cd467d5 100644
--- a/src/bin/pg_waldump/rmgrdesc.h
+++ b/src/bin/pg_waldump/rmgrdesc.h
@@ -18,6 +18,6 @@ typedef struct RmgrDescData
 	const char *(*rm_identify) (uint8 info);
 } RmgrDescData;
 
-extern const RmgrDescData RmgrDescTable[];
+extern const RmgrDescData *GetRmgrDesc(RmgrId rmid);
 
 #endif							/* RMGRDESC_H */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..13b65567a5f 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -30,6 +30,15 @@ typedef enum RmgrIds
 
 #undef PG_RMGR
 
-#define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_MAX_ID				UINT8_MAX
+#define RM_MAX_BUILTIN_ID		(RM_NEXT_ID - 1)
+#define RM_MIN_CUSTOM_ID		128
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 0e94833129a..1b8a0067036 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -319,7 +319,10 @@ typedef struct RmgrData
 							  struct XLogRecordBuffer *buf);
 } RmgrData;
 
-extern const RmgrData RmgrTable[];
+extern void StartupResourceManagers(void);
+extern void CleanupResourceManagers(void);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+extern RmgrData *GetRmgr(RmgrId rmid);
 
 /*
  * Exported to support xlog switching from checkpointer
-- 
2.17.1

Reply via email to