On Thu, 2022-02-24 at 20:35 +0000, Simon Riggs wrote:
> The approach is perfectly fine, it just needs to be finished and
> rebased.

Attached a new version. The scope expanded, so this is likely to slip
v15 at this late time. For 15, I'll focus on my extensible rmgr work,
which can serve similar purposes.

The main purpose of this patch is to be able to emit logical events for
a table (insert/update/delete/truncate) without actually modifying a
table or relying on the heap at all. That allows table AMs to support
logical decoding/replication, and perhaps allows a few other
interesting use cases (maybe foreign tables?). There are really two
advantages of this approach over relying on a custom rmgr:

  1. Easier for extension authors
  2. Doesn't require an extension module to be loaded to start the
server

Those are very nice advantages, but they come at the price of
flexibility and performance. I think there's room for both, and we can
discuss the merits individually.

Changes:
  * Support logical messages for INSERT/UPDATE/DELETE/TRUNCATE
    (before it only supported INSERT)
  * SQL functions pg_logical_emit_insert/update/delete/truncate
    (callable by superuser)
  * Tests (using test_decoding)
  * Use replica identities properly
  * In general a lot of cleanup, but still not quite ready

TODO:
  * Should SQL functions be callable by the table owner? I would lean
toward superuser-only, because logical replication is used for
administrative purposes like upgrades, and I don't think we want table
owners to be able to create inconsistencies.
  * Support for multi-insert
  * Docs for SQL functions, and maybe docs in the section on Generic
WAL
  * Try to get away from reliance on heap tuples specifically

Regards,
        Jeff Davis

From f7b85aea60b06eb7019befec38566b5e014bea12 Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 30 Oct 2021 12:07:35 -0700
Subject: [PATCH] Logical wal.

---
 contrib/test_decoding/expected/messages.out   | 148 +++++++
 contrib/test_decoding/sql/messages.sql        |  58 +++
 src/backend/access/heap/heapam.c              |   4 +-
 src/backend/access/rmgrdesc/Makefile          |   2 +-
 .../{logicalmsgdesc.c => logicaldesc.c}       |  45 +-
 src/backend/access/transam/rmgr.c             |   2 +-
 src/backend/replication/logical/Makefile      |   2 +-
 src/backend/replication/logical/decode.c      | 275 ++++++++++--
 .../replication/logical/logical_xlog.c        | 399 ++++++++++++++++++
 .../replication/logical/logicalfuncs.c        | 165 +++++++-
 src/backend/replication/logical/message.c     |  89 ----
 src/bin/pg_waldump/.gitignore                 |   2 +-
 src/bin/pg_waldump/rmgrdesc.c                 |   2 +-
 src/include/access/heapam.h                   |   2 +
 src/include/access/heapam_xlog.h              |   3 +
 src/include/access/rmgrlist.h                 |   2 +-
 src/include/catalog/pg_proc.dat               |  17 +
 src/include/replication/decode.h              |   2 +-
 src/include/replication/logical_xlog.h        | 124 ++++++
 src/include/replication/message.h             |  41 --
 20 files changed, 1211 insertions(+), 173 deletions(-)
 rename src/backend/access/rmgrdesc/{logicalmsgdesc.c => logicaldesc.c} (59%)
 create mode 100644 src/backend/replication/logical/logical_xlog.c
 delete mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/logical_xlog.h
 delete mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index c75d40190b6..aa284bc37c2 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -91,6 +91,154 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
 ------
 (0 rows)
 
+-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it
+CREATE TABLE dummy(i int, t text, n numeric, primary key(t));
+SELECT pg_logical_emit_insert('dummy', row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                                 data                                  
+-----------------------------------------------------------------------
+ BEGIN
+ table public.dummy: INSERT: i[integer]:1 t[text]:'one' n[numeric]:0.1
+ COMMIT
+ BEGIN
+ table public.dummy: INSERT: i[integer]:2 t[text]:'two' n[numeric]:0.2
+ COMMIT
+(6 rows)
+
+SELECT * FROM dummy;
+ i | t | n 
+---+---+---
+(0 rows)
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                                                       data                                                        
+-------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.dummy: DELETE: t[text]:'twelve'
+ COMMIT
+ BEGIN
+ table public.dummy: UPDATE: old-key: t[text]:'fifteen' new-tuple: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16
+ COMMIT
+(6 rows)
+
+ALTER TABLE dummy REPLICA IDENTITY NOTHING;
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                                    data                                     
+-----------------------------------------------------------------------------
+ BEGIN
+ table public.dummy: DELETE: (no-tuple-data)
+ COMMIT
+ BEGIN
+ table public.dummy: UPDATE: i[integer]:16 t[text]:'sixteen' n[numeric]:0.16
+ COMMIT
+(6 rows)
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+                 data                  
+---------------------------------------
+ BEGIN
+ table public.dummy: TRUNCATE: cascade
+ COMMIT
+(3 rows)
+
+CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t));
+-- return invalid
+SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+ pg_logical_emit_insert 
+------------------------
+ 0/0
+(1 row)
+
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+ pg_logical_emit_update 
+------------------------
+ 0/0
+(1 row)
+
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+ pg_logical_emit_update 
+------------------------
+ 0/0
+(1 row)
+
+-- error
+SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+ERROR:  replica identity column is NULL
+-- return invalid
+SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+ pg_logical_emit_delete 
+------------------------
+ 0/0
+(1 row)
+
+-- error
+SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u);
+ERROR:  replica identity column is NULL
+-- return invalid
+SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false);
+ pg_logical_emit_truncate 
+--------------------------
+ 0/0
+(1 row)
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+ERROR:  could not open relation with OID 1663
+DROP TABLE dummy;
+DROP TABLE dummy_u;
 SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
  ?column? 
 ----------
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index cf3f7738e57..c04a2e1a0c8 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -31,4 +31,62 @@ SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
 \c :prevdb
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
 
