On Mon, 2021-11-08 at 15:36 -0800, Jeff Davis wrote: > The attached patch (against v14, so it's easier to test columnar) is > somewhat like a simplified version of [3] combined with refactoring > to > make decoding a part of the rmgr.
New patches attached (v3). Essentially the content as v2, but split into 3 patches and rebased. Review on patch 0001 would be helpful. It makes decoding just another method of rmgr, which makes a lot of sense to me from a code organization standpoint regardless of the other patches. Is there any reason not to do that? The other patches then make rmgr extensible, which in turn makes decoding extensible and solves the logical replication problem for custom table AMs. The most obvious way to make rmgr extensible would be to expand the rmgr table, but I was concerned about making that dynamic (right now the structure is entirely constant and perhaps that's important for some optimizations?). So, at the cost of complexity I made a separate, dynamic rmgr table to hold the custom entries. The custom rmgr API would probably be marked "experimental" for a while, and I don't expect lots of people to use it given the challenges of a production-quality table AM. But it's still important, because without it a table AM has no chance to participate in logical replication. Regards, Jeff Davis
From b6b6c93513d42223ac2bdcb1cd1522fd845d0af7 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Sat, 6 Nov 2021 12:58:04 -0700 Subject: [PATCH 1/3] Make logical decoding a part of the rmgr. Add a new rmgr method, rm_decode, and use that rather than a switch statement. In preparation for extensible rmgr. Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com --- src/backend/access/transam/rmgr.c | 5 +- src/backend/replication/logical/decode.c | 105 +++++------------------ src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/rmgrdesc.c | 2 +- src/include/access/rmgr.h | 2 +- src/include/access/rmgrlist.h | 44 +++++----- src/include/access/xlog_internal.h | 5 ++ src/include/replication/decode.h | 16 +++- 8 files changed, 69 insertions(+), 112 deletions(-) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 58091f6b520..f8847d5aebf 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,14 +24,15 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ + { name, redo, desc, identify, startup, cleanup, mask, decode }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1d22208c1ad..9b450c9f90e 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -43,21 +43,6 @@ #include "replication/snapbuild.h" #include "storage/standby.h" -typedef struct XLogRecordBuffer -{ - XLogRecPtr origptr; - XLogRecPtr endptr; - XLogReaderState *record; -} XLogRecordBuffer; - -/* RMGR Handlers */ -static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); - /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { XLogRecordBuffer buf; TransactionId txid; + RmgrId rmid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; @@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor buf.origptr); } - /* cast so we get a warning when new rmgrs are added */ - switch ((RmgrId) XLogRecGetRmid(record)) - { - /* - * Rmgrs we care about for logical decoding. Add new rmgrs in - * rmgrlist.h's order. - */ - case RM_XLOG_ID: - DecodeXLogOp(ctx, &buf); - break; - - case RM_XACT_ID: - DecodeXactOp(ctx, &buf); - break; + rmid = XLogRecGetRmid(record); - case RM_STANDBY_ID: - DecodeStandbyOp(ctx, &buf); - break; - - case RM_HEAP2_ID: - DecodeHeap2Op(ctx, &buf); - break; - - case RM_HEAP_ID: - DecodeHeapOp(ctx, &buf); - break; - - case RM_LOGICALMSG_ID: - DecodeLogicalMsgOp(ctx, &buf); - break; - - /* - * Rmgrs irrelevant for logical decoding; they describe stuff not - * represented in logical decoding. Add new rmgrs in rmgrlist.h's - * order. - */ - case RM_SMGR_ID: - case RM_CLOG_ID: - case RM_DBASE_ID: - case RM_TBLSPC_ID: - case RM_MULTIXACT_ID: - case RM_RELMAP_ID: - case RM_BTREE_ID: - case RM_HASH_ID: - case RM_GIN_ID: - case RM_GIST_ID: - case RM_SEQ_ID: - case RM_SPGIST_ID: - case RM_BRIN_ID: - case RM_COMMIT_TS_ID: - case RM_REPLORIGIN_ID: - case RM_GENERIC_ID: - /* just deal with xid, and done */ - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), - buf.origptr); - break; - case RM_NEXT_ID: - elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); + if (RmgrTable[rmid].rm_decode != NULL) + RmgrTable[rmid].rm_decode(ctx, &buf); + else + { + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); } } /* * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; @@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; @@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; @@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 91437974584..f6cfee4ce8c 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca4b1c..6a4ebd1310b 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -32,7 +32,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56a4c6..d9b512630ca 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index ed751aaf039..9a74721c97c 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,25 +25,25 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index e27fca0cc0e..849954a8e5a 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -287,6 +287,9 @@ typedef enum RECOVERY_TARGET_ACTION_SHUTDOWN } RecoveryTargetAction; +struct LogicalDecodingContext; +struct XLogRecordBuffer; + /* * Method table for resource managers. * @@ -312,6 +315,8 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + void (*rm_decode) (struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf); } RmgrData; extern const RmgrData RmgrTable[]; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 1db73f35549..a33c2a718a7 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -14,7 +14,21 @@ #include "replication/logical.h" #include "replication/reorderbuffer.h" -void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, +typedef struct XLogRecordBuffer +{ + XLogRecPtr origptr; + XLogRecPtr endptr; + XLogReaderState *record; +} XLogRecordBuffer; + +extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + +extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); #endif -- 2.17.1
From cb723a9f77c848784e952a6eeeeaf2d7f939d312 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Sat, 6 Nov 2021 13:01:38 -0700 Subject: [PATCH 2/3] Add macro GetRmgr in preparation for extensible rmgr. --- src/backend/access/transam/xlog.c | 22 +++++++++++----------- src/backend/access/transam/xlogreader.c | 2 +- src/backend/replication/logical/decode.c | 4 ++-- src/backend/utils/misc/guc.c | 6 +++--- src/include/access/xlog_internal.h | 2 ++ 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c9d4cbf3ff5..84a0d485e99 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1531,10 +1531,10 @@ checkXLogConsistency(XLogReaderState *record) * If masking function is defined, mask both the primary and replay * images */ - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) { - RmgrTable[rmid].rm_mask(replay_image_masked, blkno); - RmgrTable[rmid].rm_mask(primary_image_masked, blkno); + GetRmgr(rmid).rm_mask(replay_image_masked, blkno); + GetRmgr(rmid).rm_mask(primary_image_masked, blkno); } /* Time to compare the primary and replay images. */ @@ -7494,8 +7494,8 @@ StartupXLOG(void) /* Initialize resource managers */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_startup != NULL) - RmgrTable[rmid].rm_startup(); + if (GetRmgr(rmid).rm_startup != NULL) + GetRmgr(rmid).rm_startup(); } /* @@ -7717,7 +7717,7 @@ StartupXLOG(void) RecordKnownAssignedTransactionIds(record->xl_xid); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(xlogreader); + GetRmgr(record->xl_rmid).rm_redo(xlogreader); /* * After redo, check whether the backup pages associated with @@ -7827,8 +7827,8 @@ StartupXLOG(void) /* Allow resource managers to do any required cleanup. */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_cleanup != NULL) - RmgrTable[rmid].rm_cleanup(); + if (GetRmgr(rmid).rm_cleanup != NULL) + GetRmgr(rmid).rm_cleanup(); } ereport(LOG, @@ -10740,16 +10740,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record) uint8 info = XLogRecGetInfo(record); const char *id; - appendStringInfoString(buf, RmgrTable[rmid].rm_name); + appendStringInfoString(buf, GetRmgr(rmid).rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(info); + id = GetRmgr(rmid).rm_identify(info); if (id == NULL) appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); - RmgrTable[rmid].rm_desc(buf, record); + GetRmgr(rmid).rm_desc(buf, record); } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 35029cf97d6..612b1b37233 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) SizeOfXLogRecord, record->xl_tot_len); return false; } - if (record->xl_rmid > RM_MAX_ID) + if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 9b450c9f90e..4093b650635 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -115,8 +115,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor rmid = XLogRecGetRmid(record); - if (RmgrTable[rmid].rm_decode != NULL) - RmgrTable[rmid].rm_decode(ctx, &buf); + if (GetRmgr(rmid).rm_decode != NULL) + GetRmgr(rmid).rm_decode(ctx, &buf); else { /* just deal with xid, and done */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 4c94f09c645..f8bb0fabc11 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -11676,7 +11676,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) if (pg_strcasecmp(tok, "all") == 0) { for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) newwalconsistency[rmid] = true; found = true; } @@ -11688,8 +11688,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 && - RmgrTable[rmid].rm_mask != NULL) + if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 && + GetRmgr(rmid).rm_mask != NULL) { newwalconsistency[rmid] = true; found = true; diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 849954a8e5a..9d4e122e946 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -338,4 +338,6 @@ extern bool InArchiveRecovery; extern bool StandbyMode; extern char *recoveryRestoreCommand; +#define GetRmgr(rmid) RmgrTable[(rmid)] + #endif /* XLOG_INTERNAL_H */ -- 2.17.1
From 22c2f6895039ce39b0aca300a8051eb51a006071 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Sat, 6 Nov 2021 14:39:50 -0700 Subject: [PATCH 3/3] Custom Rmgr --- src/backend/access/transam/rmgr.c | 86 ++++++++++++++++++++++++++++++ src/include/access/rmgr.h | 9 ++++ src/include/access/xlog_internal.h | 7 ++- 3 files changed, 101 insertions(+), 1 deletion(-) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index f8847d5aebf..354f1033bf7 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,12 +24,19 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "miscadmin.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" +#include "utils/memutils.h" #include "utils/relmapper.h" +typedef struct CustomRmgrEntry { + RmgrId rmid; + RmgrData *rmgr; +} CustomRmgrEntry; + /* must be kept in sync with RmgrData definition in xlog_internal.h */ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, redo, desc, identify, startup, cleanup, mask, decode }, @@ -37,3 +44,82 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; + +static CustomRmgrEntry *CustomRmgrTable = NULL; +static int NumCustomRmgrs = 0; + +/* + * Register a new custom rmgr. + * + * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a + * unique RmgrId for your extension, to avoid conflicts. During development, + * use RM_EXPERIMENTAL_ID. + */ +void +RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr) +{ + if (!process_shared_preload_libraries_in_progress) + elog(ERROR, "custom rmgr must be registered while initializing extensions in shared_preload_libraries"); + + elog(LOG, "registering customer rmgr \"%s\" with ID %d", + rmgr->rm_name, rmid); + + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) + elog(PANIC, "custom rmgr id %d out of range", rmid); + + if (CustomRmgrTable == NULL) + CustomRmgrTable = MemoryContextAllocZero( + TopMemoryContext, sizeof(CustomRmgrEntry)); + + /* check for existing builtin rmgr with the same name */ + for (int i = 0; i <= RM_MAX_ID; i++) + { + const RmgrData *existing_rmgr = &RmgrTable[i]; + + if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name)) + elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr", + existing_rmgr->rm_name); + } + + /* check for conflicting custom rmgrs already registered */ + for (int i = 0; i < NumCustomRmgrs; i++) + { + CustomRmgrEntry entry = CustomRmgrTable[i]; + + if (entry.rmid == rmid) + elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"", + rmid, entry.rmgr->rm_name); + + if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name)) + elog(PANIC, "custom rmgr \"%s\" already registered with ID %d", + rmgr->rm_name, entry.rmid); + } + + CustomRmgrTable = (CustomRmgrEntry *) repalloc( + CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1); + + CustomRmgrTable[NumCustomRmgrs].rmid = rmid; + CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr; + NumCustomRmgrs++; +} + +/* + * GetCustomRmgr + * + * This is an O(N) list traversal because the expected size is very small. + */ +RmgrData +GetCustomRmgr(RmgrId rmid) +{ + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) + elog(PANIC, "custom rmgr id %d out of range", rmid); + + for (int i = 0; i < NumCustomRmgrs; i++) + { + CustomRmgrEntry entry = CustomRmgrTable[i]; + if (entry.rmid == rmid) + return *entry.rmgr; + } + + elog(PANIC, "custom rmgr with ID %d not found!", rmid); +} diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index d9b512630ca..9ad790a5cd0 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -31,5 +31,14 @@ typedef enum RmgrIds #undef PG_RMGR #define RM_MAX_ID (RM_NEXT_ID - 1) +#define RM_CUSTOM_MIN_ID 128 +#define RM_CUSTOM_MAX_ID UINT8_MAX + +/* + * RmgrId to use for extensions that require an RmgrId, but are still in + * development and have not reserved their own unique RmgrId yet. See: + * https://wiki.postgresql.org/wiki/ExtensibleRmgr + */ +#define RM_EXPERIMENTAL_ID 128 #endif /* RMGR_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 9d4e122e946..7cb1f6c3893 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -321,6 +321,9 @@ typedef struct RmgrData extern const RmgrData RmgrTable[]; +extern RmgrData GetCustomRmgr(RmgrId rmid); +extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr); + /* * Exported to support xlog switching from checkpointer */ @@ -338,6 +341,8 @@ extern bool InArchiveRecovery; extern bool StandbyMode; extern char *recoveryRestoreCommand; -#define GetRmgr(rmid) RmgrTable[(rmid)] +#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)] +#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \ + GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid)) #endif /* XLOG_INTERNAL_H */ -- 2.17.1