Dear Bharath,

> I'm attaching 0002 patch (on top of v45) which implements the new
> decodable callback approach that I have in mind. IMO, this new
> approach is extensible, better than the current approach (hard-coding
> of certain WAL records that may be generated during pg_upgrade) taken
> by the patch, and helps deal with the issue that custom WAL resource
> managers can have with the current approach taken by the patch.

Thanks for sharing your PoC! I tested it and it worked well. I had also
implemented the decoding approach locally, but your approach is conceptually
faster. It still checks the record types one by one, so I'm not sure whether
that is acceptable, but at least the checks are centralized. We must hear
opinions from others. What do others think?
 
Here are my comments on your patch. I have attached a txt file with the
proposed changes; please include them if they look OK.

1.
According to your post, we must have comments to notify developers that the
is_decodable API must be implemented. Please share them too if you have any ideas.

 
2.
The existence of is_decodable should be checked in RegisterCustomRmgr().

3.
Another rmgr API (rm_identify) takes the uint8 info without the caller doing a
bit operation: the callbacks themselves do "info & ~XLR_INFO_MASK". Should we
follow that?

4.
It would be helpful for developers if we added such a function to the
test_custom_rmgrs module.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

diff --git a/src/backend/access/transam/rmgr.c 
b/src/backend/access/transam/rmgr.c
index 001bdf3535..850ba7829a 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -117,6 +117,11 @@ RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr)
                                 errdetail("Custom resource manager \"%s\" 
already registered with the same ID.",
                                                   RmgrTable[rmid].rm_name)));
 