+-- no data in this table, but emit logical INSERT/UPDATE/DELETE for it
+CREATE TABLE dummy(i int, t text, n numeric, primary key(t));
+
+SELECT pg_logical_emit_insert('dummy', row(1, 'one', 0.1)::dummy) <> '0/0'::pg_lsn;
+SELECT pg_logical_emit_insert('dummy', row(2, 'two', 0.2)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+SELECT * FROM dummy;
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+ALTER TABLE dummy REPLICA IDENTITY NOTHING;
+
+SELECT pg_logical_emit_delete('dummy', row(12, 'twelve', 0.12)::dummy) <> '0/0'::pg_lsn;
+
+SELECT pg_logical_emit_update('dummy', row(15, 'fifteen', 0.15)::dummy,
+       row(16, 'sixteen', 0.16)::dummy) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy'], true, false) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+CREATE UNLOGGED TABLE dummy_u(i int, t text, n numeric, primary key (t));
+-- return invalid
+SELECT pg_logical_emit_insert('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(7, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_update('dummy_u', row(NULL, 'seven', 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+-- error
+SELECT pg_logical_emit_update('dummy_u', row(7, NULL, 0.7)::dummy_u,
+       row(11, 'eleven', 0.11)::dummy_u);
+-- return invalid
+SELECT pg_logical_emit_delete('dummy_u', row(7, 'seven', 0.7)::dummy_u);
+-- error
+SELECT pg_logical_emit_delete('dummy_u', row(7, NULL, 0.7)::dummy_u);
+
+-- return invalid
+SELECT pg_logical_emit_truncate(ARRAY['dummy_u'], false, false);
+
+SELECT pg_logical_emit_truncate(ARRAY['dummy','dummy_u'], false, true) <> '0/0'::pg_lsn;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
+
+DROP TABLE dummy;
+
+DROP TABLE dummy_u;
+
 SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 74ad445e59b..9d70598dbde 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -108,8 +108,6 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status
 static void index_delete_sort(TM_IndexDeleteOp *delstate);
 static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
-static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_required,
-										bool *copy);
 
 
 /*
@@ -8372,7 +8370,7 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
  * *copy is set to true if the returned tuple is a modified copy rather than
  * the same tuple that was passed in.
  */
-static HeapTuple
+HeapTuple
 ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 					   bool *copy)
 {
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index f88d72fd862..ed6dff179be 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -18,7 +18,7 @@ OBJS = \
 	gistdesc.o \
 	hashdesc.o \
 	heapdesc.o \
-	logicalmsgdesc.o \
+	logicaldesc.o \
 	mxactdesc.o \
 	nbtdesc.o \
 	relmapdesc.o \
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicaldesc.c
similarity index 59%
rename from src/backend/access/rmgrdesc/logicalmsgdesc.c
rename to src/backend/access/rmgrdesc/logicaldesc.c
index 099e11a84e7..c2b0434a606 100644
--- a/src/backend/access/rmgrdesc/logicalmsgdesc.c
+++ b/src/backend/access/rmgrdesc/logicaldesc.c
@@ -1,22 +1,22 @@
 /*-------------------------------------------------------------------------
  *
- * logicalmsgdesc.c
- *	  rmgr descriptor routines for replication/logical/message.c
+ * logicaldesc.c
+ *	  rmgr descriptor routines for replication/logical/logical_xlog.c
  *
  * Portions Copyright (c) 2015-2022, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
- *	  src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *	  src/backend/access/rmgrdesc/logicaldesc.c
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 
 void
-logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+logical_desc(StringInfo buf, XLogReaderState *record)
 {
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
@@ -40,13 +40,42 @@ logicalmsg_desc(StringInfo buf, XLogReaderState *record)
 			sep = " ";
 		}
 	}
+	else if (info == XLOG_LOGICAL_INSERT)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_UPDATE)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_DELETE)
+	{
+
+	}
+	else if (info == XLOG_LOGICAL_TRUNCATE)
+	{
+
+	}
 }
 
 const char *
-logicalmsg_identify(uint8 info)
+logical_identify(uint8 info)
 {
-	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
-		return "MESSAGE";
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+			return "MESSAGE";
+		case XLOG_LOGICAL_INSERT:
+			return "INSERT";
+		case XLOG_LOGICAL_UPDATE:
+			return "UPDATE";
+		case XLOG_LOGICAL_DELETE:
+			return "DELETE";
+		case XLOG_LOGICAL_TRUNCATE:
+			return "TRUNCATE";
+		default:
+			return NULL;
+	}
 
 	return NULL;
 }
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..ca0bd614fcf 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -25,7 +25,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/decode.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb719..9fa281a1dca 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -19,7 +19,7 @@ OBJS = \
 	launcher.o \
 	logical.o \
 	logicalfuncs.o \
-	message.o \
+	logical_xlog.o \
 	origin.o \
 	proto.o \
 	relation.o \
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77bc7aea7a0..81feca97eb0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,23 +35,29 @@
 #include "access/xlogrecord.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "commands/sequence.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
-#include "commands/sequence.h"
 
 /* individual record(group)'s handlers */
-static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
+static void DecodeLogicalMsg(LogicalDecodingContext *cxt, XLogRecordBuffer *buf);
+static void DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						 xl_xact_parsed_commit *parsed, TransactionId xid,
 						 bool two_phase);
