Dear Bharath, > I'm attaching 0002 patch (on top of v45) which implements the new > decodable callback approach that I have in mind. IMO, this new > approach is extensible, better than the current approach (hard-coding > of certain WAL records that may be generated during pg_upgrade) taken > by the patch, and helps deal with the issue that custom WAL resource > managers can have with the current approach taken by the patch.
Thanks for sharing your PoC! I tested yours and worked well. I have also made the decoding approach locally, but your approach is conceptually faster. I think it still checks the type one by one so not sure the acceptable, but at least checkings are centerized. We must hear opinions from others. How do other think? Comments for your patch. I attached the txt file, please include if it is OK. 1. According to your post, we must have comments to notify developers that is_decodable API must be implemented. Please share it too if you have idea. 2. The existence of is_decodable should be checked in RegisterCustomRmgr(). 3. Anther rmgr API (rm_identify) requries uint8 without doing a bit operation: they do "info & ~XLR_INFO_MASK" in the callbacks. Should we follow that? 4. It is helpful for developers to add a function to test_custom_rmgrs module. Best Regards, Hayato Kuroda FUJITSU LIMITED
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 001bdf3535..850ba7829a 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -117,6 +117,11 @@ RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr) errdetail("Custom resource manager \"%s\" already registered with the same ID.", RmgrTable[rmid].rm_name))); + if (rmgr->rm_decode && rmgr->rm_is_record_decodable == NULL) + ereport(ERROR, + (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid), + errdetail("Custom resource manager which has a decode function must have is_reacode_decodable function too."))); + /* check for existing rmgr with the same name */ for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++) { diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 60d26ae015..2e97962e60 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -214,7 +214,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool xlog_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_CHECKPOINT_SHUTDOWN: case XLOG_END_OF_RECOVERY: @@ -401,7 +401,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool xact_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_XACT_COMMIT: case XLOG_XACT_COMMIT_PREPARED: @@ -471,7 +471,7 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool standy_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_RUNNING_XACTS: return true; @@ -550,7 +550,7 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool heap2_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_HEAP2_MULTI_INSERT: case XLOG_HEAP2_NEW_CID: @@ -661,7 +661,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool heap_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_HEAP_INSERT: case XLOG_HEAP_HOT_UPDATE: @@ -782,7 +782,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool logicalmsg_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_LOGICAL_MESSAGE: return true; diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index cfd3e448b1..f19cb68d92 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -320,15 +320,13 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS) while (is_valid && ReadNextXLogRecord(xlogreader)) { RmgrData rmgr; - RmgrIds rmid; - uint8 info; - - /* Check the type of WAL */ - rmid = XLogRecGetRmid(xlogreader); - info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; if (initial_record) { + /* Check the type of WAL */ + RmgrIds rmid = XLogRecGetRmid(xlogreader); + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + /* * Initial record must be either XLOG_CHECKPOINT_SHUTDOWN or * XLOG_SWITCH. @@ -350,11 +348,11 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS) if (rmgr.rm_decode != NULL) { if (rmgr.rm_is_record_decodable != NULL) - is_valid = rmgr.rm_is_record_decodable(info); + is_valid = rmgr.rm_is_record_decodable(XLogRecGetInfo(xlogreader)); else ereport(ERROR, errmsg("cannot check logical decodability for resource manager \"%s\" with ID %d", - rmgr.rm_name, rmid), + rmgr.rm_name, XLogRecGetRmid(xlogreader)), errdetail("Logical decodability callback is not defined for the resource manager.")); } diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 6e113ef53d..44e10c0a94 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -342,6 +342,9 @@ struct XLogRecordBuffer; * rm_mask takes as input a page modified by the resource manager and masks * out bits that shouldn't be flagged by wal_consistency_checking. * + * If a resource manager implements rm_decode function, rm_is_record_decodable + * function must be also implemented. + * * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is * NULL, the corresponding RmgrTable entry is considered invalid. */ @@ -356,7 +359,7 @@ typedef struct RmgrData void (*rm_mask) (char *pagedata, BlockNumber blkno); void (*rm_decode) (struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf); - bool (*rm_is_record_decodable) (uint8 type); + bool (*rm_is_record_decodable) (uint8 info); } RmgrData; extern PGDLLIMPORT RmgrData RmgrTable[]; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 3885ce671d..d8a912296c 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -21,6 +21,12 @@ typedef struct XLogRecordBuffer XLogReaderState *record; } XLogRecordBuffer; +/* + * Decode functions for resource managers. + * + * Note that if a rmgr has rm_decode function, it must have + * rm_is_record_decodable function as well. + */ extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -28,6 +34,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +/* is_record_decodable functions */ extern bool xlog_is_record_decodable(uint8 info); extern bool xact_is_record_decodable(uint8 info); extern bool standy_is_record_decodable(uint8 info); diff --git a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c index a304ba54bb..7ac90633f4 100644 --- a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c +++ b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c @@ -10,7 +10,7 @@ * src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c * * Custom WAL resource manager for records containing a simple textual - * payload, no-op redo, and no decoding. + * payload, no-op redo and decode. * * ------------------------------------------------------------------------- */ @@ -21,6 +21,7 @@ #include "access/xlog_internal.h" #include "access/xloginsert.h" #include "fmgr.h" +#include "replication/decode.h" #include "utils/pg_lsn.h" #include "varatt.h" @@ -51,12 +52,17 @@ typedef struct xl_testcustomrmgrs_message void testcustomrmgrs_redo(XLogReaderState *record); void testcustomrmgrs_desc(StringInfo buf, XLogReaderState *record); const char *testcustomrmgrs_identify(uint8 info); +void testcustomrmgrs_decode(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf); +bool testcustomrmgrs_is_record_decodable(uint8 info); static const RmgrData testcustomrmgrs_rmgr = { .rm_name = TESTCUSTOMRMGRS_NAME, .rm_redo = testcustomrmgrs_redo, .rm_desc = testcustomrmgrs_desc, - .rm_identify = testcustomrmgrs_identify + .rm_identify = testcustomrmgrs_identify, + .rm_decode = testcustomrmgrs_decode, + .rm_is_record_decodable = testcustomrmgrs_is_record_decodable }; /* @@ -111,6 +117,30 @@ testcustomrmgrs_identify(uint8 info) return NULL; } +void +testcustomrmgrs_decode(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + + if (info != XLOG_TEST_CUSTOM_RMGRS_MESSAGE) + elog(PANIC, "testcustomrmgrs_redo: unknown op code %u", info); +} + +bool +testcustomrmgrs_is_record_decodable(uint8 info) +{ + switch (info & ~XLR_INFO_MASK) + { + case XLOG_TEST_CUSTOM_RMGRS_MESSAGE: + return true; + default: + elog(ERROR, "unexpected RM_TESTCUSTOMRMGRS_ID record type: %u", + info); + } +} + /* * SQL function for writing a simple message into WAL with the help of custom * WAL resource manager.