Hi,

In logical replication, the walsender currently sends both locally
generated data and data replicated from other instances. This results
in infinite recursion in a circular logical replication setup.

Consider a user who wants a two-way replication setup, with node1
publishing data to node2 and node2 publishing data to node1, so that
DML operations can be performed on either node and the setup acts as
a two-way multi-master replication setup.

This problem can be reproduced with the following steps:
-- Instance 1
create table t1(c1 int);
create publication pub1 for table t1;

-- Instance 2
create table t1(c1 int);
create publication pub2 for table t1;
create subscription sub1 CONNECTION 'dbname=postgres port=5432'
publication pub1;

-- Instance 1
create subscription sub2 CONNECTION 'dbname=postgres port=5433'
publication pub2;
insert into t1 values(10);

In this scenario, the walsender for publication pub1 sends the data to
the apply worker of subscription sub1, which maps the data to local
tables and applies the individual changes as they are received. The
walsender for publication pub2 then sends the same data to the apply
worker of subscription sub2, which likewise maps it to local tables
and applies the individual changes as they are received. This process
repeats indefinitely.

Currently we do not differentiate between locally generated data and
replicated data; both are sent, which causes the infinite recursion.
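
As a rough illustration of the loop (a toy model, not PostgreSQL code),
the exchange can be sketched in C as below. The Node struct and the
function names are made up for this sketch; "pending" stands for the
changes a walsender would send next, and the only_local flag models
the filtering idea proposed later in this mail.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model of one node: a row count plus changes waiting to be sent. */
typedef struct Node {
    long rows;     /* rows currently in t1 */
    long pending;  /* changes the walsender will send next */
} Node;

/*
 * Apply n changes received from the peer. Applied changes carry a
 * replication origin; they are queued for re-sending only when the
 * only_local filter is off.
 */
static void apply_remote(Node *node, long n, bool only_local)
{
    node->rows += n;
    if (!only_local)
        node->pending += n;
}

/*
 * Insert one row locally on node1, then run `round_trips` exchanges
 * node1 -> node2 -> node1. Returns the final row count on node1.
 */
long rows_on_node1(int round_trips, bool only_local)
{
    Node n1 = {0, 0};
    Node n2 = {0, 0};

    assert(round_trips >= 0);

    n1.rows = 1;     /* insert into t1 values(10) */
    n1.pending = 1;  /* locally generated, always sent */

    for (int i = 0; i < round_trips; i++) {
        long sent = n1.pending;

        n1.pending = 0;
        apply_remote(&n2, sent, only_local);

        sent = n2.pending;
        n2.pending = 0;
        apply_remote(&n1, sent, only_local);
    }
    return n1.rows;
}
```

In this model the row count on node1 grows by one per round trip when
nothing is filtered, and stays at one row when replicated changes are
not re-sent.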

The record count grows significantly within a short time:
select count(*) from t1;
  count
--------------
 4000000
(1 row)

If the table has a primary key constraint, the first insert succeeds,
and when the same insert is sent back, it fails with a constraint
violation:
2022-02-23 09:28:43.592 IST [14743] ERROR:  duplicate key value
violates unique constraint "t1_pkey"
2022-02-23 09:28:43.592 IST [14743] DETAIL:  Key (c1)=(10) already exists.
2022-02-23 09:28:43.592 IST [14743] CONTEXT:  processing remote data
during "INSERT" for replication target relation "public.t1" in
transaction 727 at 2022-02-23 09:28:43.406738+05:30
2022-02-23 09:28:43.593 IST [14678] LOG:  background worker "logical
replication worker" (PID 14743) exited with exit code 1
2022-02-23 09:28:48.608 IST [14745] LOG:  logical replication apply
worker for subscription "sub2" has started
2022-02-23 09:28:48.624 IST [14745] ERROR:  duplicate key value
violates unique constraint "t1_pkey"
2022-02-23 09:28:48.624 IST [14745] DETAIL:  Key (c1)=(10) already exists.
2022-02-23 09:28:48.624 IST [14745] CONTEXT:  processing remote data
during "INSERT" for replication target relation "public.t1" in
transaction 727 at 2022-02-23 09:28:43.406738+05:30
2022-02-23 09:28:48.626 IST [14678] LOG:  background worker "logical
replication worker" (PID 14745) exited with exit code 1

The same problem can occur in any circular setup with three or more
nodes, for example: a) node1 publishing to node2, b) node2 publishing
to node3, c) node3 publishing back to node1.

There are two problems for the user here:
a) Incremental synchronization of the table by the walsender sends
both local data and replicated data.
b) Table synchronization using the COPY command sends both local data
and replicated data.

The first problem, incremental synchronization of the table by the
walsender, can be solved as follows. Currently, locally generated data
has no replication origin associated with it, whereas data that
originated from another instance has a replication origin associated.
We could use this information to differentiate locally generated data
from replicated data and send only the locally generated data. This
could be provided as an "only_local" option when the subscription is
created:
ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=5433'
PUBLICATION pub1 with (only_local = on);
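
The origin check itself is small. Below is a standalone sketch of the
predicate, in the same spirit as the filter callbacks in the attached
patch. RepOriginId and InvalidRepOriginId are declared here as
stand-ins so the snippet compiles on its own; skip_change() and
count_sent() are names invented for this illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions, so the sketch compiles alone. */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

/*
 * Returns true when a change must NOT be sent to the subscriber:
 * only_local is on and the change carries a replication origin,
 * i.e. it was applied from another instance.
 */
bool skip_change(bool only_local, RepOriginId origin_id)
{
    return only_local && origin_id != InvalidRepOriginId;
}

/* Counts how many of the given changes would be sent. */
size_t count_sent(const RepOriginId *origins, size_t n, bool only_local)
{
    size_t sent = 0;

    assert(origins != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (!skip_change(only_local, origins[i]))
            sent++;
    }
    return sent;
}
```

With only_local off, every change is sent as today; with it on, only
the changes without a replication origin pass the filter.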

I have attached a basic patch for this. If the idea is accepted, I
will test more scenarios, add documentation, and post an updated
patch after further testing.

The second problem is table synchronization using the COPY command,
which copies both local data and replicated data.

Let us consider the following scenario:
a) node1 publishing to node2, b) node2 publishing to node1. In this
case node1 will have replicated data from node2 and vice versa.

Suppose the user now wants to add node3, subscribing to data from
node2. The user will have to create a subscription on node3 to get the
data from node2. During table synchronization we send the complete
table data from node2 to node3. Node2 has its own local data as well
as data replicated from node1. Currently there is no way to
differentiate between locally generated data and replicated data in
the heap, which will cause the infinite recursion described above.

To handle this, if the user has specified the only_local option, we
could throw a warning or an error while creating the subscription. For
this we could have a column srreplicateddata in pg_subscription_rel
that indicates whether the table has any replicated data:
postgres=# select * from pg_subscription_rel;
 srsubid | srrelid | srsubstate | srsublsn  | srreplicateddata
---------+---------+------------+-----------+------------------
   16389 |   16384 | r          | 0/14A4640 |        t
   16389 |   16385 | r          | 0/14A4690 |        f
(2 rows)

In the above example, srreplicateddata being true indicates that table
t1, whose relid is 16384, has replicated data, and the other row with
srreplicateddata being false indicates that table t2, whose relid is
16385, does not have replicated data.
When creating a new subscription, the subscriber will connect to the
publisher and check whether the relation has replicated data by
checking srreplicateddata in the pg_subscription_rel table.
If the table has any replicated data, a warning or error is logged.
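
A minimal sketch of that check, assuming the rows have already been
fetched from the publisher. SubRelRow, tables_with_replicated_data()
and must_warn() are hypothetical names used only for this
illustration, and srreplicateddata is the proposed column, not an
existing one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of one pg_subscription_rel row on the publisher. */
typedef struct SubRelRow {
    unsigned int srrelid;
    bool srreplicateddata;  /* proposed column: table holds replicated data */
} SubRelRow;

/* Returns the number of tables that already hold replicated data. */
size_t tables_with_replicated_data(const SubRelRow *rows, size_t n)
{
    size_t found = 0;

    assert(rows != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (rows[i].srreplicateddata)
            found++;
    }
    return found;
}

/*
 * Returns true when creating the subscription should log a warning or
 * raise an error: only_local was requested and at least one table
 * already contains replicated data.
 */
bool must_warn(bool only_local, const SubRelRow *rows, size_t n)
{
    return only_local && tables_with_replicated_data(rows, n) > 0;
}
```

Whether the result should be a warning or an error is left open here,
as in the proposal above.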

Also, we could document the steps for handling the initial sync, for
example:
a) Complete the ongoing transactions on this table on the nodes in the
replication setup, i.e. node1 and node2 in the above case, so that the
table data is consistent.
b) Once there are no ongoing transactions, copy the table data using
the COPY command from any one of the nodes.
c) Create the subscription with the copy_data option set to off.
d) Perform further transactions on the table.
e) All further transactions will be handled by the walsender, which
takes care of skipping replicated data and sending only the local
data, i.e. node2 will send only its locally generated data to node3.

I'm not sure whether there is a better way to handle this. If there
is, we could handle it accordingly.
Thoughts?

Regards,
Vignesh
From f7bc51504919a5f34265a0f02720f1b0b34fc480 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignesh21@gmail.com>
Date: Wed, 23 Feb 2022 11:37:30 +0530
Subject: [PATCH v1] Skip replication of non local data.

Add an option only_local which will subscribe only to the locally
generated data in the publisher node. If a subscription is created with
this option, the publisher will skip publishing the data that it
received from other nodes. It can be created using the following syntax:
ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=9999' PUBLICATION pub1 with (only_local = on);
---
 contrib/test_decoding/test_decoding.c         | 13 +++++++
 src/backend/catalog/pg_subscription.c         |  1 +
 src/backend/catalog/system_views.sql          |  3 +-
 src/backend/commands/subscriptioncmds.c       | 20 +++++++++--
 .../libpqwalreceiver/libpqwalreceiver.c       | 18 ++++++++--
 src/backend/replication/logical/decode.c      | 36 ++++++++++++++-----
 src/backend/replication/logical/logical.c     | 35 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   |  2 +-
 src/backend/replication/logical/worker.c      |  1 +
 src/backend/replication/pgoutput/pgoutput.c   | 25 +++++++++++++
 src/backend/replication/slot.c                |  4 ++-
 src/backend/replication/slotfuncs.c           | 18 +++++++---
 src/backend/replication/walreceiver.c         |  2 +-
 src/backend/replication/walsender.c           | 21 ++++++++---
 src/bin/psql/tab-complete.c                   |  2 +-
 src/include/catalog/pg_proc.dat               |  6 ++--
 src/include/catalog/pg_subscription.h         |  3 ++
 src/include/replication/logical.h             |  4 +++
 src/include/replication/output_plugin.h       |  7 ++++
 src/include/replication/pgoutput.h            |  1 +
 src/include/replication/slot.h                |  5 ++-
 src/include/replication/walreceiver.h         |  8 +++--
 src/test/regress/expected/rules.out           |  5 +--
 src/tools/pgindent/typedefs.list              |  1 +
 24 files changed, 205 insertions(+), 36 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ea22649e41..58bc5dbc1c 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -73,6 +73,8 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
 							   ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id);
+static bool pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+										RepOriginId origin_id);
 static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
@@ -148,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
+	cb->filter_remotedata_cb = pg_decode_filter_remotedata;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
 	cb->sequence_cb = pg_decode_sequence;
@@ -484,6 +487,16 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static bool
+pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+							  RepOriginId origin_id)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd20..94e096e5fb 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->onlylocaldata = subform->subonlylocaldata;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87..931c549f7c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -958,7 +958,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.only_local
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d24..3f5cbe2c20 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_ONLYLOCAL_DATA 		0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	bool		onlylocal_data;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA))
+		opts->onlylocal_data = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -228,6 +232,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA) &&
+				 strcmp(defel->defname, "only_local") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_ONLYLOCAL_DATA))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_ONLYLOCAL_DATA;
+			opts->onlylocal_data = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_ONLYLOCAL_DATA);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -460,6 +474,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+	values[Anum_pg_subscription_subonlylocaldata - 1] = BoolGetDatum(opts.onlylocal_data);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -565,7 +580,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					twophase_enabled = true;
 
 				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
-								   CRS_NOEXPORT_SNAPSHOT, NULL);
+								   CRS_NOEXPORT_SNAPSHOT, NULL,
+								   opts.onlylocal_data);
 
 				if (twophase_enabled)
 					UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a..326f60414e 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -75,7 +75,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  bool temporary,
 								  bool two_phase,
 								  CRSSnapshotAction snapshot_action,
-								  XLogRecPtr *lsn);
+								  XLogRecPtr *lsn,
+								  bool onlylocal_data);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -453,6 +454,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
 
+		if (options->proto.logical.onlylocal_data &&
+			PQserverVersion(conn->streamConn) >= 150000)
+			appendStringInfoString(&cmd, ", only_local 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
@@ -869,7 +874,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
 static char *
 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 					 bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
-					 XLogRecPtr *lsn)
+					 XLogRecPtr *lsn, bool onlylocal_data)
 {
 	PGresult   *res;
 	StringInfoData cmd;
@@ -899,6 +904,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 				appendStringInfoChar(&cmd, ' ');
 		}
 
+		if (onlylocal_data)
+		{
+			appendStringInfoString(&cmd, "ONLY_LOCAL");
+			if (use_new_options_syntax)
+				appendStringInfoString(&cmd, ", ");
+			else
+				appendStringInfoChar(&cmd, ' ');
+		}
+
 		if (use_new_options_syntax)
 		{
 			switch (snapshot_action)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 18cf931822..6305b93fc7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -555,6 +555,15 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return filter_by_origin_cb_wrapper(ctx, origin_id);
 }
 
+static inline bool
+FilterRemoteOriginData(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+	if (ctx->callbacks.filter_remotedata_cb == NULL)
+		return false;
+
+	return filter_remotedata_cb_wrapper(ctx, origin_id);
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
@@ -585,7 +594,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	message = (xl_logical_message *) XLogRecGetData(r);
 
 	if (message->dbId != ctx->slot->data.database ||
-		FilterByOrigin(ctx, origin_id))
+		FilterByOrigin(ctx, origin_id) ||
+		FilterRemoteOriginData(ctx, origin_id))
 		return;
 
 	if (message->transactional &&
@@ -864,7 +874,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -914,7 +925,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -980,7 +992,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1032,7 +1045,8 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1082,7 +1096,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	/*
@@ -1175,7 +1190,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1250,7 +1266,8 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 {
 	return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
 			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
-			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+			ctx->fast_forward || FilterByOrigin(ctx, origin_id) ||
+			FilterRemoteOriginData(ctx, origin_id));
 }
 
 /*
@@ -1335,7 +1352,8 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	tupledata = XLogRecGetData(r);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..19584eaea7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -246,6 +246,8 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_sequence_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
+	ctx->onlylocal_data = ctx->callbacks.filter_remotedata_cb != NULL;
+
 	/*
 	 * streaming callbacks
 	 *
@@ -451,6 +453,8 @@ CreateInitDecodingContext(const char *plugin,
 	 */
 	ctx->twophase &= slot->data.two_phase;
 
+	ctx->onlylocal_data &= slot->data.onlylocal_data;
+
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
 	return ctx;
@@ -1178,6 +1182,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+bool
+filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+							   RepOriginId origin_id)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_remoteorigin";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_remotedata_cb(ctx, origin_id);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1659964571..f5093ce8c9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1224,7 +1224,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	HOLD_INTERRUPTS();
 	walrcv_create_slot(LogRepWorkerWalRcvConn,
 					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
+					   CRS_USE_SNAPSHOT, origin_startpos, false /* only_local */);
 	RESUME_INTERRUPTS();
 
 	/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc6173..15385fb614 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3575,6 +3575,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 	options.proto.logical.twophase = false;
+	options.proto.logical.onlylocal_data = MySubscription->onlylocaldata;
 
 	if (!am_tablesync_worker())
 	{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..0c9b60bd65 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -55,6 +55,8 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+										 RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_remotedata_cb = pgoutput_remoteorigin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -239,11 +242,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
+	bool		onlylocal_data_given = false;
 
 	data->binary = false;
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
+	data->onlylocal_data = false;
 
 	foreach(lc, options)
 	{
@@ -332,6 +337,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "only_local") == 0)
+		{
+			if (onlylocal_data_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			onlylocal_data_given = true;
+
+			data->onlylocal_data = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1450,6 +1465,16 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static bool
+pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+							 RepOriginId origin_id)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (data->onlylocal_data && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
 /*
  * Shutdown the output plugin.
  *
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 3d39fddaae..429bc1328c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -253,7 +253,8 @@ ReplicationSlotValidateName(const char *name, int elevel)
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency persistency, bool two_phase)
+					  ReplicationSlotPersistency persistency, bool two_phase,
+					  bool onlylocal_data)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -313,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.persistency = persistency;
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
+	slot->data.onlylocal_data = onlylocal_data;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 886899afd2..0e0bc1e940 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
-						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
+						  temporary ? RS_TEMPORARY : RS_PERSISTENT,
+						  false, false);
 
 	if (immediately_reserve)
 	{
@@ -118,7 +119,8 @@ static void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
 								XLogRecPtr restart_lsn,
-								bool find_startpoint)
+								bool find_startpoint,
+								bool onlylocal_data)
 {
 	LogicalDecodingContext *ctx = NULL;
 
@@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
 	 * error as well.
 	 */
 	ReplicationSlotCreate(name, true,
-						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
+						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
+						  onlylocal_data);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	Name		plugin = PG_GETARG_NAME(1);
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
+	bool		onlylocal_data = PG_GETARG_BOOL(4);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -189,7 +193,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 									temporary,
 									two_phase,
 									InvalidXLogRecPtr,
-									true);
+									true,
+									onlylocal_data);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -231,7 +236,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -429,6 +434,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		values[i++] = BoolGetDatum(slot_contents.data.onlylocal_data);
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -794,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 										temporary,
 										false,
 										src_restart_lsn,
+										false,
 										false);
 	}
 	else
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ceaff097b9..cfdefb1f22 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -374,7 +374,7 @@ WalReceiverMain(void)
 					 "pg_walreceiver_%lld",
 					 (long long int) walrcv_get_backend_pid(wrconn));
 
-			walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
+			walrcv_create_slot(wrconn, slotname, true, false, 0, NULL, false);
 
 			SpinLockAcquire(&walrcv->mutex);
 			strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5a718b1fe9..b826326b98 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -963,12 +963,14 @@ static void
 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 						   bool *reserve_wal,
 						   CRSSnapshotAction *snapshot_action,
-						   bool *two_phase)
+						   bool *two_phase,
+						   bool *onlylocal_data)
 {
 	ListCell   *lc;
 	bool		snapshot_action_given = false;
 	bool		reserve_wal_given = false;
 	bool		two_phase_given = false;
+	bool		onlylocal_data_given = false;
 
 	/* Parse options */
 	foreach(lc, cmd->options)
@@ -1019,6 +1021,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 			two_phase_given = true;
 			*two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "only_local") == 0)
+		{
+			if (onlylocal_data_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			onlylocal_data_given = true;
+			*onlylocal_data = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -1035,6 +1046,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	char	   *slot_name;
 	bool		reserve_wal = false;
 	bool		two_phase = false;
+	bool		onlylocal_data = false;
 	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
 	DestReceiver *dest;
 	TupOutputState *tstate;
@@ -1044,13 +1056,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
+	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
+							   &onlylocal_data);
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false);
+							  false, false);
 	}
 	else
 	{
@@ -1065,7 +1078,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase);
+							  two_phase, onlylocal_data);
 	}
 
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6957567264..d7a4e24167 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "enabled", "slot_name", "streaming",
-					  "synchronous_commit", "two_phase");
+					  "synchronous_commit", "two_phase", "only_local");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7f1ee97f55..e002563a2a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10782,9 +10782,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,only_local}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c291289f..6e3119247c 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -65,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	bool		subonlylocaldata; /* skip copying of remote origin data */
+
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
@@ -102,6 +104,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	bool		onlylocaldata;	/* Skip copying of remote origin data */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..82014fe252 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
+	bool		onlylocal_data;
+
 	/*
 	 * State for writing output.
 	 */
@@ -138,6 +140,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+										 RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..52b5de3eb8 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,12 @@ typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 											   RepOriginId origin_id);
 
+/*
+ * Filter remote origin changes.
+ */
+typedef bool (*LogicalDecodeFilterRemoteOriginCB) (struct LogicalDecodingContext *ctx,
+												   RepOriginId origin_id);
+
 /*
  * Called to shutdown an output plugin.
  */
@@ -246,6 +252,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterRemoteOriginCB filter_remotedata_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index eafedd610a..e8fac6b3f8 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,6 +29,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	bool		onlylocal_data;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 24b30210c3..833d380b0f 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -94,6 +94,8 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	bool		two_phase;
 
+	bool		onlylocal_data;
+
 	/* plugin name */
 	NameData	plugin;
 } ReplicationSlotPersistentData;
@@ -195,7 +197,8 @@ extern void ReplicationSlotsShmemInit(void);
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
-								  ReplicationSlotPersistency p, bool two_phase);
+								  ReplicationSlotPersistency p, bool two_phase,
+								  bool onlylocal_data);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 92f73a55b8..e62dca9b45 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,6 +183,7 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		onlylocal_data;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
@@ -351,7 +352,8 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 										bool temporary,
 										bool two_phase,
 										CRSSnapshotAction snapshot_action,
-										XLogRecPtr *lsn);
+										XLogRecPtr *lsn,
+										bool onlylocal_data);
 
 /*
  * walrcv_get_backend_pid_fn
@@ -423,8 +425,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 #define walrcv_send(conn, buffer, nbytes) \
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
-	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data) \
+	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data)
 #define walrcv_get_backend_pid(conn) \
 	WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1420288d67..dc677e5c67 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1456,8 +1456,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.only_local
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, only_local)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c6b302c7b2..0bf093858b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1370,6 +1370,7 @@ LogicalDecodeCommitCB
 LogicalDecodeCommitPreparedCB
 LogicalDecodeFilterByOriginCB
 LogicalDecodeFilterPrepareCB
+LogicalDecodeFilterRemoteOriginCB
 LogicalDecodeMessageCB
 LogicalDecodePrepareCB
 LogicalDecodeRollbackPreparedCB
-- 
2.30.2