@@ -457,7 +463,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	{
 		case XLOG_HEAP_INSERT:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeInsert(ctx, buf);
+				DecodeHeapInsert(ctx, buf);
 			break;
 
 			/*
@@ -468,17 +474,17 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_HEAP_HOT_UPDATE:
 		case XLOG_HEAP_UPDATE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeUpdate(ctx, buf);
+				DecodeHeapUpdate(ctx, buf);
 			break;
 
 		case XLOG_HEAP_DELETE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeDelete(ctx, buf);
+				DecodeHeapDelete(ctx, buf);
 			break;
 
 		case XLOG_HEAP_TRUNCATE:
 			if (SnapBuildProcessChange(builder, xid, buf->origptr))
-				DecodeTruncate(ctx, buf);
+				DecodeHeapTruncate(ctx, buf);
 			break;
 
 		case XLOG_HEAP_INPLACE:
@@ -559,31 +565,71 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
 void
-logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
-	SnapBuild  *builder = ctx->snapshot_builder;
 	XLogReaderState *r = buf->record;
+	SnapBuild  *builder = ctx->snapshot_builder;
 	TransactionId xid = XLogRecGetXid(r);
 	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
-	RepOriginId origin_id = XLogRecGetOrigin(r);
-	Snapshot	snapshot;
-	xl_logical_message *message;
-
-	if (info != XLOG_LOGICAL_MESSAGE)
-		elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
 
-	ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
 	/*
 	 * If we don't have snapshot or we are just fast-forwarding, there is no
-	 * point in decoding messages.
+	 * point in decoding data changes.
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
 		ctx->fast_forward)
 		return;
 
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+			DecodeLogicalMsg(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_INSERT:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalInsert(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_UPDATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalUpdate(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_DELETE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalDelete(ctx, buf);
+			break;
+
+		case XLOG_LOGICAL_TRUNCATE:
+			if (SnapBuildProcessChange(builder, xid, buf->origptr))
+				DecodeLogicalTruncate(ctx, buf);
+			break;
+
+		default:
+			elog(ERROR, "unexpected RM_LOGICAL_ID record type: %u", info);
+			break;
+	}
+}
+
+static void
+DecodeLogicalMsg(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	SnapBuild  *builder = ctx->snapshot_builder;
+	TransactionId xid = XLogRecGetXid(r);
+	RepOriginId origin_id = XLogRecGetOrigin(r);
+	Snapshot	snapshot;
+	xl_logical_message *message;
+
 	message = (xl_logical_message *) XLogRecGetData(r);
 
+	if (message->transactional &&
+		!SnapBuildProcessChange(builder, xid, buf->origptr))
+		return;
+
 	if (message->dbId != ctx->slot->data.database ||
 		FilterByOrigin(ctx, origin_id))
 		return;
@@ -603,6 +649,187 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 												 * prefix */
 							  message->message_size,
 							  message->message + message->prefix_size);
+
+}
+
+/*
+ * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
+ *
+ * Deletes can contain the new tuple.
+ */
+static void
+DecodeLogicalInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	Size		datalen = XLogRecGetDataLen(r) - SizeOfLogicalInsert;
+	char	   *tupledata = XLogRecGetData(r) + SizeOfLogicalInsert;
+	Size		tuplelen = datalen - SizeOfHeapHeader;
+	xl_logical_insert *xlrec;
+	ReorderBufferChange *change;
+
+	xlrec = (xl_logical_insert *) XLogRecGetData(r);
+
+	/* only interested in our database */
+	if (xlrec->node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_INSERT;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_UPDATE from wal into proper tuplebufs.
+ *
+ * Updates can possibly contain a new tuple and the old primary key.
+ */
+static void
+DecodeLogicalUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_logical_update *xlrec = (xl_logical_update *) XLogRecGetData(r);
+	char	   *new_tupledata = XLogRecGetData(r) + SizeOfLogicalUpdate;
+	Size		new_datalen = xlrec->new_datalen;
+	Size		new_tuplelen = new_datalen - SizeOfHeapHeader;
+	ReorderBufferChange *change;
+
+	/* only interested in our database */
+	if (xlrec->node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_UPDATE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+	change->data.tp.newtuple =
+		ReorderBufferGetTupleBuf(ctx->reorder, new_tuplelen);
+
+	DecodeXLogTuple(new_tupledata, new_datalen, change->data.tp.newtuple);
+
+	if (xlrec->flags & XLL_UPDATE_CONTAINS_OLD)
+	{
+		char	*old_tupledata = new_tupledata + new_datalen;
+		Size	 old_datalen;
+		Size	 old_tuplelen;
+
+		/* remaining data is the old tuple */
+		old_datalen = XLogRecGetDataLen(r) - new_datalen - SizeOfLogicalUpdate;
+		old_tuplelen = old_datalen - SizeOfHeapHeader;
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen);
+
+		DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeLogicalDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_logical_delete *xlrec;
+	ReorderBufferChange *change;
+
+	xlrec = (xl_logical_delete *) XLogRecGetData(r);
+
+	/* only interested in our database */
+	if (xlrec->node.dbNode != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_DELETE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+
+	if (xlrec->flags & XLL_DELETE_CONTAINS_OLD)
+	{
+		Size	 old_datalen   = XLogRecGetDataLen(r) - SizeOfLogicalDelete;
+		char	*old_tupledata = XLogRecGetData(r) + SizeOfLogicalDelete;
+		Size	 old_tuplelen  = old_datalen - SizeOfHeapHeader;
+
+		change->data.tp.oldtuple =
+			ReorderBufferGetTupleBuf(ctx->reorder, old_tuplelen);
+
+		DecodeXLogTuple(old_tupledata, old_datalen, change->data.tp.oldtuple);
+	}
+
+	change->data.tp.clear_toast_afterwards = true;
+
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+}
+
+/*
+ * Parse XLOG_LOGICAL_TRUNCATE from wal
+ */
+static void
+DecodeLogicalTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	xl_logical_truncate *xlrec;
+	ReorderBufferChange *change;
+
+	xlrec = (xl_logical_truncate *) XLogRecGetData(r);
+
+	/* only interested in our database */
+	if (xlrec->dbId != ctx->slot->data.database)
+		return;
+
+	/* output plugin doesn't look for this origin, no need to queue */
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+		return;
+
+	change = ReorderBufferGetChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+	change->origin_id = XLogRecGetOrigin(r);
+	if (xlrec->flags & XLL_TRUNCATE_CASCADE)
+		change->data.truncate.cascade = true;
+	if (xlrec->flags & XLL_TRUNCATE_RESTART_SEQS)
+		change->data.truncate.restart_seqs = true;
+	change->data.truncate.nrelids = xlrec->nrelids;
+	change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
+														  xlrec->nrelids);
+	memcpy(change->data.truncate.relids, xlrec->relids,
+		   xlrec->nrelids * sizeof(Oid));
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+							 buf->origptr, change, false);
 }
 
 /*
@@ -839,7 +1066,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
  * Deletes can contain the new tuple.
  */
 static void
-DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	Size		datalen;
 	char	   *tupledata;
@@ -898,7 +1125,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Updates can possibly contain a new tuple and the old primary key.
  */
 static void
-DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_update *xlrec;
@@ -965,7 +1192,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Deletes can possibly contain the old primary key.
  */
 static void
-DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_delete *xlrec;
@@ -1019,7 +1246,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Parse XLOG_HEAP_TRUNCATE from wal
  */
 static void
-DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+DecodeHeapTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 {
 	XLogReaderState *r = buf->record;
 	xl_heap_truncate *xlrec;
diff --git a/src/backend/replication/logical/logical_xlog.c b/src/backend/replication/logical/logical_xlog.c
new file mode 100644
index 00000000000..30ae3e73be6
--- /dev/null
+++ b/src/backend/replication/logical/logical_xlog.c
@@ -0,0 +1,399 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical_xlog.c
+ *	  Logical xlog records.
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/logical_xlog.c
+ *
+ * Logical Messages
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional one will always be delivered.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * Logical Insert/Update/Delete/Truncate
+ *
+ * These records are intended to be used by non-heap table access methods that
+ * wish to support logical decoding and replication. They are treated
+ * similarly to the analogous heap records, but are not tied to physical pages
+ * or other details of the heap. These records are not processed during redo,
+ * so do not contribute to durability or physical replication; use generic WAL
+ * records for that. Note that using both logical WAL records and generic WAL
+ * records is redundant compared with the heap.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "catalog/catalog.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/logical_xlog.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+
+static void CheckReplicaIdentity(Relation relation, TupleTableSlot *slot);
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+				  bool transactional)
+{
+	xl_logical_message xlrec;
+
+	/*
+	 * Force xid to be allocated if we're emitting a transactional message.
+	 */
+	if (transactional)
+	{
+		Assert(IsTransactionState());
+		GetCurrentTransactionId();
+	}
+
+	xlrec.dbId = MyDatabaseId;
+	xlrec.transactional = transactional;
+	/* trailing zero is critical; see logicalmsg_desc */
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+	XLogRegisterData(unconstify(char *, message), size);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	return XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Write logical insert into log.
+ */
+XLogRecPtr
+LogLogicalInsert(Relation relation, TupleTableSlot *new_slot)
+{
+	bool		free_new_tuple;
+	HeapTuple	new_tuple;
+	xl_logical_insert xlrec;
+	xl_heap_header xlhdr;
+	XLogRecPtr recptr;
+
+	if (!equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor))
+		ereport(ERROR, (errmsg("record type must match relation type")));
+
+	if (!RelationIsLogicallyLogged(relation))
+		return InvalidXLogRecPtr;
+
+	new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple);
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	new_tuple->t_tableOid = new_slot->tts_tableOid;
+	ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid);
+
+	xlrec.node = relation->rd_node;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalInsert);
+
+	xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2;
+	xlhdr.t_infomask = new_tuple->t_data->t_infomask;
+	xlhdr.t_hoff = new_tuple->t_data->t_hoff;
+
+	XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+	XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader,
+					 new_tuple->t_len - SizeofHeapTupleHeader);
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_INSERT);
+
+	if (free_new_tuple)
+		pfree(new_tuple);
+
+	return recptr;
+}
+
+/*
+ * Write logical update into log.
+ */
+XLogRecPtr
+LogLogicalUpdate(Relation relation, TupleTableSlot *old_slot,
+				 TupleTableSlot *new_slot)
+{
+	HeapTuple	old_whole_tuple;
+	HeapTuple	old_id_tuple;
+	HeapTuple	new_tuple;
+	bool		free_old_whole_tuple;
+	bool		free_old_id_tuple;
+	bool		free_new_tuple;
+	xl_heap_header new_xlhdr;
+	xl_logical_update xlrec;
+	XLogRecPtr recptr;
+
+	if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor) ||
+		!equalTupleDescs(relation->rd_att, new_slot->tts_tupleDescriptor))
+		ereport(ERROR, (errmsg("record types must match relation type")));
+
+	CheckReplicaIdentity(relation, old_slot);
+
+	if (!RelationIsLogicallyLogged(relation))
+		return InvalidXLogRecPtr;
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true,
+											 &free_old_whole_tuple);
+	new_tuple = ExecFetchSlotHeapTuple(new_slot, true, &free_new_tuple);
+
+	xlrec.node = relation->rd_node;
+	xlrec.new_datalen = new_tuple->t_len - SizeofHeapTupleHeader +
+		SizeOfHeapHeader;
+	xlrec.flags = 0;
+
+	old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true,
+										  &free_old_id_tuple);
+
+	if (old_id_tuple != NULL)
+	{
+		xlrec.flags |= XLL_UPDATE_CONTAINS_OLD;
+		old_id_tuple->t_tableOid = old_slot->tts_tableOid;
+		ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid);
+	}
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalUpdate);
+
+	new_tuple->t_tableOid = new_slot->tts_tableOid;
+	ItemPointerCopy(&new_tuple->t_self, &new_slot->tts_tid);
+
+	new_xlhdr.t_infomask2 = new_tuple->t_data->t_infomask2;
+	new_xlhdr.t_infomask = new_tuple->t_data->t_infomask;
+	new_xlhdr.t_hoff = new_tuple->t_data->t_hoff;
+
+	/* write new tuple first, then old */
+	XLogRegisterData((char *) &new_xlhdr, SizeOfHeapHeader);
+	XLogRegisterData((char *) new_tuple->t_data + SizeofHeapTupleHeader,
+					 new_tuple->t_len - SizeofHeapTupleHeader);
+
+	if (old_id_tuple != NULL)
+	{
+		xl_heap_header old_xlhdr;
+
+		old_xlhdr.t_infomask2 = old_id_tuple->t_data->t_infomask2;
+		old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask;
+		old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff;
+
+		XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader);
+		XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader,
+					 old_id_tuple->t_len - SizeofHeapTupleHeader);
+	}
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_UPDATE);
+
+	if (free_old_whole_tuple)
+		pfree(old_whole_tuple);
+	if (free_old_id_tuple)
+		pfree(old_id_tuple);
+	if (free_new_tuple)
+		pfree(new_tuple);
+
+	return recptr;
+}
+
+/*
+ * Write logical update into log.
+ */
+XLogRecPtr
+LogLogicalDelete(Relation relation, TupleTableSlot *old_slot)
+{
+	HeapTuple		 old_whole_tuple;
+	HeapTuple		 old_id_tuple;
+	bool			 free_old_whole_tuple;
+	bool			 free_old_id_tuple;
+	xl_logical_delete xlrec;
+	XLogRecPtr		 recptr;
+
+	if (!equalTupleDescs(relation->rd_att, old_slot->tts_tupleDescriptor))
+		ereport(ERROR, (errmsg("record type must match relation type")));
+
+	CheckReplicaIdentity(relation, old_slot);
+
+	if (!RelationIsLogicallyLogged(relation))
+		return InvalidXLogRecPtr;
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	old_whole_tuple = ExecFetchSlotHeapTuple(old_slot, true,
+											 &free_old_whole_tuple);
+
+	xlrec.node = relation->rd_node;
+	xlrec.flags = 0;
+
+	old_id_tuple = ExtractReplicaIdentity(relation, old_whole_tuple, true,
+										  &free_old_id_tuple);
+
+	if (old_id_tuple != NULL)
+	{
+		xlrec.flags |= XLL_UPDATE_CONTAINS_OLD;
+		old_id_tuple->t_tableOid = old_slot->tts_tableOid;
+		ItemPointerCopy(&old_id_tuple->t_self, &old_slot->tts_tid);
+	}
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalDelete);
+
+	if (old_id_tuple != NULL)
+	{
+		xl_heap_header old_xlhdr;
+
+		old_xlhdr.t_infomask2 = old_id_tuple->t_data->t_infomask2;
+		old_xlhdr.t_infomask = old_id_tuple->t_data->t_infomask;
+		old_xlhdr.t_hoff = old_id_tuple->t_data->t_hoff;
+
+		XLogRegisterData((char *) &old_xlhdr, SizeOfHeapHeader);
+		XLogRegisterData((char *) old_id_tuple->t_data + SizeofHeapTupleHeader,
+					 old_id_tuple->t_len - SizeofHeapTupleHeader);
+	}
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_DELETE);
+
+	if (free_old_whole_tuple)
+		pfree(old_whole_tuple);
+	if (free_old_id_tuple)
+		pfree(old_id_tuple);
+
+	return recptr;
+}
+
+/*
+ * Write logical truncate into log.
+ */
+XLogRecPtr
+LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs)
+{
+	xl_logical_truncate	 xlrec;
+	XLogRecPtr			 recptr;
+	Oid					*logrelids;
+	ListCell			*lc;
+	int					 nrelids = 0;
+
+	/* force xid to be allocated */
+	Assert(IsTransactionState());
+	GetCurrentTransactionId();
+
+	xlrec.dbId = MyDatabaseId;
+	xlrec.nrelids = list_length(relids);
+	xlrec.flags = 0;
+	if (cascade)
+		xlrec.flags |= XLL_TRUNCATE_CASCADE;
+	if (restart_seqs)
+		xlrec.flags |= XLL_TRUNCATE_RESTART_SEQS;
+
+	logrelids = palloc(list_length(relids) * sizeof(Oid));
+	foreach(lc, relids)
+	{
+		Oid			relid = lfirst_oid(lc);
+		Relation	rel	  = relation_open(relid, AccessShareLock);
+
+		if (RelationIsLogicallyLogged(rel))
+		{
+			logrelids[nrelids++] = lfirst_oid(lc);
+		}
+
+		relation_close(rel, NoLock);
+	}
+
+	if (nrelids == 0)
+		return InvalidXLogRecPtr;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalTruncate);
+	XLogRegisterData((char *) logrelids, nrelids * sizeof(Oid));
+
+	/* allow origin filtering */
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+	recptr = XLogInsert(RM_LOGICAL_ID, XLOG_LOGICAL_TRUNCATE);
+
+	return recptr;
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logical_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_LOGICAL_MESSAGE:
+		case XLOG_LOGICAL_INSERT:
+		case XLOG_LOGICAL_UPDATE:
+		case XLOG_LOGICAL_DELETE:
+		case XLOG_LOGICAL_TRUNCATE:
+			break;
+		default:
+			elog(PANIC, "logical_redo: unknown op code %u", info);
+	}
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
+
+/*
+ * Check that replica identity columns are non-NULL.
+ */
+static void
+CheckReplicaIdentity(Relation relation, TupleTableSlot *slot)
+{
+	/* check for NULL attributes in the replica identity */
+	Bitmapset *id_attrs = RelationGetIndexAttrBitmap(
+		relation, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+	int id_attr = (-1) * FirstLowInvalidHeapAttributeNumber;
+
+	while ((id_attr = bms_next_member(id_attrs, id_attr)) >= 0)
+	{
+		AttrNumber attno = id_attr + FirstLowInvalidHeapAttributeNumber;
+		if (slot_attisnull(slot, attno))
+			ereport(ERROR, (errmsg("replica identity column is NULL")));
+	}
+}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 6058d36e0d5..2776861ab8d 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -17,6 +17,7 @@
 
 #include <unistd.h>
 
+#include "access/relation.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
@@ -29,7 +30,7 @@
 #include "nodes/makefuncs.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "storage/fd.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -389,3 +390,165 @@ pg_logical_emit_message_text(PG_FUNCTION_ARGS)
 	/* bytea and text are compatible */
 	return pg_logical_emit_message_bytea(fcinfo);
 }