+       if (rmgr->rm_decode && rmgr->rm_is_record_decodable == NULL)
+               ereport(ERROR,
+                               (errmsg("failed to register custom resource 
manager \"%s\" with ID %d", rmgr->rm_name, rmid),
+                                errdetail("Custom resource manager which has a 
decode function must have is_reacode_decodable function too.")));
+
        /* check for existing rmgr with the same name */
        for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++)
        {
diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index 60d26ae015..2e97962e60 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -214,7 +214,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 bool
 xlog_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_CHECKPOINT_SHUTDOWN:
                case XLOG_END_OF_RECOVERY:
@@ -401,7 +401,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 bool
 xact_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_XACT_COMMIT:
                case XLOG_XACT_COMMIT_PREPARED:
@@ -471,7 +471,7 @@ standby_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 bool
 standy_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_RUNNING_XACTS:
                        return true;
@@ -550,7 +550,7 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 bool
 heap2_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_HEAP2_MULTI_INSERT:
                case XLOG_HEAP2_NEW_CID:
@@ -661,7 +661,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf)
 bool
 heap_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_HEAP_INSERT:
                case XLOG_HEAP_HOT_UPDATE:
@@ -782,7 +782,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 bool
 logicalmsg_is_record_decodable(uint8 info)
 {
-       switch (info)
+       switch (info & ~XLR_INFO_MASK)
        {
                case XLOG_LOGICAL_MESSAGE:
                        return true;
diff --git a/src/backend/utils/adt/pg_upgrade_support.c 
b/src/backend/utils/adt/pg_upgrade_support.c
index cfd3e448b1..f19cb68d92 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -320,15 +320,13 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS)
        while (is_valid && ReadNextXLogRecord(xlogreader))
        {
                RmgrData        rmgr;
-               RmgrIds         rmid;
-               uint8           info;
-
-               /* Check the type of WAL */
-               rmid = XLogRecGetRmid(xlogreader);
-               info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
 
                if (initial_record)
                {
+                       /* Check the type of WAL */
+                       RmgrIds         rmid = XLogRecGetRmid(xlogreader);
+                       uint8           info = XLogRecGetInfo(xlogreader) & 
~XLR_INFO_MASK;
+
                        /*
                         * Initial record must be either 
XLOG_CHECKPOINT_SHUTDOWN or
                         * XLOG_SWITCH.
@@ -350,11 +348,11 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS)
                if (rmgr.rm_decode != NULL)
                {
                        if (rmgr.rm_is_record_decodable != NULL)
-                               is_valid = rmgr.rm_is_record_decodable(info);
+                               is_valid = 
rmgr.rm_is_record_decodable(XLogRecGetInfo(xlogreader));
                        else
                                ereport(ERROR,
                                                errmsg("cannot check logical 
decodability for resource manager \"%s\" with ID %d",
-                                                          rmgr.rm_name, rmid),
+                                                          rmgr.rm_name, 
XLogRecGetRmid(xlogreader)),
                                                errdetail("Logical decodability 
callback is not defined for the resource manager."));
                }
 
diff --git a/src/include/access/xlog_internal.h 
b/src/include/access/xlog_internal.h
index 6e113ef53d..44e10c0a94 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -342,6 +342,9 @@ struct XLogRecordBuffer;
  * rm_mask takes as input a page modified by the resource manager and masks
  * out bits that shouldn't be flagged by wal_consistency_checking.
  *
+ * If a resource manager implements rm_decode function, rm_is_record_decodable
+ * function must be also implemented.
+ *
  * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is
  * NULL, the corresponding RmgrTable entry is considered invalid.
  */
@@ -356,7 +359,7 @@ typedef struct RmgrData
        void            (*rm_mask) (char *pagedata, BlockNumber blkno);
        void            (*rm_decode) (struct LogicalDecodingContext *ctx,
                                                          struct 
XLogRecordBuffer *buf);
-       bool            (*rm_is_record_decodable) (uint8 type);
+       bool            (*rm_is_record_decodable) (uint8 info);
 } RmgrData;
 
 extern PGDLLIMPORT RmgrData RmgrTable[];
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 3885ce671d..d8a912296c 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -21,6 +21,12 @@ typedef struct XLogRecordBuffer
        XLogReaderState *record;
 } XLogRecordBuffer;
 
+/*
+ * Decode functions for resource managers.
+ *
+ * Note that if a rmgr has rm_decode function, it must have
+ * rm_is_record_decodable function as well.
+ */
 extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -28,6 +34,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer 
*buf);
 
+/* is_record_decodable functions */
 extern bool xlog_is_record_decodable(uint8 info);
 extern bool xact_is_record_decodable(uint8 info);
 extern bool standy_is_record_decodable(uint8 info);
diff --git a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c 
b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
index a304ba54bb..7ac90633f4 100644
--- a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
+++ b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
@@ -10,7 +10,7 @@
  *             src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c
  *
  * Custom WAL resource manager for records containing a simple textual
- * payload, no-op redo, and no decoding.
+ * payload, no-op redo and decode.
  *
  * -------------------------------------------------------------------------
  */
@@ -21,6 +21,7 @@
 #include "access/xlog_internal.h"
 #include "access/xloginsert.h"
 #include "fmgr.h"
+#include "replication/decode.h"
 #include "utils/pg_lsn.h"
 #include "varatt.h"
 
@@ -51,12 +52,17 @@ typedef struct xl_testcustomrmgrs_message
 void           testcustomrmgrs_redo(XLogReaderState *record);
 void           testcustomrmgrs_desc(StringInfo buf, XLogReaderState *record);
 const char *testcustomrmgrs_identify(uint8 info);
+void           testcustomrmgrs_decode(struct LogicalDecodingContext *ctx,
+                                                                  struct 
XLogRecordBuffer *buf);
+bool           testcustomrmgrs_is_record_decodable(uint8 info);
 
 static const RmgrData testcustomrmgrs_rmgr = {
        .rm_name = TESTCUSTOMRMGRS_NAME,
        .rm_redo = testcustomrmgrs_redo,
        .rm_desc = testcustomrmgrs_desc,
-       .rm_identify = testcustomrmgrs_identify
+       .rm_identify = testcustomrmgrs_identify,
+       .rm_decode = testcustomrmgrs_decode,
+       .rm_is_record_decodable = testcustomrmgrs_is_record_decodable
 };
 
 /*
@@ -111,6 +117,30 @@ testcustomrmgrs_identify(uint8 info)
        return NULL;
 }
 
+void
+testcustomrmgrs_decode(struct LogicalDecodingContext *ctx,
+                                          struct XLogRecordBuffer *buf)
+{
+       XLogReaderState *r = buf->record;
+       uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+       if (info != XLOG_TEST_CUSTOM_RMGRS_MESSAGE)
+               elog(PANIC, "testcustomrmgrs_redo: unknown op code %u", info);
+}
+
+bool
+testcustomrmgrs_is_record_decodable(uint8 info)
+{
+       switch (info & ~XLR_INFO_MASK)
+       {
+               case XLOG_TEST_CUSTOM_RMGRS_MESSAGE:
+                       return true;
+               default:
+                       elog(ERROR, "unexpected RM_TESTCUSTOMRMGRS_ID record 
type: %u",
+                                info);
+       }
+}
+
 /*
  * SQL function for writing a simple message into WAL with the help of custom
  * WAL resource manager.

Reply via email to