Hi all, While updating the patch I recently posted[1] to make pg_waldump report the replication origin ID, LSN, and timestamp, I found a bug where the replication origin timestamp is not set in the ROLLBACK PREPARED case. Commit 8bdb1332eb5 (CC'ed Amit) added an argument to ReorderBufferFinishPrepared(), but in the ROLLBACK PREPARED case the caller passes it at the wrong position:
@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + SnapBuildInitialConsistentPoint(ctx->snapshot_builder), commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); } @@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, abort_time, origin_id, origin_lsn, + InvalidXLogRecPtr, parsed->twophase_gid, false); } In DecodeAbort() the new argument was added after origin_lsn instead of directly after buf->endptr, so the following arguments land one slot off: abort_time is passed as the initial consistent point, origin_id as the commit/abort timestamp, origin_lsn as the origin ID, and InvalidXLogRecPtr as the origin LSN. This corrupts the value of rollback_data.rollback_time on the subscriber, so the wrong timestamp is used both as the replication origin timestamp and as the error callback argument there. I've attached a patch to fix it. In addition, I think we can improve the checking of input data on subscribers. This bug was not caught by the compiler, but it could have been caught if we validated the input data. Looking at the logicalrep_read_xxx functions in proto.c, there are some inconsistencies: we check the value of prepare_data->xid in logicalrep_read_prepare_common(), but not in logicalrep_read_commit_prepared() or logicalrep_read_rollback_prepared(), and we don't check anything in stream_start/stream_stop. Also, IIUC, since timestamps are always set in the prepare, commit prepared, and rollback prepared cases, we can check them too. I've attached a PoC patch that introduces macros for checking input data and adds some new checks. Since these functions can be called frequently, I used unlikely(); alternatively, we could consider replacing elog(ERROR) with assertions. Regards, [1] https://www.postgresql.org/message-id/CAD21AoD2dJfgsdxk4_KciAZMZQoUiCvmV9sDpp8ZuKLtKCNXaA%40mail.gmail.com -- Masahiko Sawada EDB: https://www.enterprisedb.com/
From 922aff7d1f7cf75b7672c794e7960a26cf07c8f6 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada <sawada.mshk@gmail.com> Date: Mon, 6 Dec 2021 18:23:21 +0900 Subject: [PATCH 1/2] Fix a bug of passing incorrect arguments to ReorderBufferFinishPrepared(). This has been introduced by commit 8bdb1332eb5. --- src/backend/replication/logical/decode.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index a2b69511b4..59aed6cee6 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -875,8 +875,8 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase && !skip_xact) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, - abort_time, origin_id, origin_lsn, InvalidXLogRecPtr, + abort_time, origin_id, origin_lsn, parsed->twophase_gid, false); } else -- 2.24.3 (Apple Git-128)
From 97a21e11ff5df940a35f1cad62c56c320e92dd78 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada <sawada.mshk@gmail.com> Date: Tue, 7 Dec 2021 00:25:49 +0900 Subject: [PATCH 2/2] Improve input data check of logical replication. --- src/backend/replication/logical/proto.c | 47 +++++++++++++++++++------ 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b639..c85cad7859 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -20,6 +20,11 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" +/* Macros for checking input data */ +#define LOGICALREP_CHECK_INVALID_LSN(lsn) unlikely(XLogRecPtrIsInvalid(lsn)) +#define LOGICALREP_CHECK_INVALID_TIMESTAMP(ts) unlikely((ts) == 0) +#define LOGICALREP_CHECK_INVALID_XID(xid) unlikely(!TransactionIdIsValid(xid)) + /* * Protocol message flags. */ @@ -61,7 +66,7 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) { /* read fields */ begin_data->final_lsn = pq_getmsgint64(in); - if (begin_data->final_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(begin_data->final_lsn)) elog(ERROR, "final_lsn not set in begin message"); begin_data->committime = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -132,10 +137,10 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da { /* read fields */ begin_data->prepare_lsn = pq_getmsgint64(in); - if (begin_data->prepare_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(begin_data->prepare_lsn)) elog(ERROR, "prepare_lsn not set in begin prepare message"); begin_data->end_lsn = pq_getmsgint64(in); - if (begin_data->end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(begin_data->end_lsn)) elog(ERROR, "end_lsn not set in begin prepare message"); begin_data->prepare_time = pq_getmsgint64(in); begin_data->xid = pq_getmsgint(in, 4); @@ -204,14 +209,16 @@ 
logicalrep_read_prepare_common(StringInfo in, char *msgtype, /* read fields */ prepare_data->prepare_lsn = pq_getmsgint64(in); - if (prepare_data->prepare_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->prepare_lsn)) elog(ERROR, "prepare_lsn is not set in %s message", msgtype); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in %s message", msgtype); prepare_data->prepare_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(prepare_data->prepare_time)) + elog(ERROR, "prepare_time is not set in %s message", msgtype); prepare_data->xid = pq_getmsgint(in, 4); - if (prepare_data->xid == InvalidTransactionId) + if (LOGICALREP_CHECK_INVALID_XID(prepare_data->xid)) elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype); /* read gid (copy it into a pre-allocated buffer) */ @@ -271,13 +278,17 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData * /* read fields */ prepare_data->commit_lsn = pq_getmsgint64(in); - if (prepare_data->commit_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->commit_lsn)) elog(ERROR, "commit_lsn is not set in commit prepared message"); prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(prepare_data->end_lsn)) elog(ERROR, "end_lsn is not set in commit prepared message"); prepare_data->commit_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(prepare_data->commit_time)) + elog(ERROR, "commit_time is not set in commit prepared message"); prepare_data->xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(prepare_data->xid)) + elog(ERROR, "invalid two-phase transaction ID in commit prepared message"); /* read gid (copy it into a pre-allocated buffer) */ strlcpy(prepare_data->gid, pq_getmsgstring(in), 
sizeof(prepare_data->gid)); @@ -330,14 +341,20 @@ logicalrep_read_rollback_prepared(StringInfo in, /* read fields */ rollback_data->prepare_end_lsn = pq_getmsgint64(in); - if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(rollback_data->prepare_end_lsn)) elog(ERROR, "prepare_end_lsn is not set in rollback prepared message"); rollback_data->rollback_end_lsn = pq_getmsgint64(in); - if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr) + if (LOGICALREP_CHECK_INVALID_LSN(rollback_data->rollback_end_lsn)) elog(ERROR, "rollback_end_lsn is not set in rollback prepared message"); rollback_data->prepare_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(rollback_data->prepare_time)) + elog(ERROR, "prepare_time is not set in rollback prepared message"); rollback_data->rollback_time = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_TIMESTAMP(rollback_data->rollback_time)) + elog(ERROR, "rollback_time is not set in rollback prepared message"); rollback_data->xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(rollback_data->xid)) + elog(ERROR, "invalid two-phase transaction ID in rollback prepared message"); /* read gid (copy it into a pre-allocated buffer) */ strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid)); @@ -1063,6 +1080,8 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) Assert(first_segment); xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(xid)) + elog(ERROR, "xid not set in stream start message"); *first_segment = (pq_getmsgbyte(in) == 1); return xid; @@ -1121,7 +1140,11 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) /* read fields */ commit_data->commit_lsn = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_LSN(commit_data->commit_lsn)) + elog(ERROR, "commit_lsn not set in stream commit message"); commit_data->end_lsn = pq_getmsgint64(in); + if (LOGICALREP_CHECK_INVALID_LSN(commit_data->end_lsn)) 
+ elog(ERROR, "end_lsn not set in stream commit message"); commit_data->committime = pq_getmsgint64(in); return xid; @@ -1154,7 +1177,11 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, Assert(xid && subxid); *xid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(*xid)) + elog(ERROR, "xid not set in stream abort message"); *subxid = pq_getmsgint(in, 4); + if (LOGICALREP_CHECK_INVALID_XID(*subxid)) + elog(ERROR, "subxid not set in stream abort message"); } /* -- 2.24.3 (Apple Git-128)