+
+/*
+ * SQL function for writing logical insert into WAL.
+ */
+Datum
+pg_logical_emit_insert(PG_FUNCTION_ARGS)
+{
+	Oid				 relid = PG_GETARG_OID(0);
+	HeapTupleHeader	 htup  = PG_GETARG_HEAPTUPLEHEADER(1);
+	Relation		 rel   = relation_open(relid, AccessShareLock);
+	HeapTupleData	 tuple;
+	TupleTableSlot	*slot;
+	Oid				 rel_type;
+	Oid				 tup_type;
+	XLogRecPtr		 lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	tup_type = HeapTupleHeaderGetTypeId(htup);
+	if (rel_type != tup_type)
+		ereport(ERROR, (errmsg("record type must match table type")));
+
+	tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+	ItemPointerSetInvalid(&(tuple.t_self));
+	tuple.t_tableOid = relid;
+	tuple.t_data = htup;
+
+	slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecClearTuple(slot);
+	ExecStoreHeapTuple(&tuple, slot, false);
+
+	lsn = LogLogicalInsert(rel, slot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical update into WAL.
+ */
+Datum
+pg_logical_emit_update(PG_FUNCTION_ARGS)
+{
+	Oid				 relid = PG_GETARG_OID(0);
+	HeapTupleHeader	 old_htup  = PG_GETARG_HEAPTUPLEHEADER(1);
+	HeapTupleHeader	 new_htup  = PG_GETARG_HEAPTUPLEHEADER(2);
+	Relation		 rel   = relation_open(relid, AccessShareLock);
+	HeapTupleData	 old_tuple;
+	HeapTupleData	 new_tuple;
+	TupleTableSlot	*old_slot;
+	TupleTableSlot	*new_slot;
+	Oid				 rel_type;
+	Oid				 old_tup_type;
+	Oid				 new_tup_type;
+	XLogRecPtr		 lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	old_tup_type = HeapTupleHeaderGetTypeId(old_htup);
+	new_tup_type = HeapTupleHeaderGetTypeId(new_htup);
+
+	if (rel_type != old_tup_type || rel_type != new_tup_type)
+		ereport(ERROR, (errmsg("record type must match table type")));
+
+	old_tuple.t_len = HeapTupleHeaderGetDatumLength(old_htup);
+	ItemPointerSetInvalid(&(old_tuple.t_self));
+	old_tuple.t_tableOid = relid;
+	old_tuple.t_data = old_htup;
+
+	new_tuple.t_len = HeapTupleHeaderGetDatumLength(new_htup);
+	ItemPointerSetInvalid(&(new_tuple.t_self));
+	new_tuple.t_tableOid = relid;
+	new_tuple.t_data = new_htup;
+
+	old_slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	new_slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecClearTuple(old_slot);
+	ExecClearTuple(new_slot);
+	ExecStoreHeapTuple(&old_tuple, old_slot, false);
+	ExecStoreHeapTuple(&new_tuple, new_slot, false);
+
+	lsn = LogLogicalUpdate(rel, old_slot, new_slot);
+
+	ExecDropSingleTupleTableSlot(old_slot);
+	ExecDropSingleTupleTableSlot(new_slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical delete into WAL.
+ */
+Datum
+pg_logical_emit_delete(PG_FUNCTION_ARGS)
+{
+	Oid				 relid = PG_GETARG_OID(0);
+	HeapTupleHeader	 htup  = PG_GETARG_HEAPTUPLEHEADER(1);
+	Relation		 rel   = relation_open(relid, AccessShareLock);
+	HeapTupleData	 tuple;
+	TupleTableSlot	*slot;
+	Oid				 rel_type;
+	Oid				 tup_type;
+	XLogRecPtr		 lsn;
+
+	/* check that tuple matches the type of the relation */
+	rel_type = get_rel_type_id(relid);
+	tup_type = HeapTupleHeaderGetTypeId(htup);
+	if (rel_type != tup_type)
+		ereport(ERROR, (errmsg("record type must match table type")));
+
+	tuple.t_len = HeapTupleHeaderGetDatumLength(htup);
+	ItemPointerSetInvalid(&(tuple.t_self));
+	tuple.t_tableOid = relid;
+	tuple.t_data = htup;
+
+	slot = MakeTupleTableSlot(rel->rd_att, &TTSOpsHeapTuple);
+	ExecClearTuple(slot);
+	ExecStoreHeapTuple(&tuple, slot, false);
+
+	lsn = LogLogicalDelete(rel, slot);
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	relation_close(rel, NoLock);
+
+	PG_RETURN_LSN(lsn);
+}
+
+/*
+ * SQL function for writing logical truncate into WAL.
+ */
+Datum
+pg_logical_emit_truncate(PG_FUNCTION_ARGS)
+{
+	ArrayType	*arr		  = PG_GETARG_ARRAYTYPE_P(0);
+	bool		 cascade	  = PG_GETARG_BOOL(1);
+	bool		 restart_seqs = PG_GETARG_BOOL(2);
+	Datum		*values;
+	bool		*nulls;
+	int			 nrelids;
+	List		*relids		  = NIL;
+	XLogRecPtr	 lsn;
+
+	deconstruct_array(arr, REGCLASSOID, sizeof(Oid), true, TYPALIGN_INT,
+					  &values, &nulls, &nrelids);
+
+	for (int i = 0; i < nrelids; i++)
+	{
+		if (nulls[i])
+			ereport(ERROR, (errmsg("unexpected NULL element")));
+		relids = lappend_oid(relids, DatumGetObjectId(values[i]));
+	}
+
+	lsn = LogLogicalTruncate(relids, cascade, restart_seqs);
+
+	PG_RETURN_LSN(lsn);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
deleted file mode 100644
index 1c34912610e..00000000000
--- a/src/backend/replication/logical/message.c
+++ /dev/null
@@ -1,89 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * message.c
- *	  Generic logical messages.
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- *	  src/backend/replication/logical/message.c
- *
- * NOTES
- *
- * Generic logical messages allow XLOG logging of arbitrary binary blobs that
- * get passed to the logical decoding plugin. In normal XLOG processing they
- * are same as NOOP.
- *
- * These messages can be either transactional or non-transactional.
- * Transactional messages are part of current transaction and will be sent to
- * decoding plugin using in a same way as DML operations.
- * Non-transactional messages are sent to the plugin at the time when the
- * logical decoding reads them from XLOG. This also means that transactional
- * messages won't be delivered if the transaction was rolled back but the
- * non-transactional one will always be delivered.
- *
- * Every message carries prefix to avoid conflicts between different decoding
- * plugins. The plugin authors must take extra care to use unique prefix,
- * good options seems to be for example to use the name of the extension.
- *
- * ---------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/xact.h"
-#include "access/xloginsert.h"
-#include "miscadmin.h"
-#include "nodes/execnodes.h"
-#include "replication/logical.h"
-#include "replication/message.h"
-#include "utils/memutils.h"
-
-/*
- * Write logical decoding message into XLog.
- */
-XLogRecPtr
-LogLogicalMessage(const char *prefix, const char *message, size_t size,
-				  bool transactional)
-{
-	xl_logical_message xlrec;
-
-	/*
-	 * Force xid to be allocated if we're emitting a transactional message.
-	 */
-	if (transactional)
-	{
-		Assert(IsTransactionState());
-		GetCurrentTransactionId();
-	}
-
-	xlrec.dbId = MyDatabaseId;
-	xlrec.transactional = transactional;
-	/* trailing zero is critical; see logicalmsg_desc */
-	xlrec.prefix_size = strlen(prefix) + 1;
-	xlrec.message_size = size;
-
-	XLogBeginInsert();
-	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
-	XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
-	XLogRegisterData(unconstify(char *, message), size);
-
-	/* allow origin filtering */
-	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
-
-	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
-}
-
-/*
- * Redo is basically just noop for logical decoding messages.
- */
-void
-logicalmsg_redo(XLogReaderState *record)
-{
-	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-	if (info != XLOG_LOGICAL_MESSAGE)
-		elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
-
-	/* This is only interesting for logical decoding, see decode.c. */
-}
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index 3be00a8b61f..567655fa626 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -10,7 +10,7 @@
 /gistdesc.c
 /hashdesc.c
 /heapdesc.c
-/logicalmsgdesc.c
+/logicaldesc.c
 /mxactdesc.c
 /nbtdesc.c
 /relmapdesc.c
diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c
index 6a4ebd1310b..86804a243bc 100644
--- a/src/bin/pg_waldump/rmgrdesc.c
+++ b/src/bin/pg_waldump/rmgrdesc.c
@@ -26,7 +26,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
-#include "replication/message.h"
+#include "replication/logical_xlog.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b46ab7d7390..77fa0337f6e 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -175,6 +175,8 @@ extern void simple_heap_insert(Relation relation, HeapTuple tup);
 extern void simple_heap_delete(Relation relation, ItemPointer tid);
 extern void simple_heap_update(Relation relation, ItemPointer otid,
 							   HeapTuple tup);
+extern HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup,
+										bool key_required, bool *copy);
 
 extern TransactionId heap_index_delete_tuples(Relation rel,
 											  TM_IndexDeleteOp *delstate);
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 5c47fdcec80..5460f7836f9 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -124,6 +124,9 @@ typedef struct xl_heap_delete
  * For truncate we list all truncated relids in an array, followed by all
  * sequence relids that need to be restarted, if any.
  * All rels are always within the same database, so we just list dbid once.
+ *
+ * Note: identical to xl_logical_truncate, except that for
+ * xl_logical_truncate, no redo is performed.
  */
 typedef struct xl_heap_truncate
 {
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index cf8b6d48193..d43175088fb 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,4 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, bri
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode)
+PG_RMGR(RM_LOGICAL_ID, "Logical", logical_redo, logical_desc, logical_identify, NULL, NULL, NULL, logical_decode)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 01e1dd4d6d1..cd57c2b3aa2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10937,6 +10937,23 @@
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
   prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
   prosrc => 'pg_logical_emit_message_bytea' },
+{ oid => '9297', descr => 'emit a logical insert',
+  proname => 'pg_logical_emit_insert', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'regclass record',
+  prosrc => 'pg_logical_emit_insert' },
+{ oid => '9298', descr => 'emit a logical update',
+  proname => 'pg_logical_emit_update', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'regclass record record',
+  prosrc => 'pg_logical_emit_update' },
+{ oid => '9299', descr => 'emit a logical delete',
+  proname => 'pg_logical_emit_delete', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => 'regclass record',
+  prosrc => 'pg_logical_emit_delete' },
+{ oid => '9300', descr => 'emit a logical truncate',
+  proname => 'pg_logical_emit_truncate', provolatile => 'v', proparallel => 'u',
+  prorettype => 'pg_lsn', proargtypes => '_regclass bool bool',
+  prosrc => 'pg_logical_emit_truncate' },
+
 
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 8e07bb7409a..118590ad61e 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -26,7 +26,7 @@ extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logical_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 extern void	LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
diff --git a/src/include/replication/logical_xlog.h b/src/include/replication/logical_xlog.h
new file mode 100644
index 00000000000..dce74f71111
--- /dev/null
+++ b/src/include/replication/logical_xlog.h
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ * logical_xlog.h
+ *	   Exports from replication/logical/logical_xlog.c
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logical_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "storage/off.h"
+#include "utils/rel.h"
+
+/*
+ * xl_heap_update flag values, 8 bits are available.
+ */
+#define XLL_UPDATE_CONTAINS_OLD		(1<<0)
+
+/*
+ * xl_heap_delete flag values, 8 bits are available.
+ */
+#define XLL_DELETE_CONTAINS_OLD		(1<<0)
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+	Oid			dbId;			/* database Oid emitted from */
+	bool		transactional;	/* is message transactional? */
+	Size		prefix_size;	/* length of prefix */
+	Size		message_size;	/* size of the message */
+	/* payload, including null-terminated prefix of length prefix_size */
+	char		message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_message;
+
+#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+
+/* This is what we need to know about insert */
+typedef struct xl_logical_insert
+{
+	RelFileNode		node;
+
+	/* tuple data follows */
+} xl_logical_insert;
+
+#define SizeOfLogicalInsert	(offsetof(xl_logical_insert, node) + sizeof(RelFileNode))
+
+/*
+ * This is what we need to know about update.
+ */
+typedef struct xl_logical_update
+{
+	RelFileNode	node;
+	Size		new_datalen;
+	uint8		flags;
+
+	/* tuple data follows */
+} xl_logical_update;
+
+#define SizeOfLogicalUpdate	(offsetof(xl_logical_update, flags) + sizeof(uint8))
+
+/* This is what we need to know about delete */
+typedef struct xl_logical_delete
+{
+	RelFileNode	node;
+	uint8		flags;
+
+	/* tuple data follows */
+} xl_logical_delete;
+
+#define SizeOfLogicalDelete	(offsetof(xl_logical_delete, flags) + sizeof(uint8))
+
+/*
+ * For truncate we list all truncated relids in an array, followed by all
+ * sequence relids that need to be restarted, if any.
+ * All rels are always within the same database, so we just list dbid once.
+ *
+ * Note: identical to xl_logical_truncate, except that no redo is performed, only
+ * decoding.
+ */
+typedef struct xl_logical_truncate
+{
+	Oid			dbId;
+	uint32		nrelids;
+	uint8		flags;
+	Oid			relids[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_truncate;
+
+#define SizeOfLogicalTruncate	(offsetof(xl_logical_truncate, relids))
+
+struct TupleTableSlot;
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+									size_t size, bool transactional);
+extern XLogRecPtr LogLogicalInsert(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalUpdate(Relation relation, struct TupleTableSlot *old_slot,
+								   struct TupleTableSlot *new_slot);
+extern XLogRecPtr LogLogicalDelete(Relation relation, struct TupleTableSlot *slot);
+extern XLogRecPtr LogLogicalTruncate(List *relids, bool cascade, bool restart_seqs);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE	0x00
+#define XLOG_LOGICAL_INSERT		0x10
+#define XLOG_LOGICAL_UPDATE		0x20
+#define XLOG_LOGICAL_DELETE		0x30
+#define XLOG_LOGICAL_TRUNCATE	0x40
+
+/*
+ * xl_logical_truncate flag values, 8 bits are available.
+ */
+#define XLL_TRUNCATE_CASCADE					(1<<0)
+#define XLL_TRUNCATE_RESTART_SEQS				(1<<1)
+
+void		logical_redo(XLogReaderState *record);
+void		logical_desc(StringInfo buf, XLogReaderState *record);
+const char *logical_identify(uint8 info);
+
+#endif							/* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
deleted file mode 100644
index 7d7785292f1..00000000000
--- a/src/include/replication/message.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*-------------------------------------------------------------------------
- * message.h
- *	   Exports from replication/logical/message.c
- *
- * Copyright (c) 2013-2022, PostgreSQL Global Development Group
- *
- * src/include/replication/message.h
- *-------------------------------------------------------------------------
- */
-#ifndef PG_LOGICAL_MESSAGE_H
-#define PG_LOGICAL_MESSAGE_H
-
-#include "access/xlog.h"
-#include "access/xlogdefs.h"
-#include "access/xlogreader.h"
-
-/*
- * Generic logical decoding message wal record.
- */
-typedef struct xl_logical_message
-{
-	Oid			dbId;			/* database Oid emitted from */
-	bool		transactional;	/* is message transactional? */
-	Size		prefix_size;	/* length of prefix */
-	Size		message_size;	/* size of the message */
-	/* payload, including null-terminated prefix of length prefix_size */
-	char		message[FLEXIBLE_ARRAY_MEMBER];
-} xl_logical_message;
-
-#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
-
-extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
-									size_t size, bool transactional);
-
-/* RMGR API*/
-#define XLOG_LOGICAL_MESSAGE	0x00
-void		logicalmsg_redo(XLogReaderState *record);
-void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
-const char *logicalmsg_identify(uint8 info);
-
-#endif							/* PG_LOGICAL_MESSAGE_H */
-- 
2.17.1

Reply via email to