Motivation: I'm working on a columnar compression AM[2]. Currently, it uses generic xlog, which works for crash recovery and physical replication, but not logical decoding/replication.
Extensible rmgr would enable the table AM to support its own redo/decode hooks and WAL format, so that it could support crash recovery, physical replication, and logical replication. Background: I submitted another patch[0] to add new logical records, which could be used to support logical decoding directly, without the need for extensible rmgr and without any assumptions about the table AM. This was designed to be easy to use, but inefficient. Amit raised concerns[1] about whether it could meet the needs of zheap. Andres suggested (off-list) that it would be better to just tackle the extensible rmgr problem. The idea for extensible rmgr has been proposed before[3]. The biggest argument against it seemed to be that there was no complete use case[4], so the worry was that something would be left out. Columnar is complete enough that I think it qualifies as a good use case. A subsequent proposal[5] was shot down because of a (potential?) need for catalog access[6]. The attached patch does not use the catalog; instead, it relies on table AM authors choosing IDs that don't conflict with each other. This seems like a reasonable answer, considering that there will likely be very few table AMs that go far enough to fully support WAL including decoding. Are there any other major arguments/objections that I missed? Proposal: The attached patch (against v14, so it's easier to test columnar) is somewhat like a simplified version of [3] combined with refactoring to make decoding a part of the rmgr. 
* adds a new RmgrData method rm_decode * refactors decode.c to use the new method * add a layer of indirection GetRmgr to find an rmgr * fast path to find builtin rmgr in RmgrTable * to find a custom rmgr, traverses list of custom rmgrs that are currently loaded (unlikely to ever be more than a few) * rmgr IDs from 0-127 are reserved for builtin rmgrs * rmgr IDs from 128-255 are reserved for custom rmgrs * table AM authors need to avoid collisions between rmgr IDs I have tested with columnar using a simple WAL format for logical decoding only, and I'm still using generic xlog for recovery and physical replication. I haven't tested the redo path, or how easy it might be to do something like generic xlog. Questions: 0. Do we want to go this route, or something simpler like my other proposal, which introduces new logical record types[0]? 1. I am allocating the custom rmgr list in TopMemoryContext, and it only works when loading as a part of shared_preload_libraries. This avoids the need for shared memory in Simon's patch[3]. Is that the right thing to do? 2. If we go this route, what do we do with generic xlog? It seems like a half feature, since it doesn't work with logical decoding. 3. If the custom rmgr throws an error during redo, the server won't start. Should we have a GUC to turn non-builtin redo into a no-op to reduce the impact of bugs in the implementation of a custom rmgr? 4. Do we want to encourage index AMs to use this mechanism as well? I didn't really look into how suitable it is, but at a high level it seems reasonable. 
Regards, Jeff Davis [0] https://postgr.es/m/20ee0b0ae6958804a88fe9580157587720faf664.ca...@j-davis.com [1] https://postgr.es/m/CAA4eK1JVDnbQ80ULdZuhzQkzr_yYhfON-tg%3Dd1U7aWjK_R1ixQ%40mail.gmail.com [2] https://github.com/citusdata/citus/tree/master/src/backend/columnar [3] https://postgr.es/m/1229541840.4793.79.camel%40ebony.2ndQuadrant [4] https://postgr.es/m/20992.1232667957%40sss.pgh.pa.us [5] https://postgr.es/m/1266774840.7341.29872.camel%40ebony [6] https://postgr.es/m/26134.1266776040%40sss.pgh.pa.us
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 58091f6b520..3e4a3c5b675 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,15 +24,94 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" +#include "utils/memutils.h" #include "utils/relmapper.h" +typedef struct CustomRmgrEntry { + RmgrId rmid; + RmgrData *rmgr; +} CustomRmgrEntry; + /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ + { name, redo, desc, identify, startup, cleanup, mask, decode }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; + +static CustomRmgrEntry *CustomRmgrTable = NULL; +static int NumCustomRmgrs = 0; + +/* + * Register a new custom rmgr. 
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	elog(LOG, "registering custom rmgr \"%s\" with ID %d",
+		 rmgr->rm_name, rmid);
+
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		elog(PANIC, "custom rmgr id %d out of range", rmid);
+	/* first registration: create the table in TopMemoryContext */
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(
+			TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* the name must not collide with any builtin rmgr's name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" has the same name as builtin rmgr",
+				 existing_rmgr->rm_name);
+	}
+
+	/* neither the ID nor the name may match an already-registered custom rmgr */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+
+		if (entry.rmid == rmid)
+			elog(PANIC, "custom rmgr ID %d already registered with name \"%s\"",
+				 rmid, entry.rmgr->rm_name);
+
+		if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name))
+			elog(PANIC, "custom rmgr \"%s\" already registered with ID %d",
+				 rmgr->rm_name, entry.rmid);
+	}
+	/* grow by one entry; the byte count is entry size times (N + 1) */
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * (NumCustomRmgrs + 1));
+	/* only the pointer is kept, so *rmgr must remain valid afterwards */
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * This is an O(N) list traversal because the expected size is very small.
+ */ +RmgrData +GetCustomRmgr(RmgrId rmid) +{ + if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID) + elog(PANIC, "custom rmgr id %d out of range", rmid); + + for (int i = 0; i < NumCustomRmgrs; i++) + { + CustomRmgrEntry entry = CustomRmgrTable[i]; + if (entry.rmid == rmid) + return *entry.rmgr; + } + + elog(PANIC, "custom rmgr with ID %d not found!", rmid); +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6208e123e5d..07f715f0d9f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1505,10 +1505,10 @@ checkXLogConsistency(XLogReaderState *record) * If masking function is defined, mask both the primary and replay * images */ - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) { - RmgrTable[rmid].rm_mask(replay_image_masked, blkno); - RmgrTable[rmid].rm_mask(primary_image_masked, blkno); + GetRmgr(rmid).rm_mask(replay_image_masked, blkno); + GetRmgr(rmid).rm_mask(primary_image_masked, blkno); } /* Time to compare the primary and replay images. */ @@ -7292,8 +7292,8 @@ StartupXLOG(void) /* Initialize resource managers */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_startup != NULL) - RmgrTable[rmid].rm_startup(); + if (GetRmgr(rmid).rm_startup != NULL) + GetRmgr(rmid).rm_startup(); } /* @@ -7518,7 +7518,7 @@ StartupXLOG(void) RecordKnownAssignedTransactionIds(record->xl_xid); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(xlogreader); + GetRmgr(record->xl_rmid).rm_redo(xlogreader); /* * After redo, check whether the backup pages associated with @@ -7628,8 +7628,8 @@ StartupXLOG(void) /* Allow resource managers to do any required cleanup. 
*/ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (RmgrTable[rmid].rm_cleanup != NULL) - RmgrTable[rmid].rm_cleanup(); + if (GetRmgr(rmid).rm_cleanup != NULL) + GetRmgr(rmid).rm_cleanup(); } ereport(LOG, @@ -10663,16 +10663,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record) uint8 info = XLogRecGetInfo(record); const char *id; - appendStringInfoString(buf, RmgrTable[rmid].rm_name); + appendStringInfoString(buf, GetRmgr(rmid).rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(info); + id = GetRmgr(rmid).rm_identify(info); if (id == NULL) appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); - RmgrTable[rmid].rm_desc(buf, record); + GetRmgr(rmid).rm_desc(buf, record); } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index f01aea6ddad..6e2d40695ac 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -735,7 +735,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) SizeOfXLogRecord, record->xl_tot_len); return false; } - if (record->xl_rmid > RM_MAX_ID) + if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index daf2efb0d83..67e0107b191 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -43,21 +43,6 @@ #include "replication/snapbuild.h" #include "storage/standby.h" -typedef struct XLogRecordBuffer -{ - XLogRecPtr origptr; - XLogRecPtr endptr; - XLogReaderState *record; -} XLogRecordBuffer; - -/* RMGR Handlers */ -static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeap2Op(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf); -static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); - /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { XLogRecordBuffer buf; TransactionId txid; + RmgrId rmid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; @@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor buf.origptr); } - /* cast so we get a warning when new rmgrs are added */ - switch ((RmgrId) XLogRecGetRmid(record)) - { - /* - * Rmgrs we care about for logical decoding. Add new rmgrs in - * rmgrlist.h's order. - */ - case RM_XLOG_ID: - DecodeXLogOp(ctx, &buf); - break; - - case RM_XACT_ID: - DecodeXactOp(ctx, &buf); - break; + rmid = XLogRecGetRmid(record); - case RM_STANDBY_ID: - DecodeStandbyOp(ctx, &buf); - break; - - case RM_HEAP2_ID: - DecodeHeap2Op(ctx, &buf); - break; - - case RM_HEAP_ID: - DecodeHeapOp(ctx, &buf); - break; - - case RM_LOGICALMSG_ID: - DecodeLogicalMsgOp(ctx, &buf); - break; - - /* - * Rmgrs irrelevant for logical decoding; they describe stuff not - * represented in logical decoding. Add new rmgrs in rmgrlist.h's - * order. 
- */ - case RM_SMGR_ID: - case RM_CLOG_ID: - case RM_DBASE_ID: - case RM_TBLSPC_ID: - case RM_MULTIXACT_ID: - case RM_RELMAP_ID: - case RM_BTREE_ID: - case RM_HASH_ID: - case RM_GIN_ID: - case RM_GIST_ID: - case RM_SEQ_ID: - case RM_SPGIST_ID: - case RM_BRIN_ID: - case RM_COMMIT_TS_ID: - case RM_REPLORIGIN_ID: - case RM_GENERIC_ID: - /* just deal with xid, and done */ - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), - buf.origptr); - break; - case RM_NEXT_ID: - elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); + if (GetRmgr(rmid).rm_decode != NULL) + GetRmgr(rmid).rm_decode(ctx, &buf); + else + { + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); } } /* * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; @@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; @@ -392,8 +329,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer(). 
*/ -static void -DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; @@ -438,8 +375,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -498,8 +435,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -620,8 +557,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). 
*/ -static void -DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 6652a60ec31..c7b640befb8 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -11599,7 +11599,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) if (pg_strcasecmp(tok, "all") == 0) { for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - if (RmgrTable[rmid].rm_mask != NULL) + if (GetRmgr(rmid).rm_mask != NULL) newwalconsistency[rmid] = true; found = true; } @@ -11611,8 +11611,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source) */ for (rmid = 0; rmid <= RM_MAX_ID; rmid++) { - if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 && - RmgrTable[rmid].rm_mask != NULL) + if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 && + GetRmgr(rmid).rm_mask != NULL) { newwalconsistency[rmid] = true; found = true; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 59ebac7d6aa..9fc1729d84c 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca4b1c..6a4ebd1310b 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -32,7 +32,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56a4c6..e433ec7ecfd 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ symname, typedef enum RmgrIds @@ -31,5 +31,7 @@ typedef enum RmgrIds #undef PG_RMGR #define RM_MAX_ID (RM_NEXT_ID - 1) +#define RM_CUSTOM_MIN_ID 128 +#define RM_CUSTOM_MAX_ID UINT8_MAX #endif /* RMGR_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index f582cf535f6..b1ffa2728b5 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,25 +25,25 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) 
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode) 
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index dcf41e9257c..ee7422097c6 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -287,6 +287,9 @@ typedef enum RECOVERY_TARGET_ACTION_SHUTDOWN } RecoveryTargetAction; +struct LogicalDecodingContext; +struct XLogRecordBuffer; + /* * Method table for resource managers. 
* @@ -312,10 +315,15 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + void (*rm_decode) (struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf); } RmgrData; extern const RmgrData RmgrTable[]; +extern RmgrData GetCustomRmgr(RmgrId rmid); +extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr); + /* * Exported to support xlog switching from checkpointer */ @@ -333,4 +341,8 @@ extern bool InArchiveRecovery; extern bool StandbyMode; extern char *recoveryRestoreCommand; +#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)] +#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \ + GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid)) + #endif /* XLOG_INTERNAL_H */ diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 69918080bb5..9cc4783c3ba 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -14,7 +14,21 @@ #include "replication/logical.h" #include "replication/reorderbuffer.h" -void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, +typedef struct XLogRecordBuffer +{ + XLogRecPtr origptr; + XLogRecPtr endptr; + XLogReaderState *record; +} XLogRecordBuffer; + +extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + +extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); #endif