On Fri, 2022-02-04 at 22:56 +0800, Julien Rouhaud wrote: > > Right, which I guess raises another question: if the maximum is UINT8_MAX, which BTW I find perfectly reasonable, why are we not just defining this as an array of size 256? There's no point in adding code complexity to save a few kB of memory. > Agreed, especially if combined with your suggested approach 3 (array of pointers).
Implemented and attached. I also updated pg_waldump and pg_rewind to handle custom resource manager IDs sensibly. Additionally, I now have a reasonably complete implementation of a custom resource manager: https://github.com/citusdata/citus/tree/custom-rmgr-15 (not committed or intended to actually be used right now -- just a POC). Offline, Andres mentioned that I should test recovery performance if we take your approach, because making RmgrTable non-const could impact optimizations. I'm not sure whether that would be a practical concern compared to all the other work done at REDO time. Regards, Jeff Davis
From faa7eb847fab30c0fbe47e703d1c2d64f84ba51d Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Sat, 6 Nov 2021 13:01:38 -0700 Subject: [PATCH] Extensible rmgr. Allow extensions to specify a new custom rmgr, which allows specialized WAL. This is meant to be used by a custom Table Access Method, which would not otherwise be able to offer logical decoding/replication. It may also be used by new Index Access Methods. Prior to this commit, only Generic WAL was available, which offers support for recovery and physical replication but not logical replication. Reviewed-by: Julien Rouhaud Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com --- src/backend/access/transam/rmgr.c | 87 ++++++++++++++++++++++- src/backend/access/transam/xlogreader.c | 2 +- src/backend/access/transam/xlogrecovery.c | 29 +++----- src/backend/replication/logical/decode.c | 4 +- src/backend/utils/misc/guc.c | 6 +- src/bin/pg_rewind/parsexlog.c | 11 +-- src/bin/pg_waldump/pg_waldump.c | 27 +++---- src/bin/pg_waldump/rmgrdesc.c | 32 ++++++++- src/bin/pg_waldump/rmgrdesc.h | 2 +- src/include/access/rmgr.h | 11 ++- src/include/access/xlog_internal.h | 5 +- 11 files changed, 168 insertions(+), 48 deletions(-) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index f8847d5aebf..1805596071e 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,6 +24,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "miscadmin.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" @@ -32,8 +33,90 @@ /* must be kept in sync with RmgrData definition in xlog_internal.h */ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ - { name, redo, desc, identify, startup, cleanup, mask, decode }, + &(struct RmgrData){ name, redo, desc, identify, startup, cleanup, mask,
decode }, -const RmgrData RmgrTable[RM_MAX_ID + 1] = { +static RmgrData *RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; + +/* + * Start up all resource managers. + */ +void +StartupResourceManagers(void) +{ + for (int i = 0; i <= RM_MAX_ID; i++) + { + if (RmgrTable[i] == NULL) + continue; + + if (RmgrTable[i]->rm_startup != NULL) + RmgrTable[i]->rm_startup(); + } +} + +/* + * Clean up all resource managers. + */ +void +CleanupResourceManagers(void) +{ + for (int i = 0; i <= RM_MAX_ID; i++) + { + if (RmgrTable[i] == NULL) + continue; + + if (RmgrTable[i]->rm_cleanup != NULL) + RmgrTable[i]->rm_cleanup(); + } +} + +/* + * Register a new custom rmgr. + * + * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a + * unique RmgrId for your extension, to avoid conflicts. During development, + * use RM_EXPERIMENTAL_ID. + */ +void +RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr) +{ + if (rmid < RM_MIN_CUSTOM_ID) + ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid)); + + if (!process_shared_preload_libraries_in_progress) + ereport(ERROR, + (errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries"))); + + ereport(LOG, + (errmsg("registering custom rmgr \"%s\" with ID %d", + rmgr->rm_name, rmid))); + + if (RmgrTable[rmid] != NULL) + ereport(PANIC, + (errmsg("custom rmgr ID %d already registered with name \"%s\"", + rmid, RmgrTable[rmid]->rm_name))); + + /* check for existing rmgr with the same name */ + for (int i = 0; i <= RM_MAX_ID; i++) + { + const RmgrData *existing_rmgr = RmgrTable[i]; + + if (existing_rmgr == NULL) + continue; + + if (pg_strcasecmp(existing_rmgr->rm_name, rmgr->rm_name) == 0) + ereport(PANIC, + (errmsg("custom rmgr \"%s\" has the same name as an existing rmgr", + existing_rmgr->rm_name))); + } + + /* register it */ + RmgrTable[rmid] = rmgr; +} + +RmgrData * +GetRmgr(RmgrId rmid) +{ + return RmgrTable[rmid]; +} diff --git a/src/backend/access/transam/xlogreader.c
b/src/backend/access/transam/xlogreader.c index e437c429920..e2c4d97b91f 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) SizeOfXLogRecord, record->xl_tot_len); return false; } - if (record->xl_rmid > RM_MAX_ID) + if (record->xl_rmid > RM_MAX_BUILTIN_ID && record->xl_rmid < RM_MIN_CUSTOM_ID) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 9feea3e6ec9..e903af95c0a 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1531,7 +1531,6 @@ ShutdownWalRecovery(void) void PerformWalRecovery(void) { - int rmid; XLogRecord *record; bool reachedRecoveryTarget = false; TimeLineID replayTLI; @@ -1604,12 +1603,7 @@ PerformWalRecovery(void) InRedo = true; - /* Initialize resource managers */ - for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - { - if (RmgrTable[rmid].rm_startup != NULL) - RmgrTable[rmid].rm_startup(); - } + StartupResourceManagers(); ereport(LOG, (errmsg("redo starts at %X/%X", @@ -1746,12 +1740,7 @@ PerformWalRecovery(void) } } - /* Allow resource managers to do any required cleanup.
*/ - for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - { - if (RmgrTable[rmid].rm_cleanup != NULL) - RmgrTable[rmid].rm_cleanup(); - } + CleanupResourceManagers(); ereport(LOG, (errmsg("redo done at %X/%X system usage: %s", @@ -1871,7 +1860,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl xlogrecovery_redo(xlogreader, *replayTLI); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(xlogreader); + GetRmgr(record->xl_rmid)->rm_redo(xlogreader); /* * After redo, check whether the backup pages associated with the WAL @@ -2101,16 +2090,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record) uint8 info = XLogRecGetInfo(record); const char *id; - appendStringInfoString(buf, RmgrTable[rmid].rm_name); + appendStringInfoString(buf, GetRmgr(rmid)->rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(info); + id = GetRmgr(rmid)->rm_identify(info); if (id == NULL) appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); - RmgrTable[rmid].rm_desc(buf, record); + GetRmgr(rmid)->rm_desc(buf, record); } #ifdef WAL_DEBUG @@ -2339,10 +2328,10 @@ verifyBackupPageConsistency(XLogReaderState *record) * If masking function is defined, mask both the primary and replay * images */ - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid)->rm_mask != NULL) { - RmgrTable[rmid].rm_mask(replay_image_masked, blkno); - RmgrTable[rmid].rm_mask(primary_image_masked, blkno); + GetRmgr(rmid)->rm_mask(replay_image_masked, blkno); + GetRmgr(rmid)->rm_mask(primary_image_masked, blkno); } /* Time to compare the primary and replay images.
*/ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 77bc7aea7a0..f49e1bbacd1 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -117,8 +117,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor rmid = XLogRecGetRmid(record); - if (RmgrTable[rmid].rm_decode != NULL) - RmgrTable[rmid].rm_decode(ctx, &buf); + if (GetRmgr(rmid)->rm_decode != NULL) + GetRmgr(rmid)->rm_decode(ctx, &buf); else { /* just deal with xid, and done */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 932aefc777d..d6e729b51bc 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -11753,7 +11753,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) if (pg_strcasecmp(tok, "all") == 0) { for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid) != NULL && GetRmgr(rmid)->rm_mask != NULL) newwalconsistency[rmid] = true; found = true; } @@ -11765,8 +11765,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 && - RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid) != NULL && GetRmgr(rmid)->rm_mask != NULL && + pg_strcasecmp(tok, GetRmgr(rmid)->rm_name) == 0) { newwalconsistency[rmid] = true; found = true; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 7cfa169e9b9..470134248b9 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -25,8 +25,8 @@ #include "pg_rewind.h" /* - * RmgrNames is an array of resource manager names, to make error messages - * a bit nicer. + * RmgrNames is an array of the built-in resource manager names, to make error + * messages a bit nicer.
*/ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ name, @@ -35,6 +35,9 @@ static const char *RmgrNames[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; +#define RmgrName(rmid) (((rmid) <= RM_MAX_BUILTIN_ID) ? \ + RmgrNames[rmid] : "custom") + static void extractPageInfo(XLogReaderState *record); static int xlogreadfd = -1; @@ -427,9 +430,9 @@ extractPageInfo(XLogReaderState *record) * track that change. */ pg_fatal("WAL record modifies a relation, but record type is not recognized: " - "lsn: %X/%X, rmgr: %s, info: %02X", + "lsn: %X/%X, rmid: %d, rmgr: %s, info: %02X", LSN_FORMAT_ARGS(record->ReadRecPtr), - RmgrNames[rmid], info); + rmid, RmgrName(rmid), info); } for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index fc081adfb8c..d424a7afca8 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -71,8 +71,8 @@ typedef struct XLogDumpStats uint64 count; XLogRecPtr startptr; XLogRecPtr endptr; - Stats rmgr_stats[RM_NEXT_ID]; - Stats record_stats[RM_NEXT_ID][MAX_XLINFO_TYPES]; + Stats rmgr_stats[RM_MAX_ID + 1]; + Stats record_stats[RM_MAX_ID + 1][MAX_XLINFO_TYPES]; } XLogDumpStats; #define fatal_error(...)
do { pg_log_fatal(__VA_ARGS__); exit(EXIT_FAILURE); } while(0) @@ -95,9 +95,9 @@ print_rmgr_list(void) { int i; - for (i = 0; i <= RM_MAX_ID; i++) + for (i = 0; i <= RM_MAX_BUILTIN_ID; i++) { - printf("%s\n", RmgrDescTable[i].rm_name); + printf("%s\n", GetRmgrDesc(i)->rm_name); } } @@ -473,7 +473,7 @@ static void XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) { const char *id; - const RmgrDescData *desc = &RmgrDescTable[XLogRecGetRmid(record)]; + const RmgrDescData *desc = GetRmgrDesc(XLogRecGetRmid(record)); uint32 rec_len; uint32 fpi_len; RelFileNode rnode; @@ -658,7 +658,7 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats) * calculate column totals. */ - for (ri = 0; ri < RM_NEXT_ID; ri++) + for (ri = 0; ri <= RM_MAX_ID; ri++) { total_count += stats->rmgr_stats[ri].count; total_rec_len += stats->rmgr_stats[ri].rec_len; @@ -679,13 +679,13 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats) "Type", "N", "(%)", "Record size", "(%)", "FPI size", "(%)", "Combined size", "(%)", "----", "-", "---", "-----------", "---", "--------", "---", "-------------", "---"); - for (ri = 0; ri < RM_NEXT_ID; ri++) + for (ri = 0; ri <= RM_MAX_ID; ri++) { uint64 count, rec_len, fpi_len, tot_len; - const RmgrDescData *desc = &RmgrDescTable[ri]; + const RmgrDescData *desc = GetRmgrDesc(ri); if (!config->stats_per_record) { @@ -694,6 +694,9 @@ XLogDumpDisplayStats(XLogDumpConfig *config, XLogDumpStats *stats) fpi_len = stats->rmgr_stats[ri].fpi_len; tot_len = rec_len + fpi_len; + if (ri > RM_MAX_BUILTIN_ID && count == 0) + continue; + XLogDumpStatsRow(desc->rm_name, count, total_count, rec_len, total_rec_len, fpi_len, total_fpi_len, tot_len, total_len); @@ -913,16 +916,16 @@ main(int argc, char **argv) exit(EXIT_SUCCESS); } - for (i = 0; i <= RM_MAX_ID; i++) + for (i = 0; i <= RM_MAX_BUILTIN_ID; i++) { - if (pg_strcasecmp(optarg, RmgrDescTable[i].rm_name) == 0) + if (pg_strcasecmp(optarg, GetRmgrDesc(i)->rm_name) == 0) {
config.filter_by_rmgr[i] = true; config.filter_by_rmgr_enabled = true; break; } } - if (i > RM_MAX_ID) + if (i > RM_MAX_BUILTIN_ID) { pg_log_error("resource manager \"%s\" does not exist", optarg); diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6a4ebd1310b..dd8b1bcd9bf 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -35,6 +35,36 @@ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, desc, identify}, -const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { +static const RmgrDescData RmgrDescTable[RM_MAX_BUILTIN_ID + 1] = { #include "access/rmgrlist.h" }; + +/* + * No information on custom resource managers; just print the ID. + */ +static void +default_desc(StringInfo buf, XLogReaderState *record) +{ + appendStringInfo(buf, "rmid: %d", XLogRecGetRmid(record)); +} + +/* + * No information on custom resource managers; just return NULL and let the + * caller handle it. + */ +static const char * +default_identify(uint8 info) +{ + return NULL; +} + +const RmgrDescData * +GetRmgrDesc(RmgrId rmid) +{ + static const RmgrDescData custom_desc = { "custom", default_desc, default_identify }; + + if (rmid <= RM_MAX_BUILTIN_ID) + return &RmgrDescTable[rmid]; + else + return &custom_desc; +} diff --git a/src/bin/pg_waldump/rmgrdesc.h b/src/bin/pg_waldump/rmgrdesc.h index 42f8483b482..f733cd467d5 100644 --- a/src/bin/pg_waldump/rmgrdesc.h +++ b/src/bin/pg_waldump/rmgrdesc.h @@ -18,6 +18,6 @@ typedef struct RmgrDescData const char *(*rm_identify) (uint8 info); } RmgrDescData; -extern const RmgrDescData RmgrDescTable[]; +extern const RmgrDescData *GetRmgrDesc(RmgrId rmid); #endif /* RMGRDESC_H */ diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index d9b512630ca..13b65567a5f 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -30,6 +30,15 @@ typedef enum RmgrIds #undef PG_RMGR -#define RM_MAX_ID (RM_NEXT_ID - 1) +#define RM_MAX_ID UINT8_MAX +#define RM_MAX_BUILTIN_ID (RM_NEXT_ID -
1) +#define RM_MIN_CUSTOM_ID 128 + +/* + * RmgrId to use for extensions that require an RmgrId, but are still in + * development and have not reserved their own unique RmgrId yet. See: + * https://wiki.postgresql.org/wiki/ExtensibleRmgr + */ +#define RM_EXPERIMENTAL_ID 128 #endif /* RMGR_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 0e94833129a..1b8a0067036 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -319,7 +319,10 @@ typedef struct RmgrData struct XLogRecordBuffer *buf); } RmgrData; -extern const RmgrData RmgrTable[]; +extern void StartupResourceManagers(void); +extern void CleanupResourceManagers(void); +extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr); +extern RmgrData *GetRmgr(RmgrId rmid); /* * Exported to support xlog switching from checkpointer -- 2.17.1