On Tue, 2022-01-18 at 17:53 +0800, Julien Rouhaud wrote: > > Review on patch 0001 would be helpful. It makes decoding just > > another > > method of rmgr, which makes a lot of sense to me from a code > > organization standpoint regardless of the other patches. Is there > > any > > reason not to do that? > > I think that it's a clean and sensible approach, so +1 for me.
Thank you, committed 0001. Other patches not committed yet. > There's a bit of 0003 present in 002: I just squashed 0002 and 0003 together. Not large enough to keep separate. > A few random comments on 0003: > > #define RM_MAX_ID (RM_NEXT_ID - 1) > +#define RM_CUSTOM_MIN_ID 128 > +#define RM_CUSTOM_MAX_ID UINT8_MAX > > It would be a good idea to add a StaticAssertStmt here to make sure > that > there's no overlap in the ranges. Done. > + > +/* > + * RmgrId to use for extensions that require an RmgrId, but are > still in > + * development and have not reserved their own unique RmgrId yet. > See: > + * https://wiki.postgresql.org/wiki/ExtensibleRmgr > + */ > +#define RM_EXPERIMENTAL_ID 128 > > I'm a bit dubious about the whole "register your ID in the Wiki" > approach. It > might not be a problem since there probably won't be hundreds of > users, and I > don't have any better suggestion since it has to be consistent across > nodes. Agree, but I don't see a better approach, either. I do some sanity checking, which should catch collisions when they happen. > + elog(LOG, "registering customer rmgr \"%s\" with ID %d", > + rmgr->rm_name, rmid); > > Should it be a DEBUG message instead? Also s/customer/custom/ It seems like a fairly important thing to have in the log. Only a couple extensions will ever encounter this message, and only at server start. Typo fixed. > +RmgrData > +GetCustomRmgr(RmgrId rmid) > +{ > + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) > + elog(PANIC, "custom rmgr id %d out of range", rmid); > > Should this be an assert? If we make it an Assert, then it won't be caught in production builds. > +#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)] > +#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \ > + GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid)) > > rmid should be protected in the macro. Done. > How you plan to mark it experimental? I suppose it doesn't need to be marked explicitly -- there are other APIs that change. For instance, the ProcessUtility_hook changed, and that's used much more widely. As long as we generally agree that some kind of custom rmgrs are the way to go, if we change the API or implementation around from version to version I can easily update my table access method. Regards, Jeff Davis
From 62799e4546aa0a15b2a09f6b14900d785d64f42f Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Sat, 6 Nov 2021 13:01:38 -0700 Subject: [PATCH] Extensible rmgr. Allow extensions to specify a new custom rmgr, which allows specialized WAL. This is meant to be used by a custom Table Access Method, which would not otherwise be able to offer logical decoding/replication. It may also be used by new Index Access Methods. Prior to this commit, only Generic WAL was available, which offers support for recovery and physical replication but not logical replication. Reviewed-by: Julien Rouhaud Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com --- src/backend/access/transam/rmgr.c | 91 ++++++++++++++++++++++++ src/backend/access/transam/xlog.c | 22 +++--- src/backend/access/transam/xlogreader.c | 2 +- src/backend/replication/logical/decode.c | 4 +- src/backend/utils/misc/guc.c | 6 +- src/include/access/rmgr.h | 14 ++++ src/include/access/xlog_internal.h | 7 ++ 7 files changed, 129 insertions(+), 17 deletions(-) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index f8847d5aebf..e04492a9507 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,12 +24,19 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "miscadmin.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" +#include "utils/memutils.h" #include "utils/relmapper.h" +typedef struct CustomRmgrEntry { + RmgrId rmid; + RmgrData *rmgr; +} CustomRmgrEntry; + /* must be kept in sync with RmgrData definition in xlog_internal.h */ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, redo, desc, identify, startup, cleanup, mask, decode }, @@ -37,3 +44,87 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; + +static CustomRmgrEntry *CustomRmgrTable = NULL; +static int NumCustomRmgrs = 0; + +/* + * Register a new custom rmgr. + * + * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a + * unique RmgrId for your extension, to avoid conflicts. During development, + * use RM_EXPERIMENTAL_ID. + */ +void +RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr) +{ + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) + ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid)); + + if (!process_shared_preload_libraries_in_progress) + ereport(ERROR, + (errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries"))); + + ereport(LOG, + (errmsg("registering custom rmgr \"%s\" with ID %d", + rmgr->rm_name, rmid))); + + if (CustomRmgrTable == NULL) + CustomRmgrTable = MemoryContextAllocZero( + TopMemoryContext, sizeof(CustomRmgrEntry)); + + /* check for existing builtin rmgr with the same name */ + for (int i = 0; i <= RM_MAX_ID; i++) + { + const RmgrData *existing_rmgr = &RmgrTable[i]; + + if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name)) + ereport(PANIC, + (errmsg("custom rmgr \"%s\" has the same name as builtin rmgr", + existing_rmgr->rm_name))); + } + + /* check for conflicting custom rmgrs already registered */ + for (int i = 0; i < NumCustomRmgrs; i++) + { + CustomRmgrEntry entry = CustomRmgrTable[i]; + + if (entry.rmid == rmid) + ereport(PANIC, + (errmsg("custom rmgr ID %d already registered with name \"%s\"", + rmid, entry.rmgr->rm_name))); + + if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name)) + ereport(PANIC, + (errmsg("custom rmgr \"%s\" already registered with ID %d", + rmgr->rm_name, entry.rmid))); + } + + CustomRmgrTable = (CustomRmgrEntry *) repalloc( + CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1); + + CustomRmgrTable[NumCustomRmgrs].rmid = rmid; + CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr; + NumCustomRmgrs++; +} + +/* + * GetCustomRmgr + * + * This is an O(N) list traversal because the expected size is very small. + */ +RmgrData +GetCustomRmgr(RmgrId rmid) +{ + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) + ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid)); + + for (int i = 0; i < NumCustomRmgrs; i++) + { + CustomRmgrEntry entry = CustomRmgrTable[i]; + if (entry.rmid == rmid) + return *entry.rmgr; + } + + ereport(PANIC, errmsg("custom rmgr with ID %d not found!", rmid)); +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index dfe2a0bcce9..b56c7927194 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1531,10 +1531,10 @@ checkXLogConsistency(XLogReaderState *record) * If masking function is defined, mask both the primary and replay * images */ - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) { - RmgrTable[rmid].rm_mask(replay_image_masked, blkno); - RmgrTable[rmid].rm_mask(primary_image_masked, blkno); + GetRmgr(rmid).rm_mask(replay_image_masked, blkno); + GetRmgr(rmid).rm_mask(primary_image_masked, blkno); } /* Time to compare the primary and replay images. */ @@ -7493,8 +7493,8 @@ StartupXLOG(void) /* Initialize resource managers */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_startup != NULL) - RmgrTable[rmid].rm_startup(); + if (GetRmgr(rmid).rm_startup != NULL) + GetRmgr(rmid).rm_startup(); } /* @@ -7716,7 +7716,7 @@ StartupXLOG(void) RecordKnownAssignedTransactionIds(record->xl_xid); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(xlogreader); + GetRmgr(record->xl_rmid).rm_redo(xlogreader); /* * After redo, check whether the backup pages associated with @@ -7826,8 +7826,8 @@ StartupXLOG(void) /* Allow resource managers to do any required cleanup. */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_cleanup != NULL) - RmgrTable[rmid].rm_cleanup(); + if (GetRmgr(rmid).rm_cleanup != NULL) + GetRmgr(rmid).rm_cleanup(); } ereport(LOG, @@ -10739,16 +10739,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record) uint8 info = XLogRecGetInfo(record); const char *id; - appendStringInfoString(buf, RmgrTable[rmid].rm_name); + appendStringInfoString(buf, GetRmgr(rmid).rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(info); + id = GetRmgr(rmid).rm_identify(info); if (id == NULL) appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); - RmgrTable[rmid].rm_desc(buf, record); + GetRmgr(rmid).rm_desc(buf, record); } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 35029cf97d6..612b1b37233 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) SizeOfXLogRecord, record->xl_tot_len); return false; } - if (record->xl_rmid > RM_MAX_ID) + if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 3fb5a92a1a1..52257a06882 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -115,8 +115,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor rmid = XLogRecGetRmid(record); - if (RmgrTable[rmid].rm_decode != NULL) - RmgrTable[rmid].rm_decode(ctx, &buf); + if (GetRmgr(rmid).rm_decode != NULL) + GetRmgr(rmid).rm_decode(ctx, &buf); else { /* just deal with xid, and done */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index b3fd42e0f18..68971b0f1ea 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -11715,7 +11715,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) if (pg_strcasecmp(tok, "all") == 0) { for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) newwalconsistency[rmid] = true; found = true; } @@ -11727,8 +11727,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 && - RmgrTable[rmid].rm_mask != NULL) + if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 && + GetRmgr(rmid).rm_mask != NULL) { newwalconsistency[rmid] = true; found = true; diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index d9b512630ca..d387530c6f5 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -31,5 +31,19 @@ typedef enum RmgrIds #undef PG_RMGR #define RM_MAX_ID (RM_NEXT_ID - 1) +#define RM_CUSTOM_MIN_ID 128 +#define RM_CUSTOM_MAX_ID UINT8_MAX + +StaticAssertDecl(RM_MAX_ID < RM_CUSTOM_MIN_ID, + "RM_MAX_ID >= RM_CUSTOM_MIN_ID"); +StaticAssertDecl(RM_CUSTOM_MIN_ID < RM_CUSTOM_MAX_ID, + "RM_CUSTOM_MIN_ID >= RM_CUSTOM_MAX_ID"); + +/* + * RmgrId to use for extensions that require an RmgrId, but are still in + * development and have not reserved their own unique RmgrId yet. See: + * https://wiki.postgresql.org/wiki/ExtensibleRmgr + */ +#define RM_EXPERIMENTAL_ID 128 #endif /* RMGR_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 849954a8e5a..61f421c98c2 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -321,6 +321,9 @@ typedef struct RmgrData extern const RmgrData RmgrTable[]; +extern RmgrData GetCustomRmgr(RmgrId rmid); +extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr); + /* * Exported to support xlog switching from checkpointer */ @@ -338,4 +341,8 @@ extern bool InArchiveRecovery; extern bool StandbyMode; extern char *recoveryRestoreCommand; +#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)] +#define GetRmgr(rmid) (((rmid) < RM_CUSTOM_MIN_ID) ? \ + GetBuiltinRmgr((rmid)) : GetCustomRmgr((rmid))) + #endif /* XLOG_INTERNAL_H */ -- 2.17.1