On Mon, 2021-11-08 at 15:36 -0800, Jeff Davis wrote: > The attached patch (against v14, so it's easier to test columnar) is > somewhat like a simplified version of [3] combined with refactoring > to > make decoding a part of the rmgr.
I created a wiki page here: https://wiki.postgresql.org/wiki/ExtensibleRmgr To coordinate reservation of RmgrIds, to avoid conflicts. I don't expect it to be a practical problem given how much work it takes to create a new table AM that needs full WAL support, but might as well have some transparency on how to choose a new RmgrId. I also updated the patch to point to the wiki page in the comments, and added in a new RM_EXPERIMENTAL_ID that can be used while an extension is still in development. Hopefully this will prevent people reserving lots of RmgrIds for extensions that never get released. Regards, Jeff Davis
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 58091f6b520..354f1033bf7 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,15 +24,102 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "miscadmin.h" +#include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" +#include "utils/memutils.h" #include "utils/relmapper.h" +typedef struct CustomRmgrEntry { + RmgrId rmid; + RmgrData *rmgr; +} CustomRmgrEntry; + /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ + { name, redo, desc, identify, startup, cleanup, mask, decode }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; + +static CustomRmgrEntry *CustomRmgrTable = NULL; +static int NumCustomRmgrs = 0; + +/* + * Register a new custom rmgr. + * + * Refer to https://wiki.postgresql.org/wiki/ExtensibleRmgr to reserve a + * unique RmgrId for your extension, to avoid conflicts. During development, + * use RM_EXPERIMENTAL_ID. + */ +void +RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr) +{ + if (!process_shared_preload_libraries_in_progress) + elog(ERROR, "custom rmgr must be registered while initializing extensions in shared_preload_libraries"); + + elog(LOG, "registering customer rmgr \"%s\" with ID %d", + rmgr->rm_name, rmid); + + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) + elog(PANIC, "custom rmgr id %d out of range", rmid); + + if (CustomRmgrTable == NULL) + CustomRmgrTable = MemoryContextAllocZero( + TopMemoryContext, sizeof(CustomRmgrEntry)); + + /* check for existing builtin rmgr with the same name */ + for (int i = 0; i <= RM_MAX_ID; i++) + { + const RmgrData *existing_rmgr = &RmgrTable[i]; + + if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name)) + elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr", + existing_rmgr->rm_name); + } + + /* check for conflicting custom rmgrs already registered */ + for (int i = 0; i < NumCustomRmgrs; i++) + { + CustomRmgrEntry entry = CustomRmgrTable[i]; + + if (entry.rmid == rmid) + elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"", + rmid, entry.rmgr->rm_name); + + if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name)) + elog(PANIC, "custom rmgr \"%s\" already registered with ID %d", + rmgr->rm_name, entry.rmid); + } + + CustomRmgrTable = (CustomRmgrEntry *) repalloc( + CustomRmgrTable, sizeof(CustomRmgrEntry) * NumCustomRmgrs + 1); + + CustomRmgrTable[NumCustomRmgrs].rmid = rmid; + CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr; + NumCustomRmgrs++; +} + +/* + * GetCustomRmgr + * + * This is an O(N) list traversal because the expected size is very small. + */ +RmgrData +GetCustomRmgr(RmgrId rmid) +{ + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) + elog(PANIC, "custom rmgr id %d out of range", rmid); + + for (int i = 0; i < NumCustomRmgrs; i++) + { + CustomRmgrEntry entry = CustomRmgrTable[i]; + if (entry.rmid == rmid) + return *entry.rmgr; + } + + elog(PANIC, "custom rmgr with ID %d not found!", rmid); +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6208e123e5d..07f715f0d9f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1505,10 +1505,10 @@ checkXLogConsistency(XLogReaderState *record) * If masking function is defined, mask both the primary and replay * images */ - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) { - RmgrTable[rmid].rm_mask(replay_image_masked, blkno); - RmgrTable[rmid].rm_mask(primary_image_masked, blkno); + GetRmgr(rmid).rm_mask(replay_image_masked, blkno); + GetRmgr(rmid).rm_mask(primary_image_masked, blkno); } /* Time to compare the primary and replay images. */ @@ -7292,8 +7292,8 @@ StartupXLOG(void) /* Initialize resource managers */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_startup != NULL) - RmgrTable[rmid].rm_startup(); + if (GetRmgr(rmid).rm_startup != NULL) + GetRmgr(rmid).rm_startup(); } /* @@ -7518,7 +7518,7 @@ StartupXLOG(void) RecordKnownAssignedTransactionIds(record->xl_xid); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(xlogreader); + GetRmgr(record->xl_rmid).rm_redo(xlogreader); /* * After redo, check whether the backup pages associated with @@ -7628,8 +7628,8 @@ StartupXLOG(void) /* Allow resource managers to do any required cleanup. */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_cleanup != NULL) - RmgrTable[rmid].rm_cleanup(); + if (GetRmgr(rmid).rm_cleanup != NULL) + GetRmgr(rmid).rm_cleanup(); } ereport(LOG, @@ -10663,16 +10663,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record) uint8 info = XLogRecGetInfo(record); const char *id; - appendStringInfoString(buf, RmgrTable[rmid].rm_name); + appendStringInfoString(buf, GetRmgr(rmid).rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(info); + id = GetRmgr(rmid).rm_identify(info); if (id == NULL) appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); - RmgrTable[rmid].rm_desc(buf, record); + GetRmgr(rmid).rm_desc(buf, record); } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index f01aea6ddad..6e2d40695ac 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -735,7 +735,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) SizeOfXLogRecord, record->xl_tot_len); return false; } - if (record->xl_rmid > RM_MAX_ID) + if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index daf2efb0d83..67e0107b191 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -43,21 +43,6 @@ #include "replication/snapbuild.h" #include "storage/standby.h" -typedef struct XLogRecordBuffer -{ - XLogRecPtr origptr; - XLogRecPtr endptr; - XLogReaderState *record; -} XLogRecordBuffer; - -/* RMGR Handlers */ -static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); - /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { XLogRecordBuffer buf; TransactionId txid; + RmgrId rmid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; @@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor buf.origptr); } - /* cast so we get a warning when new rmgrs are added */ - switch ((RmgrId) XLogRecGetRmid(record)) - { - /* - * Rmgrs we care about for logical decoding. Add new rmgrs in - * rmgrlist.h's order. - */ - case RM_XLOG_ID: - DecodeXLogOp(ctx, &buf); - break; - - case RM_XACT_ID: - DecodeXactOp(ctx, &buf); - break; + rmid = XLogRecGetRmid(record); - case RM_STANDBY_ID: - DecodeStandbyOp(ctx, &buf); - break; - - case RM_HEAP2_ID: - DecodeHeap2Op(ctx, &buf); - break; - - case RM_HEAP_ID: - DecodeHeapOp(ctx, &buf); - break; - - case RM_LOGICALMSG_ID: - DecodeLogicalMsgOp(ctx, &buf); - break; - - /* - * Rmgrs irrelevant for logical decoding; they describe stuff not - * represented in logical decoding. Add new rmgrs in rmgrlist.h's - * order. - */ - case RM_SMGR_ID: - case RM_CLOG_ID: - case RM_DBASE_ID: - case RM_TBLSPC_ID: - case RM_MULTIXACT_ID: - case RM_RELMAP_ID: - case RM_BTREE_ID: - case RM_HASH_ID: - case RM_GIN_ID: - case RM_GIST_ID: - case RM_SEQ_ID: - case RM_SPGIST_ID: - case RM_BRIN_ID: - case RM_COMMIT_TS_ID: - case RM_REPLORIGIN_ID: - case RM_GENERIC_ID: - /* just deal with xid, and done */ - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), - buf.origptr); - break; - case RM_NEXT_ID: - elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); + if (GetRmgr(rmid).rm_decode != NULL) + GetRmgr(rmid).rm_decode(ctx, &buf); + else + { + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); } } /* * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; @@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; @@ -392,8 +329,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; @@ -438,8 +375,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -498,8 +435,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -620,8 +557,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 6652a60ec31..c7b640befb8 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -11599,7 +11599,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) if (pg_strcasecmp(tok, "all") == 0) { for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) newwalconsistency[rmid] = true; found = true; } @@ -11611,8 +11611,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 && - RmgrTable[rmid].rm_mask != NULL) + if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 && + GetRmgr(rmid).rm_mask != NULL) { newwalconsistency[rmid] = true; found = true; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 59ebac7d6aa..9fc1729d84c 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca4b1c..6a4ebd1310b 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -32,7 +32,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56a4c6..9ad790a5cd0 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ symname, typedef enum RmgrIds @@ -31,5 +31,14 @@ typedef enum RmgrIds #undef PG_RMGR #define RM_MAX_ID (RM_NEXT_ID - 1) +#define RM_CUSTOM_MIN_ID 128 +#define RM_CUSTOM_MAX_ID UINT8_MAX + +/* + * RmgrId to use for extensions that require an RmgrId, but are still in + * development and have not reserved their own unique RmgrId yet. See: + * https://wiki.postgresql.org/wiki/ExtensibleRmgr + */ +#define RM_EXPERIMENTAL_ID 128 #endif /* RMGR_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index f582cf535f6..b1ffa2728b5 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,25 +25,25 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index dcf41e9257c..ee7422097c6 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -287,6 +287,9 @@ typedef enum RECOVERY_TARGET_ACTION_SHUTDOWN } RecoveryTargetAction; +struct LogicalDecodingContext; +struct XLogRecordBuffer; + /* * Method table for resource managers. * @@ -312,10 +315,15 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + void (*rm_decode) (struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf); } RmgrData; extern const RmgrData RmgrTable[]; +extern RmgrData GetCustomRmgr(RmgrId rmid); +extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr); + /* * Exported to support xlog switching from checkpointer */ @@ -333,4 +341,8 @@ extern bool InArchiveRecovery; extern bool StandbyMode; extern char *recoveryRestoreCommand; +#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)] +#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \ + GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid)) + #endif /* XLOG_INTERNAL_H */ diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 69918080bb5..9cc4783c3ba 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -14,7 +14,21 @@ #include "replication/logical.h" #include "replication/reorderbuffer.h" -void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, +typedef struct XLogRecordBuffer +{ + XLogRecPtr origptr; + XLogRecPtr endptr; + XLogReaderState *record; +} XLogRecordBuffer; + +extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + +extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); #endif