On Thu, 2022-02-24 at 20:35 +0000, Simon Riggs wrote: > The approach is perfectly fine, it just needs to be finished and > rebased.
Attached is a new version. The scope has expanded, so at this late stage it is likely to slip past v15. For 15, I'll focus on my extensible rmgr work, which can serve similar purposes. The main purpose of this patch is to be able to emit logical events for a table (insert/update/delete/truncate) without actually modifying a table or relying on the heap at all. That allows table AMs to support logical decoding/replication, and perhaps allows a few other interesting use cases (maybe foreign tables?). There are really two advantages of this approach over relying on a custom rmgr: 1. Easier for extension authors. 2. Doesn't require an extension module to be loaded to start the server. Those are very nice advantages, but they come at the price of flexibility and performance. I think there's room for both, and we can discuss the merits individually. Changes: * Support logical messages for INSERT/UPDATE/DELETE/TRUNCATE (before it only supported INSERT) * SQL functions pg_logical_emit_insert/update/delete/truncate (callable by superuser) * Tests (using test_decoding) * Use replica identities properly * In general a lot of cleanup, but still not quite ready TODO: * Should SQL functions be callable by the table owner? I would lean toward superuser-only, because logical replication is used for administrative purposes like upgrades, and I don't think we want table owners to be able to create inconsistencies. * Support for multi-insert * Docs for SQL functions, and maybe docs in the section on Generic WAL * Try to get away from reliance on heap tuples specifically Regards, Jeff Davis
From f7b85aea60b06eb7019befec38566b5e014bea12 Mon Sep 17 00:00:00 2001 From: Jeff Davis <j...@j-davis.com> Date: Sat, 30 Oct 2021 12:07:35 -0700 Subject: [PATCH] Logical wal. --- contrib/test_decoding/expected/messages.out | 148 +++++++ contrib/test_decoding/sql/messages.sql | 58 +++ src/backend/access/heap/heapam.c | 4 +- src/backend/access/rmgrdesc/Makefile | 2 +- .../{logicalmsgdesc.c => logicaldesc.c} | 45 +- src/backend/access/transam/rmgr.c | 2 +- src/backend/replication/logical/Makefile | 2 +- src/backend/replication/logical/decode.c | 275 ++++++++++-- .../replication/logical/logical_xlog.c | 399 ++++++++++++++++++ .../replication/logical/logicalfuncs.c | 165 +++++++- src/backend/replication/logical/message.c | 89 ---- src/bin/pg_waldump/.gitignore | 2 +- src/bin/pg_waldump/rmgrdesc.c | 2 +- src/include/access/heapam.h | 2 + src/include/access/heapam_xlog.h | 3 + src/include/access/rmgrlist.h | 2 +- src/include/catalog/pg_proc.dat | 17 + src/include/replication/decode.h | 2 +- src/include/replication/logical_xlog.h | 124 ++++++ src/include/replication/message.h | 41 -- 20 files changed, 1211 insertions(+), 173 deletions(-) rename src/backend/access/rmgrdesc/{logicalmsgdesc.c => logicaldesc.c} (59%) create mode 100644 src/backend/replication/logical/logical_xlog.c delete mode 100644 src/backend/replication/logical/message.c create mode 100644 src/include/replication/logical_xlog.h delete mode 100644 src/include/replication/message.h diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out index c75d40190b6..aa284bc37c2 100644 --- a/contrib/test_decoding/expected/messages.out +++ b/contrib/test_decoding/expected/messages.out @@ -91,6 +91,154 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for ------ (0 rows) +-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it +CREATE TABLE dummy(i int, t text, n numeric, primary key(t)); +SELECT pg_logical_emit_insert('dummy', 
row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn; + ?column? +---------- + t +(1 row) + +SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn; + ?column? +---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + data +----------------------------------------------------------------------- + BEGIN + table public.dummy: INSERT: i[integer]:1 t[text]:'one' n[numeric]:0.1 + COMMIT + BEGIN + table public.dummy: INSERT: i[integer]:2 t[text]:'two' n[numeric]:0.2 + COMMIT +(6 rows) + +SELECT * FROM dummy; + i | t | n +---+---+--- +(0 rows) + +SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn; + ?column? +---------- + t +(1 row) + +SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy, + row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn; + ?column? +---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + data +------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.dummy: DELETE: t[text]:'twelve' + COMMIT + BEGIN + table public.dummy: UPDATE: old-key: t[text]:'fifteen' new-tuple: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16 + COMMIT +(6 rows) + +ALTER TABLE dummy REPLICA IDENTITY NOTHING; +SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn; + ?column? +---------- + t +(1 row) + +SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy, + row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn; + ?column? 
+---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + data +----------------------------------------------------------------------------- + BEGIN + table public.dummy: DELETE: (no-tuple-data) + COMMIT + BEGIN + table public.dummy: UPDATE: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16 + COMMIT +(6 rows) + +SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn; + ?column? +---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + data +--------------------------------------- + BEGIN + table public.dummy: TRUNCATE: cascade + COMMIT +(3 rows) + +CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t)); +-- return invalid +SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u); + pg_logical_emit_insert +------------------------ + 0/0 +(1 row) + +-- return invalid +SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u, + row(11, 'eleven', 0.11)::dummy_u); + pg_logical_emit_update +------------------------ + 0/0 +(1 row) + +-- return invalid +SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u, + row(11, 'eleven', 0.11)::dummy_u); + pg_logical_emit_update +------------------------ + 0/0 +(1 row) + +-- error +SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u, + row(11, 'eleven', 0.11)::dummy_u); +ERROR: replica identity column is NULL +-- return invalid +SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u); + pg_logical_emit_delete +------------------------ + 0/0 +(1 row) + +-- error +SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u); +ERROR: replica identity column is NULL +-- return invalid +SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false); + pg_logical_emit_truncate 
+-------------------------- + 0/0 +(1 row) + +SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn; + ?column? +---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); +ERROR: could not open relation with OID 1663 +DROP TABLE dummy; +DROP TABLE dummy_u; SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot'); ?column? ---------- diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql index cf3f7738e57..c04a2e1a0c8 100644 --- a/contrib/test_decoding/sql/messages.sql +++ b/contrib/test_decoding/sql/messages.sql @@ -31,4 +31,62 @@ SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2'); \c :prevdb SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); +-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it +CREATE TABLE dummy(i int, t text, n numeric, primary key(t)); + +SELECT pg_logical_emit_insert('dummy', row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn; +SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + +SELECT * FROM dummy; + +SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn; + +SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy, + row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + +ALTER TABLE dummy REPLICA IDENTITY NOTHING; + +SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn; + +SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy, + 
row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + +SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + +CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t)); +-- return invalid +SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u); +-- return invalid +SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u, + row(11, 'eleven', 0.11)::dummy_u); +-- return invalid +SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u, + row(11, 'eleven', 0.11)::dummy_u); +-- error +SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u, + row(11, 'eleven', 0.11)::dummy_u); +-- return invalid +SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u); +-- error +SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u); + +-- return invalid +SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false); + +SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0'); + +DROP TABLE dummy; + +DROP TABLE dummy_u; + SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 74ad445e59b..9d70598dbde 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -108,8 +108,6 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status static void index_delete_sort(TM_IndexDeleteOp *delstate); static int bottomup_sort_and_shrink(TM_IndexDeleteOp 
*delstate); static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); -static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_required, - bool *copy); /* @@ -8372,7 +8370,7 @@ log_heap_new_cid(Relation relation, HeapTuple tup) * *copy is set to true if the returned tuple is a modified copy rather than * the same tuple that was passed in. */ -static HeapTuple +HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy) { diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd862..ed6dff179be 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -18,7 +18,7 @@ OBJS = \ gistdesc.o \ hashdesc.o \ heapdesc.o \ - logicalmsgdesc.o \ + logicaldesc.o \ mxactdesc.o \ nbtdesc.o \ relmapdesc.o \ diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c similarity index 59% rename from src/backend/access/rmgrdesc/logicalmsgdesc.c rename to src/backend/access/rmgrdesc/logicaldesc.c index 099e11a84e7..c2b0434a606 100644 --- a/src/backend/access/rmgrdesc/logicalmsgdesc.c +++ b/src/backend/access/rmgrdesc/logicaldesc.c @@ -1,22 +1,22 @@ /*------------------------------------------------------------------------- * - * logicalmsgdesc.c - * rmgr descriptor routines for replication/logical/message.c + * logicaldesc.c + * rmgr descriptor routines for replication/logical/logical_xlog.c * * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group * * * IDENTIFICATION - * src/backend/access/rmgrdesc/logicalmsgdesc.c + * src/backend/access/rmgrdesc/logicaldesc.c * *------------------------------------------------------------------------- */ #include "postgres.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" void -logicalmsg_desc(StringInfo buf, XLogReaderState *record) +logical_desc(StringInfo buf, XLogReaderState *record) { char *rec = 
XLogRecGetData(record); uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; @@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record) sep = " "; } } + else if (info == XLOG_LOGICAL_INSERT) + { + /* TODO: describe record contents */ + } + else if (info == XLOG_LOGICAL_UPDATE) + { + /* TODO: describe record contents */ + } + else if (info == XLOG_LOGICAL_DELETE) + { + /* TODO: describe record contents */ + } + else if (info == XLOG_LOGICAL_TRUNCATE) + { + /* TODO: describe record contents */ + } } const char * -logicalmsg_identify(uint8 info) +logical_identify(uint8 info) { - if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE) - return "MESSAGE"; + switch (info & ~XLR_INFO_MASK) /* ignore generic XLR_* flag bits */ + { + case XLOG_LOGICAL_MESSAGE: + return "MESSAGE"; + case XLOG_LOGICAL_INSERT: + return "INSERT"; + case XLOG_LOGICAL_UPDATE: + return "UPDATE"; + case XLOG_LOGICAL_DELETE: + return "DELETE"; + case XLOG_LOGICAL_TRUNCATE: + return "TRUNCATE"; + default: + break; + } return NULL; } diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index f8847d5aebf..ca0bd614fcf 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -25,7 +25,7 @@ #include "commands/sequence.h" #include "commands/tablespace.h" #include "replication/decode.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "replication/origin.h" #include "storage/standby.h" #include "utils/relmapper.h" diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fdeb719..9fa281a1dca 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -19,7 +19,7 @@ OBJS = \ launcher.o \ logical.o \ logicalfuncs.o \ - message.o \ + logical_xlog.o \ origin.o \ proto.o \ relation.o \ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 77bc7aea7a0..81feca97eb0 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -35,23 +35,29 @@ #include "access/xlogrecord.h" #include "access/xlogutils.h" #include
"catalog/pg_control.h" +#include "commands/sequence.h" #include "replication/decode.h" #include "replication/logical.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "replication/origin.h" #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" -#include "commands/sequence.h" /* individual record(group)'s handlers */ -static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalMsg(LogicalDecodingContext *cxt, XLogRecordBuffer *buf); +static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid, bool two_phase); @@ -457,7 +463,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { case XLOG_HEAP_INSERT: if (SnapBuildProcessChange(builder, xid, buf->origptr)) - DecodeInsert(ctx, buf); + 
DecodeHeapInsert(ctx, buf); break; /* @@ -468,17 +474,17 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP_HOT_UPDATE: case XLOG_HEAP_UPDATE: if (SnapBuildProcessChange(builder, xid, buf->origptr)) - DecodeUpdate(ctx, buf); + DecodeHeapUpdate(ctx, buf); break; case XLOG_HEAP_DELETE: if (SnapBuildProcessChange(builder, xid, buf->origptr)) - DecodeDelete(ctx, buf); + DecodeHeapDelete(ctx, buf); break; case XLOG_HEAP_TRUNCATE: if (SnapBuildProcessChange(builder, xid, buf->origptr)) - DecodeTruncate(ctx, buf); + DecodeHeapTruncate(ctx, buf); break; case XLOG_HEAP_INPLACE: @@ -559,31 +565,71 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ void -logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; + SnapBuild *builder = ctx->snapshot_builder; TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; - RepOriginId origin_id = XLogRecGetOrigin(r); - Snapshot snapshot; - xl_logical_message *message; - - if (info != XLOG_LOGICAL_MESSAGE) - elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info); - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding messages. + * point in decoding data changes. 
+ */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || ctx->fast_forward) return; + switch (info) + { + case XLOG_LOGICAL_MESSAGE: + DecodeLogicalMsg(ctx, buf); + break; + + case XLOG_LOGICAL_INSERT: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeLogicalInsert(ctx, buf); + break; + + case XLOG_LOGICAL_UPDATE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeLogicalUpdate(ctx, buf); + break; + + case XLOG_LOGICAL_DELETE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeLogicalDelete(ctx, buf); + break; + + case XLOG_LOGICAL_TRUNCATE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeLogicalTruncate(ctx, buf); + break; + + default: + elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info); + break; + } +} + +static void +DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + SnapBuild *builder = ctx->snapshot_builder; + TransactionId xid = XLogRecGetXid(r); + RepOriginId origin_id = XLogRecGetOrigin(r); + Snapshot snapshot; + xl_logical_message *message; + message = (xl_logical_message *) XLogRecGetData(r); + if (message->transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + if (message->dbId != ctx->slot->data.database || FilterByOrigin(ctx, origin_id)) return; @@ -603,6 +649,187 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * prefix */ message->message_size, message->message + message->prefix_size); + +} + +/* + * Parse XLOG_LOGICAL_INSERT from wal into proper tuplebufs. + * + * Inserts always carry the new tuple.
+ */ +static void +DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + Size datalen = XLogRecGetDataLen(r) - SizeOfLogicalInsert; + char *tupledata = XLogRecGetData(r) + SizeOfLogicalInsert; + Size tuplelen = datalen - SizeOfHeapHeader; + xl_logical_insert *xlrec; + ReorderBufferChange *change; + + xlrec = (xl_logical_insert *) XLogRecGetData(r); + + /* only interested in our database */ + if (xlrec->node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = XLogRecGetOrigin(r); + + memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode)); + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); + + change->data.tp.clear_toast_afterwards = true; + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); +} + +/* + * Parse XLOG_LOGICAL_UPDATE from wal into proper tuplebufs. + * + * Updates can possibly contain a new tuple and the old primary key. 
+ */ +static void +DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_logical_update *xlrec = (xl_logical_update *) XLogRecGetData(r); + char *new_tupledata = XLogRecGetData(r) + SizeOfLogicalUpdate; + Size new_datalen = xlrec->new_datalen; + Size new_tuplelen = new_datalen - SizeOfHeapHeader; + ReorderBufferChange *change; + + /* only interested in our database */ + if (xlrec->node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_UPDATE; + change->origin_id = XLogRecGetOrigin(r); + + memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode)); + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, new_tuplelen); + + DecodeXLogTuple(new_tupledata, new_datalen, change->data.tp.newtuple); + + if (xlrec->flags & XLL_UPDATE_CONTAINS_OLD) + { + char *old_tupledata = new_tupledata + new_datalen; + Size old_datalen; + Size old_tuplelen; + + /* remaining data is the old tuple */ + old_datalen = XLogRecGetDataLen(r) - new_datalen - SizeOfLogicalUpdate; + old_tuplelen = old_datalen - SizeOfHeapHeader; + + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen); + + DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple); + } + + change->data.tp.clear_toast_afterwards = true; + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); +} + +/* + * Parse XLOG_LOGICAL_DELETE from wal into proper tuplebufs. + * + * Deletes can possibly contain the old primary key. 
+ */ +static void +DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_logical_delete *xlrec; + ReorderBufferChange *change; + + xlrec = (xl_logical_delete *) XLogRecGetData(r); + + /* only interested in our database */ + if (xlrec->node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); + + memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode)); + + if (xlrec->flags & XLL_DELETE_CONTAINS_OLD) + { + Size old_datalen = XLogRecGetDataLen(r) - SizeOfLogicalDelete; + char *old_tupledata = XLogRecGetData(r) + SizeOfLogicalDelete; + Size old_tuplelen = old_datalen - SizeOfHeapHeader; + + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen); + + DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple); + } + + change->data.tp.clear_toast_afterwards = true; + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); +} + +/* + * Parse XLOG_LOGICAL_TRUNCATE from wal + */ +static void +DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_logical_truncate *xlrec; + ReorderBufferChange *change; + + xlrec = (xl_logical_truncate *) XLogRecGetData(r); + + /* only interested in our database */ + if (xlrec->dbId != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_TRUNCATE; + change->origin_id = XLogRecGetOrigin(r); + if (xlrec->flags & XLL_TRUNCATE_CASCADE) + change->data.truncate.cascade = true; + if 
(xlrec->flags & XLL_TRUNCATE_RESTART_SEQS) + change->data.truncate.restart_seqs = true; + change->data.truncate.nrelids = xlrec->nrelids; + change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder, + xlrec->nrelids); + memcpy(change->data.truncate.relids, xlrec->relids, + xlrec->nrelids * sizeof(Oid)); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), + buf->origptr, change, false); } /* @@ -839,7 +1066,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * Deletes can contain the new tuple. */ static void -DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { Size datalen; char *tupledata; @@ -898,7 +1125,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * Updates can possibly contain a new tuple and the old primary key. */ static void -DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { XLogReaderState *r = buf->record; xl_heap_update *xlrec; @@ -965,7 +1192,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * Deletes can possibly contain the old primary key. 
*/ static void -DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { XLogReaderState *r = buf->record; xl_heap_delete *xlrec; @@ -1019,7 +1246,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * Parse XLOG_HEAP_TRUNCATE from wal */ static void -DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { XLogReaderState *r = buf->record; xl_heap_truncate *xlrec; diff --git a/src/backend/replication/logical/logical_xlog.c b/src/backend/replication/logical/logical_xlog.c new file mode 100644 index 00000000000..30ae3e73be6 --- /dev/null +++ b/src/backend/replication/logical/logical_xlog.c @@ -0,0 +1,399 @@ +/*------------------------------------------------------------------------- + * + * logical_xlog.c + * Logical xlog records. + * + * Copyright (c) 2013-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/logical_xlog.c + * + * Logical Messages + * + * Generic logical messages allow XLOG logging of arbitrary binary blobs that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * These messages can be either transactional or non-transactional. + * Transactional messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * Non-transactional messages are sent to the plugin at the time when the + * logical decoding reads them from XLOG. This also means that transactional + * messages won't be delivered if the transaction was rolled back but the + * non-transactional one will always be delivered. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. 
+ * + * Logical Insert/Update/Delete/Truncate + * + * These records are intended to be used by non-heap table access methods that + * wish to support logical decoding and replication. They are treated + * similarly to the analogous heap records, but are not tied to physical pages + * or other details of the heap. These records are not processed during redo, + * so do not contribute to durability or physical replication; use generic WAL + * records for that. Note that using both logical WAL records and generic WAL + * records is redundant compared with the heap. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/heapam_xlog.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "catalog/catalog.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "replication/logical.h" +#include "replication/logical_xlog.h" +#include "utils/memutils.h" +#include "utils/relcache.h" + +static void CheckReplicaIdentity(Relation relation, TupleTableSlot *slot); + +/* + * Write logical decoding message into XLog. + */ +XLogRecPtr +LogLogicalMessage(const char *prefix, const char *message, size_t size, + bool transactional) +{ + xl_logical_message xlrec; + + /* + * Force xid to be allocated if we're emitting a transactional message. 
+ */ + if (transactional) + { + Assert(IsTransactionState()); + GetCurrentTransactionId(); + } + + xlrec.dbId = MyDatabaseId; + xlrec.transactional = transactional; + /* trailing zero is critical; see logical_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); + XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + XLogRegisterData(unconstify(char *, message), size); + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE); +} + +/* + * Write XLOG_LOGICAL_INSERT for new_slot; InvalidXLogRecPtr if not logged. + */ +XLogRecPtr +LogLogicalInsert(Relation relation, TupleTableSlot *new_slot) +{ + bool free_new_tuple; + HeapTuple new_tuple; + xl_logical_insert xlrec; + xl_heap_header xlhdr; + XLogRecPtr recptr; + + if (!equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor)) + ereport(ERROR, (errmsg("record type must match relation type"))); + + if (!RelationIsLogicallyLogged(relation)) + return InvalidXLogRecPtr; + + new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple); + + /* force xid to be allocated; decoding queues the change under it */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + new_tuple->t_tableOid = new_slot->tts_tableOid; + ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid); + + xlrec.node = relation->rd_node; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert); + + xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2; + xlhdr.t_infomask = new_tuple->t_data->t_infomask; + xlhdr.t_hoff = new_tuple->t_data->t_hoff; + + XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader); + XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader, + new_tuple->t_len - SizeofHeapTupleHeader); + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT); + + if (free_new_tuple) + pfree(new_tuple); +
+ return recptr; +} + +/* + * Write logical update into log. + */ +XLogRecPtr +LogLogicalUpdate(Relation relation, TupleTableSlot *old_slot, + TupleTableSlot *new_slot) +{ + HeapTuple old_whole_tuple; + HeapTuple old_id_tuple; + HeapTuple new_tuple; + bool free_old_whole_tuple; + bool free_old_id_tuple; + bool free_new_tuple; + xl_heap_header new_xlhdr; + xl_logical_update xlrec; + XLogRecPtr recptr; + + if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor) || + !equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor)) + ereport(ERROR, (errmsg("record types must match relation type"))); + + CheckReplicaIdentity(relation, old_slot); + + if (!RelationIsLogicallyLogged(relation)) + return InvalidXLogRecPtr; + + /* force xid to be allocated */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true, + &free_old_whole_tuple); + new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple); + + xlrec.node = relation->rd_node; + xlrec.new_datalen = new_tuple->t_len - SizeofHeapTupleHeader + + SizeOfHeapHeader; + xlrec.flags = 0; + + old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true, + &free_old_id_tuple); + + if (old_id_tuple != NULL) + { + xlrec.flags |= XLL_UPDATE_CONTAINS_OLD; + old_id_tuple->t_tableOid = old_slot->tts_tableOid; + ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid); + } + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalUpdate); + + new_tuple->t_tableOid = new_slot->tts_tableOid; + ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid); + + new_xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2; + new_xlhdr.t_infomask = new_tuple->t_data->t_infomask; + new_xlhdr.t_hoff = new_tuple->t_data->t_hoff; + + /* write new tuple first, then old */ + XLogRegisterData((char *) &new_xlhdr, SizeOfHeapHeader); + XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader, + new_tuple->t_len - SizeofHeapTupleHeader); 
+ + if (old_id_tuple != NULL) + { + xl_heap_header old_xlhdr; + + old_xlhdr.t_infomask2 = old_id_tuple->t_data->t_infomask2; + old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask; + old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff; + + XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader); + XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader, + old_id_tuple->t_len - SizeofHeapTupleHeader); + } + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_UPDATE); + + if (free_old_whole_tuple) + pfree(old_whole_tuple); + if (free_old_id_tuple) + pfree(old_id_tuple); + if (free_new_tuple) + pfree(new_tuple); + + return recptr; +} + +/* + * Write logical update into log. + */ +XLogRecPtr +LogLogicalDelete(Relation relation, TupleTableSlot *old_slot) +{ + HeapTuple old_whole_tuple; + HeapTuple old_id_tuple; + bool free_old_whole_tuple; + bool free_old_id_tuple; + xl_logical_delete xlrec; + XLogRecPtr recptr; + + if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor)) + ereport(ERROR, (errmsg("record type must match relation type"))); + + CheckReplicaIdentity(relation, old_slot); + + if (!RelationIsLogicallyLogged(relation)) + return InvalidXLogRecPtr; + + /* force xid to be allocated */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true, + &free_old_whole_tuple); + + xlrec.node = relation->rd_node; + xlrec.flags = 0; + + old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true, + &free_old_id_tuple); + + if (old_id_tuple != NULL) + { + xlrec.flags |= XLL_UPDATE_CONTAINS_OLD; + old_id_tuple->t_tableOid = old_slot->tts_tableOid; + ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid); + } + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalDelete); + + if (old_id_tuple != NULL) + { + xl_heap_header old_xlhdr; + + old_xlhdr.t_infomask2 = 
old_id_tuple->t_data->t_infomask2; + old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask; + old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff; + + XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader); + XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader, + old_id_tuple->t_len - SizeofHeapTupleHeader); + } + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_DELETE); + + if (free_old_whole_tuple) + pfree(old_whole_tuple); + if (free_old_id_tuple) + pfree(old_id_tuple); + + return recptr; +} + +/* + * Write logical truncate into log. + */ +XLogRecPtr +LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs) +{ + xl_logical_truncate xlrec; + XLogRecPtr recptr; + Oid *logrelids; + ListCell *lc; + int nrelids = 0; + + /* force xid to be allocated */ + Assert(IsTransactionState()); + GetCurrentTransactionId(); + + xlrec.dbId = MyDatabaseId; + xlrec.nrelids = list_length(relids); + xlrec.flags = 0; + if (cascade) + xlrec.flags |= XLL_TRUNCATE_CASCADE; + if (restart_seqs) + xlrec.flags |= XLL_TRUNCATE_RESTART_SEQS; + + logrelids = palloc(list_length(relids) * sizeof(Oid)); + foreach(lc, relids) + { + Oid relid = lfirst_oid(lc); + Relation rel = relation_open(relid, AccessShareLock); + + if (RelationIsLogicallyLogged(rel)) + { + logrelids[nrelids++] = lfirst_oid(lc); + } + + relation_close(rel, NoLock); + } + + if (nrelids == 0) + return InvalidXLogRecPtr; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalTruncate); + XLogRegisterData((char *) logrelids, nrelids * sizeof(Oid)); + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_TRUNCATE); + + return recptr; +} + +/* + * Redo is basically just noop for logical decoding messages. 
+ */
+void
+logical_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+		case XLOG_LOGICAL_INSERT:
+		case XLOG_LOGICAL_UPDATE:
+		case XLOG_LOGICAL_DELETE:
+		case XLOG_LOGICAL_TRUNCATE:
+			break;
+		default:
+			elog(PANIC, "logical_redo: unknown op code %u", info);
+	}
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
+
+/*
+ * Check that replica identity columns are non-NULL.
+ *
+ * Raises an error if any user column that is part of the relation's
+ * replica identity key is NULL in slot.
+ */
+static void
+CheckReplicaIdentity(Relation relation, TupleTableSlot *slot)
+{
+	Bitmapset  *id_attrs;
+	int			id_attr;
+
+	id_attrs = RelationGetIndexAttrBitmap(relation,
+										  INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	/*
+	 * Bitmap members are offset by FirstLowInvalidHeapAttributeNumber; start
+	 * the scan past the system columns so only attno >= 1 is visited.
+	 */
+	id_attr = -FirstLowInvalidHeapAttributeNumber;
+	while ((id_attr = bms_next_member(id_attrs, id_attr)) >= 0)
+	{
+		AttrNumber	attno = id_attr + FirstLowInvalidHeapAttributeNumber;
+
+		if (slot_attisnull(slot, attno))
+			ereport(ERROR,
+					(errcode(ERRCODE_NOT_NULL_VIOLATION),
+					 errmsg("replica identity column is NULL")));
+	}
+
+	/* the bitmap is a palloc'd copy owned by us; don't leak it per call */
+	bms_free(id_attrs);
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 6058d36e0d5..2776861ab8d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -17,6 +17,7 @@
 
 #include <unistd.h>
 
+#include "access/relation.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
@@ -29,7 +30,7 @@
 #include "nodes/makefuncs.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "storage/fd.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -389,3 +390,165 @@ pg_logical_emit_message_text(PG_FUNCTION_ARGS)
 	/* bytea and text are compatible */
 	return pg_logical_emit_message_bytea(fcinfo);
 }
+
+/*
+ * SQL function for writing logical insert into WAL.
+ *
+ * Arguments: the target relation and a record of that relation's row type.
+ * Returns the LSN of the emitted record, or InvalidXLogRecPtr if the
+ * relation is not logically logged.
+ */
+Datum
+pg_logical_emit_insert(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	HeapTupleHeader htup = PG_GETARG_HEAPTUPLEHEADER(1);
+	Relation	rel = relation_open(relid, AccessShareLock);
+	HeapTupleData tuple;
+	TupleTableSlot *slot;
+	Oid			rel_type;
+	Oid			tup_type;
+	XLogRecPtr	lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	tup_type = HeapTupleHeaderGetTypeId(htup);
+	if (rel_type != tup_type)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATATYPE_MISMATCH),
+				 errmsg("record type must match table type")));
+
+	/* wrap the composite datum in a HeapTupleData so it can be slotted */
+	tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+	ItemPointerSetInvalid(&(tuple.t_self));
+	tuple.t_tableOid = relid;
+	tuple.t_data = htup;
+
+	/* standalone slot; paired with ExecDropSingleTupleTableSlot() below */
+	slot = MakeSingleTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecStoreHeapTuple(&tuple, slot, false);
+
+	lsn = LogLogicalInsert(rel, slot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical update into WAL.
+ *
+ * Arguments: the target relation, then the old and the new row, both as
+ * records of that relation's row type.  Returns the LSN of the emitted
+ * record, or InvalidXLogRecPtr if the relation is not logically logged.
+ */
+Datum
+pg_logical_emit_update(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	HeapTupleHeader old_htup = PG_GETARG_HEAPTUPLEHEADER(1);
+	HeapTupleHeader new_htup = PG_GETARG_HEAPTUPLEHEADER(2);
+	Relation	rel = relation_open(relid, AccessShareLock);
+	HeapTupleData old_tuple;
+	HeapTupleData new_tuple;
+	TupleTableSlot *old_slot;
+	TupleTableSlot *new_slot;
+	Oid			rel_type;
+	Oid			old_tup_type;
+	Oid			new_tup_type;
+	XLogRecPtr	lsn;
+
+	/* check that both tuples match the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	old_tup_type = HeapTupleHeaderGetTypeId(old_htup);
+	new_tup_type = HeapTupleHeaderGetTypeId(new_htup);
+
+	if (rel_type != old_tup_type || rel_type != new_tup_type)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATATYPE_MISMATCH),
+				 errmsg("record type must match table type")));
+
+	/* wrap both composite datums in HeapTupleData so they can be slotted */
+	old_tuple.t_len = HeapTupleHeaderGetDatumLength(old_htup);
+	ItemPointerSetInvalid(&(old_tuple.t_self));
+	old_tuple.t_tableOid = relid;
+	old_tuple.t_data = old_htup;
+
+	new_tuple.t_len = HeapTupleHeaderGetDatumLength(new_htup);
+	ItemPointerSetInvalid(&(new_tuple.t_self));
+	new_tuple.t_tableOid = relid;
+	new_tuple.t_data = new_htup;
+
+	/* standalone slots; paired with ExecDropSingleTupleTableSlot() below */
+	old_slot = MakeSingleTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	new_slot = MakeSingleTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecStoreHeapTuple(&old_tuple, old_slot, false);
+	ExecStoreHeapTuple(&new_tuple, new_slot, false);
+
+	lsn = LogLogicalUpdate(rel, old_slot, new_slot);
+
+	ExecDropSingleTupleTableSlot(old_slot);
+	ExecDropSingleTupleTableSlot(new_slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical delete into WAL.
+ *
+ * Arguments: the target relation and the deleted row as a record of that
+ * relation's row type.  Returns the LSN of the emitted record, or
+ * InvalidXLogRecPtr if the relation is not logically logged.
+ */
+Datum
+pg_logical_emit_delete(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	HeapTupleHeader htup = PG_GETARG_HEAPTUPLEHEADER(1);
+	Relation	rel = relation_open(relid, AccessShareLock);
+	HeapTupleData tuple;
+	TupleTableSlot *slot;
+	Oid			rel_type;
+	Oid			tup_type;
+	XLogRecPtr	lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	tup_type = HeapTupleHeaderGetTypeId(htup);
+	if (rel_type != tup_type)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATATYPE_MISMATCH),
+				 errmsg("record type must match table type")));
+
+	/* wrap the composite datum in a HeapTupleData so it can be slotted */
+	tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+	ItemPointerSetInvalid(&(tuple.t_self));
+	tuple.t_tableOid = relid;
+	tuple.t_data = htup;
+
+	/* standalone slot; paired with ExecDropSingleTupleTableSlot() below */
+	slot = MakeSingleTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecStoreHeapTuple(&tuple, slot, false);
+
+	lsn = LogLogicalDelete(rel, slot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical truncate into WAL.
+ *
+ * Arguments: an array of relations, then the cascade and restart_seqs
+ * flags, which are recorded as XLL_TRUNCATE_* flags.  Returns the LSN of
+ * the emitted record, or InvalidXLogRecPtr if none of the relations is
+ * logically logged.
+ */
+Datum
+pg_logical_emit_truncate(PG_FUNCTION_ARGS)
+{
+	ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(0);
+	bool		cascade = PG_GETARG_BOOL(1);
+	bool		restart_seqs = PG_GETARG_BOOL(2);
+	Datum	   *values;
+	bool	   *nulls;
+	int			nrelids;
+	List	   *relids = NIL;
+	XLogRecPtr	lsn;
+
+	deconstruct_array(arr, REGCLASSOID, sizeof(Oid), true, TYPALIGN_INT,
+					  &values, &nulls, &nrelids);
+
+	for (int i = 0; i < nrelids; i++)
+	{
+		if (nulls[i])
+			ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("unexpected NULL element")));
+		relids = lappend_oid(relids, DatumGetObjectId(values[i]));
+	}
+
+	lsn = LogLogicalTruncate(relids, cascade, restart_seqs);
+
+	/* release the working copies built above */
+	pfree(values);
+	pfree(nulls);
+	list_free(relids);
+
+	PG_RETURN_LSN(lsn);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
deleted file mode 100644
index 1c34912610e..00000000000
--- a/src/backend/replication/logical/message.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * message.c
- *	  Generic logical messages.
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- *	  src/backend/replication/logical/message.c
- *
- * NOTES
- *
- * Generic logical messages allow XLOG logging of arbitrary binary blobs that
- * get passed to the logical decoding plugin. In normal XLOG processing they
- * are same as NOOP.
- *
- * These messages can be either transactional or non-transactional.
- * Transactional messages are part of current transaction and will be sent to
- * decoding plugin using in a same way as DML operations.
- * Non-transactional messages are sent to the plugin at the time when the
- * logical decoding reads them from XLOG. This also means that transactional
- * messages won't be delivered if the transaction was rolled back but the
- * non-transactional one will always be delivered.
- *
- * Every message carries prefix to avoid conflicts between different decoding
- * plugins.
The plugin authors must take extra care to use unique prefix, - * good options seems to be for example to use the name of the extension. - * - * --------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include "access/xact.h" -#include "access/xloginsert.h" -#include "miscadmin.h" -#include "nodes/execnodes.h" -#include "replication/logical.h" -#include "replication/message.h" -#include "utils/memutils.h" - -/* - * Write logical decoding message into XLog. - */ -XLogRecPtr -LogLogicalMessage(const char *prefix, const char *message, size_t size, - bool transactional) -{ - xl_logical_message xlrec; - - /* - * Force xid to be allocated if we're emitting a transactional message. - */ - if (transactional) - { - Assert(IsTransactionState()); - GetCurrentTransactionId(); - } - - xlrec.dbId = MyDatabaseId; - xlrec.transactional = transactional; - /* trailing zero is critical; see logicalmsg_desc */ - xlrec.prefix_size = strlen(prefix) + 1; - xlrec.message_size = size; - - XLogBeginInsert(); - XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); - XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); - XLogRegisterData(unconstify(char *, message), size); - - /* allow origin filtering */ - XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - - return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); -} - -/* - * Redo is basically just noop for logical decoding messages. - */ -void -logicalmsg_redo(XLogReaderState *record) -{ - uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; - - if (info != XLOG_LOGICAL_MESSAGE) - elog(PANIC, "logicalmsg_redo: unknown op code %u", info); - - /* This is only interesting for logical decoding, see decode.c. 
*/ -} diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore index 3be00a8b61f..567655fa626 100644 --- a/src/bin/pg_waldump/.gitignore +++ b/src/bin/pg_waldump/.gitignore @@ -10,7 +10,7 @@ /gistdesc.c /hashdesc.c /heapdesc.c -/logicalmsgdesc.c +/logicaldesc.c /mxactdesc.c /nbtdesc.c /relmapdesc.c diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6a4ebd1310b..86804a243bc 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -26,7 +26,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" -#include "replication/message.h" +#include "replication/logical_xlog.h" #include "replication/origin.h" #include "rmgrdesc.h" #include "storage/standbydefs.h" diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index b46ab7d7390..77fa0337f6e 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -175,6 +175,8 @@ extern void simple_heap_insert(Relation relation, HeapTuple tup); extern void simple_heap_delete(Relation relation, ItemPointer tid); extern void simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup); +extern HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, + bool key_required, bool *copy); extern TransactionId heap_index_delete_tuples(Relation rel, TM_IndexDeleteOp *delstate); diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 5c47fdcec80..5460f7836f9 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -124,6 +124,9 @@ typedef struct xl_heap_delete * For truncate we list all truncated relids in an array, followed by all * sequence relids that need to be restarted, if any. * All rels are always within the same database, so we just list dbid once. + * + * Note: identical to xl_logical_truncate, except that for + * xl_logical_truncate, no redo is performed. 
*/ typedef struct xl_heap_truncate { diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index cf8b6d48193..d43175088fb 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL, logical_decode) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 01e1dd4d6d1..cd57c2b3aa2 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10937,6 +10937,23 @@ proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u', prorettype => 'pg_lsn', proargtypes => 'bool text bytea', prosrc => 'pg_logical_emit_message_bytea' }, +{ oid => '9297', descr => 'emit a logical insert', + proname => 'pg_logical_emit_insert', provolatile => 'v', proparallel => 'u', + prorettype => 'pg_lsn', proargtypes => 'regclass record', + prosrc => 'pg_logical_emit_insert' }, +{ oid => '9298', descr => 'emit a logical update', + proname => 'pg_logical_emit_update', provolatile => 'v', proparallel => 'u', + prorettype => 'pg_lsn', proargtypes => 'regclass record record', + prosrc => 'pg_logical_emit_update' }, +{ oid => '9299', descr => 'emit a logical delete', + proname => 'pg_logical_emit_delete', provolatile => 'v', proparallel => 'u', + prorettype => 'pg_lsn', proargtypes => 'regclass record', + prosrc => 
'pg_logical_emit_delete' },
+{ oid => '9300', descr => 'emit a logical truncate',
+  proname => 'pg_logical_emit_truncate', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => '_regclass bool bool',
+  prosrc => 'pg_logical_emit_truncate' },
+
 
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 8e07bb7409a..118590ad61e 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -26,7 +26,7 @@ extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
diff --git a/src/include/replication/logical_xlog.h b/src/include/replication/logical_xlog.h
new file mode 100644
index 00000000000..dce74f71111
--- /dev/null
+++ b/src/include/replication/logical_xlog.h
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ * logical_xlog.h
+ *	  Exports from replication/logical/logical_xlog.c
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logical_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICAL_XLOG_H
+#define LOGICAL_XLOG_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "storage/off.h"
+#include "utils/rel.h"
+
+/*
+ * xl_logical_update flag values, 8 bits are available.
+ */
+#define XLL_UPDATE_CONTAINS_OLD			(1<<0)
+
+/*
+ * xl_logical_delete flag values, 8 bits are available.
+ */
+#define XLL_DELETE_CONTAINS_OLD			(1<<0)
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+	Oid			dbId;			/* database Oid emitted from */
+	bool		transactional;	/* is message transactional? */
+	Size		prefix_size;	/* length of prefix */
+	Size		message_size;	/* size of the message */
+	/* payload, including null-terminated prefix of length prefix_size */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_message;
+
+#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+
+/* This is what we need to know about insert */
+typedef struct xl_logical_insert
+{
+	RelFileNode node;
+
+	/* tuple data follows */
+} xl_logical_insert;
+
+#define SizeOfLogicalInsert	(offsetof(xl_logical_insert, node) + sizeof(RelFileNode))
+
+/*
+ * This is what we need to know about update.
+ */
+typedef struct xl_logical_update
+{
+	RelFileNode node;
+	Size		new_datalen;
+	uint8		flags;
+
+	/* tuple data follows */
+} xl_logical_update;
+
+#define SizeOfLogicalUpdate	(offsetof(xl_logical_update, flags) + sizeof(uint8))
+
+/* This is what we need to know about delete */
+typedef struct xl_logical_delete
+{
+	RelFileNode node;
+	uint8		flags;
+
+	/* tuple data follows */
+} xl_logical_delete;
+
+#define SizeOfLogicalDelete	(offsetof(xl_logical_delete, flags) + sizeof(uint8))
+
+/*
+ * For truncate we list the OIDs of the truncated relations that are logically
+ * logged; nrelids is the number of OIDs that follow the fixed-size header.
+ * All rels are always within the same database, so we just list dbid once.
+ *
+ * Note: same layout as xl_heap_truncate, except that no redo is performed,
+ * only decoding.
+ */
+typedef struct xl_logical_truncate
+{
+	Oid			dbId;
+	uint32		nrelids;
+	uint8		flags;
+	Oid			relids[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_truncate;
+
+#define SizeOfLogicalTruncate	(offsetof(xl_logical_truncate, relids))
+
+struct TupleTableSlot;
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+									size_t size, bool transactional);
+extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalUpdate(Relation relation, struct TupleTableSlot *old_slot,
+								   struct TupleTableSlot *new_slot);
+extern XLogRecPtr LogLogicalDelete(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs);
+
+/* RMGR API */
+#define XLOG_LOGICAL_MESSAGE	0x00
+#define XLOG_LOGICAL_INSERT		0x10
+#define XLOG_LOGICAL_UPDATE		0x20
+#define XLOG_LOGICAL_DELETE		0x30
+#define XLOG_LOGICAL_TRUNCATE	0x40
+
+/*
+ * xl_logical_truncate flag values, 8 bits are available.
+ */
+#define XLL_TRUNCATE_CASCADE			(1<<0)
+#define XLL_TRUNCATE_RESTART_SEQS		(1<<1)
+
+extern void logical_redo(XLogReaderState *record);
+extern void logical_desc(StringInfo buf, XLogReaderState *record);
+extern const char *logical_identify(uint8 info);
+
+#endif							/* LOGICAL_XLOG_H */
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
deleted file mode 100644
index 7d7785292f1..00000000000
--- a/src/include/replication/message.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*-------------------------------------------------------------------------
- * message.h
- *	  Exports from replication/logical/message.c
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * src/include/replication/message.h
- *-------------------------------------------------------------------------
- */
-#ifndef PG_LOGICAL_MESSAGE_H
-#define PG_LOGICAL_MESSAGE_H
-
-#include "access/xlog.h"
-#include "access/xlogdefs.h"
-#include "access/xlogreader.h"
-
-/*
- * Generic logical decoding
message wal record. - */ -typedef struct xl_logical_message -{ - Oid dbId; /* database Oid emitted from */ - bool transactional; /* is message transactional? */ - Size prefix_size; /* length of prefix */ - Size message_size; /* size of the message */ - /* payload, including null-terminated prefix of length prefix_size */ - char message[FLEXIBLE_ARRAY_MEMBER]; -} xl_logical_message; - -#define SizeOfLogicalMessage (offsetof(xl_logical_message, message)) - -extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, - size_t size, bool transactional); - -/* RMGR API*/ -#define XLOG_LOGICAL_MESSAGE 0x00 -void logicalmsg_redo(XLogReaderState *record); -void logicalmsg_desc(StringInfo buf, XLogReaderState *record); -const char *logicalmsg_identify(uint8 info); - -#endif /* PG_LOGICAL_MESSAGE_H */ -- 2.17.1