On Mon, Mar 7, 2022 at 2:28 PM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Hi Vignesh,
>
> Here are some review comments for patch v2.
>
> ======
>
> 1. Question about syntax
>
> I already posted some questions about why the syntax is on the CREATE
> SUBSCRIPTION side.
> IMO "local_only" is a publisher option, so it seemed more natural to
> me for it to be specified as a "publish" option.
>
> Ref [1] my original question + suggestion for Option 2
> Ref [2] some other examples of subscribing to multiple-publishers
>
> Anyway, +1 to see what other people think.
>

I feel we can support it on the subscriber side first and then extend
it to the publisher side, as discussed in [1]. I have retained it
as it is.

> ~~~
>
> 2. ALTER
>
> (related also to the question about syntax)
>
> If subscribing to multiple publications then ALTER is going to change
> the 'local_only' for all of them, which might not be what you want
> (??)
>

I feel we can support it on the subscriber side first and then extend
it to the publisher side, as discussed in [1]. When it is
extended to the publisher side, this will get handled. I have retained
it as it is.

> ~~~
>
> 3. subscription_parameter
>
> (related also to the question about syntax)
>
> CREATE SUBSCRIPTION subscription_name
>     CONNECTION 'conninfo'
>     PUBLICATION publication_name [, ...]
>     [ WITH ( subscription_parameter [= value] [, ... ] ) ]
>
> ~
>
> That WITH is for *subscription* options, not the publication options.
>
> So IMO 'local_only' intuitively seems like "local" means local where
> the subscriber is.
>
> So, if the Option 1 syntax is chosen (see comment #1) then I think the
> option name maybe should change to be something more like
> 'publish_local_only' or something similar to be more clear what local
> actually means.
>

Changed it to publish_local_only.
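
For illustration, here is a minimal standalone sketch of how the renamed
option is recognized and how a repeated specification is rejected. It
follows the SUBOPT_* bitmask style used in subscriptioncmds.c, but the
SketchSubOpts struct, the parse_option() helper and its return codes are
hypothetical simplifications and not the patch code itself.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Bit for the new option; the value is the one used in the patch. */
#define SUBOPT_PUBLISH_LOCAL_ONLY	0x00000400

/* check if the 'val' has 'bits' set (same macro as in subscriptioncmds.c) */
#define IsSet(val, bits)  (((val) & (bits)) == (bits))

/* Hypothetical, trimmed-down stand-in for SubOpts. */
typedef struct SketchSubOpts
{
	unsigned int specified_opts;	/* bitmask of options seen so far */
	bool		only_local;
} SketchSubOpts;

/*
 * Hypothetical helper: returns 0 on success, -1 if the option was already
 * specified, -2 if the option name is not recognized.
 */
static int
parse_option(SketchSubOpts *opts, const char *name, bool value)
{
	assert(opts != NULL);

	if (strcmp(name, "publish_local_only") != 0)
		return -2;

	/* A second occurrence is a conflicting or redundant option. */
	if (IsSet(opts->specified_opts, SUBOPT_PUBLISH_LOCAL_ONLY))
		return -1;

	opts->specified_opts |= SUBOPT_PUBLISH_LOCAL_ONLY;
	opts->only_local = value;
	return 0;
}
```

The user-visible name is publish_local_only, while the internal field
stays only_local, matching the naming used in the rest of the patch.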

> ~~~
>
> 4. contrib/test_decoding/test_decoding.c
>
> @@ -484,6 +487,16 @@ pg_decode_filter(LogicalDecodingContext *ctx,
>   return false;
>  }
>
> +static bool
> +pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
> +   RepOriginId origin_id)
> +{
> + TestDecodingData *data = ctx->output_plugin_private;
> +
> + if (data->only_local && origin_id != InvalidRepOriginId)
> + return true;
> + return false;
> +}
>
> 4a. Maybe needs function comment.

Modified

> 4b. Missing blank line following this function
>

Modified

> ~~~
>
> 5. General - please check all of the patch.
>
> There seems inconsistency with the member names, local variable names,
> parameter names etc. There are all variations of:
>
> - only_local
> - onlylocaldata
> - onlylocal_data
> - etc
>
> Please try using the same name everywhere for everything if possible.
>

I have changed it to only_local wherever possible.

> ~~~
>
> 6. src/backend/replication/logical/decode.c - FilterRemoteOriginData
>
> @@ -585,7 +594,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx,
> XLogRecordBuffer *buf)
>   message = (xl_logical_message *) XLogRecGetData(r);
>
>   if (message->dbId != ctx->slot->data.database ||
> - FilterByOrigin(ctx, origin_id))
> + FilterByOrigin(ctx, origin_id) ||
> + FilterRemoteOriginData(ctx, origin_id))
>   return;
>
> I noticed that every call to FilterRemoteOriginData has an associated
> preceding call to FilterByOrigin. It might be worth just combining the
> logic into FilterByOrigin. Then none of that calling code (9 x places)
> would need to change at all.

Modified
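
As a rough standalone sketch of the combined check (the Sketch* types and
function names below are hypothetical stand-ins for
LogicalDecodingContext and the output plugin callbacks, and the error
context handling of the real wrappers is left out), the idea is that a
change is skipped if either callback asks for it:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions, for illustration only. */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

typedef struct SketchContext SketchContext;
typedef bool (*SketchFilterCB) (SketchContext *ctx, RepOriginId origin_id);

struct SketchContext
{
	SketchFilterCB filter_by_origin_cb;
	SketchFilterCB filter_remote_origin_cb;
	bool		only_local;
};

/* Skip the change when only_local is set and the origin is valid (remote). */
static bool
sketch_remote_origin_filter(SketchContext *ctx, RepOriginId origin_id)
{
	return ctx->only_local && origin_id != InvalidRepOriginId;
}

/* Combined check: the change is skipped if either callback says so. */
static bool
sketch_filter_by_origin(SketchContext *ctx, RepOriginId origin_id)
{
	bool		result = false;

	assert(ctx != NULL);

	if (ctx->filter_by_origin_cb != NULL)
		result = ctx->filter_by_origin_cb(ctx, origin_id);

	if (ctx->filter_remote_origin_cb != NULL)
		result |= ctx->filter_remote_origin_cb(ctx, origin_id);

	return result;
}
```

With this shape the existing callers of FilterByOrigin do not need to
change.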

> ~~~
>
> 7. src/backend/replication/logical/logical.c  - CreateInitDecodingContext
>
> @@ -451,6 +453,8 @@ CreateInitDecodingContext(const char *plugin,
>   */
>   ctx->twophase &= slot->data.two_phase;
>
> + ctx->onlylocal_data &= slot->data.onlylocal_data;
>
> The equivalent 'twophase' option had a big comment. Probably this new
> option should also have a similar comment?

This change is not required anymore, so the comment no longer applies. I
have not made any changes for this.

> ~~~
>
> 8. src/backend/replication/logical/logical.c - filter_remotedata_cb_wrapper
>
> +bool
> +filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
> +    RepOriginId origin_id)
> +{
> + LogicalErrorCallbackState state;
> + ErrorContextCallback errcallback;
> + bool ret;
> +
> + Assert(!ctx->fast_forward);
> +
> + /* Push callback + info on the error context stack */
> + state.ctx = ctx;
> + state.callback_name = "filter_remoteorigin";
>
> There is no consistency between the function and the name:
>
> "filter_remoteorigin" versus filter_remotedata_cb.
>
> A similar inconsistency for this is elsewhere. See review comment #9

Modified it to filter_remote_origin_cb_wrapper and changed
*_remotedata_* to *_remote_origin_*.

> ~~~
>
> 9. src/backend/replication/pgoutput/pgoutput.c
>
> @@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>   cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
>   cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
>   cb->filter_by_origin_cb = pgoutput_origin_filter;
> + cb->filter_remotedata_cb = pgoutput_remoteorigin_filter;
>
> Inconsistent names for the member and function.
>
> filter_remotedata_cb VS pgoutput_remoteorigin_filter.

Modified it to filter_remote_origin_cb and changed *_remotedata_*
to *_remote_origin_*.

> ~~~
>
> 10. src/backend/replication/pgoutput/pgoutput.c
>
> @@ -1450,6 +1465,16 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
>   return false;
>  }
>
> +static bool
> +pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
> + RepOriginId origin_id)
> +{
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +
> + if (data->onlylocal_data && origin_id != InvalidRepOriginId)
> + return true;
> + return false;
> +}
>  /*
>   * Shutdown the output plugin.
>   *
>
> 10a. Add a function comment.

Modified

> 10b. Missing blank line after the function

Modified

> ~~~
>
> 11. src/backend/replication/slotfuncs.c - pg_create_logical_replication_slot
>
> @@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
>   Name plugin = PG_GETARG_NAME(1);
>   bool temporary = PG_GETARG_BOOL(2);
>   bool two_phase = PG_GETARG_BOOL(3);
> + bool onlylocal_data = PG_GETARG_BOOL(4);
>   Datum result;
>   TupleDesc tupdesc;
>   HeapTuple tuple;
>
>
> Won't there be some PG Docs needing to be updated now there is another
> parameter?

This change is not required anymore, so the comment no longer applies. I
have not made any changes for this.

> ~~~
>
> 12. src/include/catalog/pg_proc.dat - pg_get_replication_slots
>
> I did not see any update for pg_get_replication_slots,  but you added
> the 4th parameter elsewhere. Is something missing here?

This change is not required anymore, so the comment no longer applies. I
have not made any changes for this.

> ~~~
>
> 13. src/include/replication/logical.h
>
> @@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
>   */
>   bool twophase_opt_given;
>
> + bool onlylocal_data;
> +
>
> I think the new member needs some comment.

Modified

> ~~~
>
> 14. src/include/replication/walreceiver.h
>
> @@ -183,6 +183,7 @@ typedef struct
>   bool streaming; /* Streaming of large transactions */
>   bool twophase; /* Streaming of two-phase transactions at
>   * prepare time */
> + bool onlylocal_data;
>   } logical;
>   } proto;
>  } WalRcvStreamOptions;
>
> I think the new member needs some comment.

Modified

> ~~~
>
> 15. src/test/regress/sql/subscription.sql
>
> ALTER SUBSCRIPTION test missing?

Added

Thanks for the comments. The attached v3 patch has the fixes for them.

[1] - https://www.postgresql.org/message-id/CAA4eK1%2BMtz%2BStvNNtTg9%3D9BTq8%3DpMu-V5i4yWqs%3DKJUh0Z_L4g%40mail.gmail.com

Regards,
Vignesh
From 949cdc02c62cbf01617592a2e7cd93af08e8179c Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Mon, 7 Mar 2022 11:19:10 +0530
Subject: [PATCH v3] Skip replication of non local data.

Add an option publish_local_only which will subscribe only to the locally
generated data in the publisher node. If the subscription is created with
this option, the publisher will skip publishing the data that was replicated
to it from other nodes. It can be created using the following syntax:
ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999' PUBLICATION pub1 WITH (publish_local_only = on);
---
 contrib/test_decoding/test_decoding.c         |  20 +++
 doc/src/sgml/ref/alter_subscription.sgml      |   3 +-
 doc/src/sgml/ref/create_subscription.sgml     |  12 ++
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   5 +-
 src/backend/commands/subscriptioncmds.c       |  26 +++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   4 +
 src/backend/replication/logical/decode.c      |  15 ++-
 src/backend/replication/logical/logical.c     |  33 +++++
 src/backend/replication/logical/worker.c      |   2 +
 src/backend/replication/pgoutput/pgoutput.c   |  45 +++++++
 src/bin/pg_dump/pg_dump.c                     |  16 ++-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   8 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/logical.h             |   4 +
 src/include/replication/output_plugin.h       |   7 ++
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/test/regress/expected/subscription.out    | 115 ++++++++++--------
 src/test/regress/sql/subscription.sql         |   7 ++
 src/test/subscription/t/029_circular.pl       | 108 ++++++++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 24 files changed, 381 insertions(+), 61 deletions(-)
 create mode 100644 src/test/subscription/t/029_circular.pl

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ea22649e41..13c40ca167 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -73,6 +73,8 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
 							   ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id);
+static bool pg_decode_filter_remote_origin(LogicalDecodingContext *ctx,
+										   RepOriginId origin_id);
 static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
@@ -148,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
+	cb->filter_remote_origin_cb = pg_decode_filter_remote_origin;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
 	cb->sequence_cb = pg_decode_sequence;
@@ -484,6 +487,23 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Filter out the transactions that had originated remotely.
+ *
+ * Return true if only_local option was specified and if the transaction has a
+ * valid originid.
+ */
+static bool
+pg_decode_filter_remote_origin(LogicalDecodingContext *ctx,
+							   RepOriginId origin_id)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0d6f064f58..67bffd1555 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -204,7 +204,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, and
+      <literal>binary</literal>,
+      <literal>publish_local_only</literal>, and
       <literal>streaming</literal>.
      </para>
     </listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e80a2617a3..c69bb16d39 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -152,6 +152,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>publish_local_only</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription should subscribe only to the
+          locally generated changes or subscribe to both the locally generated
+          changes and the replicated changes that were generated from other
+          nodes in the publisher. The default is <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>slot_name</literal> (<type>string</type>)</term>
         <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd20..ca2ef33a94 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->only_local = subform->sublocal;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca5a9..592ef0aa2d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1260,8 +1260,9 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+GRANT SELECT (oid, subdbid, subname, sublocal, subowner, subenabled,
+              subbinary, substream, subtwophasestate, subslotname,
+              subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d24..78136050b8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_PUBLISH_LOCAL_ONLY	0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	bool		only_local;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_PUBLISH_LOCAL_ONLY))
+		opts->only_local = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -228,6 +232,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_PUBLISH_LOCAL_ONLY) &&
+				 strcmp(defel->defname, "publish_local_only") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_PUBLISH_LOCAL_ONLY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_PUBLISH_LOCAL_ONLY;
+			opts->only_local = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_PUBLISH_LOCAL_ONLY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -460,6 +474,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+	values[Anum_pg_subscription_sublocal - 1] = BoolGetDatum(opts.only_local);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING);
+								  SUBOPT_STREAMING | SUBOPT_PUBLISH_LOCAL_ONLY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -913,6 +928,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_PUBLISH_LOCAL_ONLY))
+				{
+					values[Anum_pg_subscription_sublocal - 1] =
+						BoolGetDatum(opts.only_local);
+					replaces[Anum_pg_subscription_sublocal - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a..3e057ef47b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -453,6 +453,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
 
+		if (options->proto.logical.only_local &&
+			PQserverVersion(conn->streamConn) >= 150000)
+			appendStringInfoString(&cmd, ", publish_local_only 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8c00a73cb9..0ae57f2161 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -546,13 +546,22 @@ FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
 	return filter_prepare_cb_wrapper(ctx, xid, gid);
 }
 
+/*
+ * Ask output plugin whether we want to skip the transaction having this
+ * origin_id or if the transaction has originated from a different node.
+ */
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
-	if (ctx->callbacks.filter_by_origin_cb == NULL)
-		return false;
+	bool		result = false;
+
+	if (ctx->callbacks.filter_by_origin_cb != NULL)
+		result = filter_by_origin_cb_wrapper(ctx, origin_id);
+
+	if (ctx->callbacks.filter_remote_origin_cb != NULL)
+		result |= filter_remote_origin_cb_wrapper(ctx, origin_id);
 
-	return filter_by_origin_cb_wrapper(ctx, origin_id);
+	return result;
 }
 
 /*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..8a3c276be9 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -246,6 +246,8 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_sequence_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
+	ctx->only_local = ctx->callbacks.filter_remote_origin_cb != NULL;
+
 	/*
 	 * streaming callbacks
 	 *
@@ -1178,6 +1180,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+bool
+filter_remote_origin_cb_wrapper(LogicalDecodingContext *ctx,
+								RepOriginId origin_id)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_remote_origin";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_remote_origin_cb(ctx, origin_id);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8653e1d840..182a48a9aa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2962,6 +2962,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->owner != MySubscription->owner ||
+		newsub->only_local != MySubscription->only_local ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
 		ereport(LOG,
@@ -3589,6 +3590,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 	options.proto.logical.twophase = false;
+	options.proto.logical.only_local = MySubscription->only_local;
 
 	if (!am_tablesync_worker())
 	{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..b31e0c3ebb 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -55,6 +55,8 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_remote_origin_filter(LogicalDecodingContext *ctx,
+										  RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_remote_origin_cb = pgoutput_remote_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -239,11 +242,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
+	bool		publish_local_only_option_given = false;
 
 	data->binary = false;
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
+	data->only_local = false;
 
 	foreach(lc, options)
 	{
@@ -332,6 +337,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "publish_local_only") == 0)
+		{
+			if (publish_local_only_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			publish_local_only_option_given = true;
+
+			data->only_local = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -430,6 +445,18 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		else
 			ctx->twophase_opt_given = true;
 
+		if (!data->only_local)
+			ctx->only_local = false;
+		else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("requested proto_version=%d does not support publish_local_only, need %d or higher",
+							data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
+		else if (!ctx->only_local)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("publish_local_only requested, but not supported by output plugin")));
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
@@ -448,6 +475,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		 */
 		ctx->streaming = false;
 		ctx->twophase = false;
+		ctx->only_local = false;
 	}
 }
 
@@ -1450,6 +1478,23 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Filter out the transactions that had originated remotely.
+ *
+ * Return true if only_local option was specified and if the transaction has a
+ * valid originid.
+ */
+static bool
+pgoutput_remote_origin_filter(LogicalDecodingContext *ctx,
+							  RepOriginId origin_id)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
+
 /*
  * Shutdown the output plugin.
  *
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e69dcf8a48..c4a38408cd 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4298,6 +4298,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_sublocal;
 	int			i,
 				ntups;
 
@@ -4340,12 +4341,17 @@ getSubscriptions(Archive *fout)
 		appendPQExpBufferStr(query, " false AS substream,\n");
 
 	if (fout->remoteVersion >= 150000)
-		appendPQExpBufferStr(query, " s.subtwophasestate\n");
+		appendPQExpBufferStr(query, " s.subtwophasestate,\n");
 	else
 		appendPQExpBuffer(query,
-						  " '%c' AS subtwophasestate\n",
+						  " '%c' AS subtwophasestate,\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, " s.sublocal\n");
+	else
+		appendPQExpBufferStr(query, " false AS sublocal\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@@ -4366,6 +4372,7 @@ getSubscriptions(Archive *fout)
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
+	i_sublocal = PQfnumber(res, "sublocal");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4393,6 +4400,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_substream));
 		subinfo[i].subtwophasestate =
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+		subinfo[i].sublocal =
+			pg_strdup(PQgetvalue(res, i, i_sublocal));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4463,6 +4472,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
 		appendPQExpBufferStr(query, ", two_phase = on");
 
+	if (strcmp(subinfo->sublocal, "f") != 0)
+		appendPQExpBufferStr(query, ", publish_local_only = on");
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 997a3b6071..8edc7c849d 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo
 	char	   *subtwophasestate;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *sublocal;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index e3382933d9..885090d886 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false};
+	false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6124,6 +6124,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  ", subtwophasestate AS \"%s\"\n",
 							  gettext_noop("Two phase commit"));
 
+		/* publish_local_only is only supported in v15 and higher */
+		if (pset.sversion >= 150000)
+			appendPQExpBuffer(&buf,
+							  ", sublocal AS \"%s\"\n",
+							  gettext_noop("Only local"));
+
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
 						  ",  subconninfo AS \"%s\"\n",
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6957567264..dcd72c5257 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1834,7 +1834,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+		COMPLETE_WITH("binary", "publish_local_only", "slot_name", "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
 	{
@@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "enabled", "slot_name", "streaming",
-					  "synchronous_commit", "two_phase");
+					  "synchronous_commit", "two_phase", "publish_local_only");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c291289f..c4020210f3 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -65,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	bool		sublocal;		/* skip copying of remote origin data */
+
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
@@ -102,6 +104,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	bool		only_local;		/* Skip copying of remote origin data */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..e3858d3b24 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
+	bool		only_local;		/* publish only locally generated data */
+
 	/*
 	 * State for writing output.
 	 */
@@ -138,6 +140,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_remote_origin_cb_wrapper(LogicalDecodingContext *ctx,
+											RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..0883031176 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,12 @@ typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 											   RepOriginId origin_id);
 
+/*
+ * Filter remote origin changes.
+ */
+typedef bool (*LogicalDecodeFilterRemoteOriginCB) (struct LogicalDecodingContext *ctx,
+												   RepOriginId origin_id);
+
 /*
  * Called to shutdown an output plugin.
  */
@@ -246,6 +252,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterRemoteOriginCB filter_remote_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index eafedd610a..0461f4e634 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,6 +29,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	bool		only_local;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 92f73a55b8..65c83977a3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,6 +183,7 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		only_local; /* publish only locally generated data */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83562..535f6da660 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -70,16 +70,35 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ERROR:  cannot enable subscription that does not have a slot name
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 ERROR:  ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+-- ok - with publish_local_only = true
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, publish_local_only = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+ regress_testsub4
+                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t          | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub4 SET (publish_local_only = false);
+\dRs+ regress_testsub4
+                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f          | off                | dbname=regress_doesnotexist
+(1 row)
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +113,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2
+                                                                                List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Only local | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f          | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -129,10 +148,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2
+                                                                                  List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Only local | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f          | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +184,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +207,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +234,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +252,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +289,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +301,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +313,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Only local | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f          | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index bd0f4af1e4..313c4ada25 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -54,7 +54,14 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU
 ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 
+-- ok - with publish_local_only = true
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, publish_local_only = true);
+\dRs+ regress_testsub4
+ALTER SUBSCRIPTION regress_testsub4 SET (publish_local_only = false);
+\dRs+ regress_testsub4
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
diff --git a/src/test/subscription/t/029_circular.pl b/src/test/subscription/t/029_circular.pl
new file mode 100644
index 0000000000..752be9dccf
--- /dev/null
+++ b/src/test/subscription/t/029_circular.pl
@@ -0,0 +1,108 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test circular logical replication.
+#
+# Includes tests for circular replication using the publish_local_only option.
+#
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###################################################
+# Set up circular replication of pub/sub nodes.
+# node_A -> node_B -> node_A
+###################################################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_B->start;
+
+# Create tables on node_A
+$node_A->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Create the same tables on node_B
+$node_B->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_B
+	CONNECTION '$node_A_connstr application_name=$appname_B'
+	PUBLICATION tap_pub_A
+	WITH (publish_local_only = on)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_A = 'tap_sub_A';
+$node_A->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_A
+	CONNECTION '$node_B_connstr application_name=$appname_A'
+	PUBLICATION tap_pub_B
+	WITH (publish_local_only = on)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_A);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+is(1,1, "Circular replication setup is complete");
+
+my $result;
+
+##########################################################################
+# check that circular replication setup does not cause infinite recursive
+# insertion.
+##########################################################################
+
+# insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_A);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;");
+is($result, qq(11
+12), 'Inserted successfully on node_A without leading to infinite recursion in circular replication setup');
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
+is($result, qq(11
+12), 'Inserted successfully without leading to infinite recursion in circular replication setup');
+
+# shutdown
+$node_B->stop('fast');
+$node_A->stop('fast');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d9b83f744f..9608cb7bbc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1370,6 +1370,7 @@ LogicalDecodeCommitCB
 LogicalDecodeCommitPreparedCB
 LogicalDecodeFilterByOriginCB
 LogicalDecodeFilterPrepareCB
+LogicalDecodeFilterRemoteOriginCB
 LogicalDecodeMessageCB
 LogicalDecodePrepareCB
 LogicalDecodeRollbackPreparedCB
-- 
2.32.0
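
For anyone skimming only the header changes above, here is a rough standalone
sketch of the decision the new filter_remote_origin callback is meant to make.
It is not code from the patch: SketchDecodingContext, the function names and
the exact condition are assumptions for illustration only. The real callback
takes a LogicalDecodingContext and lives in the output plugin. RepOriginId and
InvalidRepOriginId are redefined locally so the snippet compiles on its own.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Local stand-ins for the PostgreSQL definitions, so this builds alone. */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

/* Hypothetical, trimmed-down context that carries only the new flag. */
typedef struct SketchDecodingContext
{
	bool		only_local;		/* publish only locally generated data */
} SketchDecodingContext;

/*
 * Return true when a change should be skipped.
 *
 * Assumption: a change is "remote" when it carries a valid replication
 * origin, i.e. it was applied on this node by a subscription rather than
 * written by a local client.
 */
bool
sketch_filter_remote_origin(const SketchDecodingContext *ctx,
							RepOriginId origin_id)
{
	return ctx->only_local && origin_id != InvalidRepOriginId;
}

/* Exercise the three interesting cases. */
void
sketch_self_check(void)
{
	SketchDecodingContext on = {.only_local = true};
	SketchDecodingContext off = {.only_local = false};

	/* Locally generated change: always sent. */
	assert(!sketch_filter_remote_origin(&on, InvalidRepOriginId));
	/* Change that arrived via replication: skipped when the flag is set. */
	assert(sketch_filter_remote_origin(&on, 1));
	/* Flag not set: nothing is filtered by this check. */
	assert(!sketch_filter_remote_origin(&off, 1));
}
```

Under that assumption, in the node_A -> node_B -> node_A test, the row inserted
on node_A reaches node_B with an origin attached, so node_B's walsender would
skip it when decoding for tap_sub_A and the row does not loop back.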
