From 044910c607ef91839bffb70d081ae682f3756e1c Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 10 Dec 2021 14:41:30 +0900
Subject: [PATCH v14] Add ALTER SUBSCRIPTION ... SKIP to skip the transaction
 on subscriber nodes.

If incoming change violates any constraint, logical replication stops
until it's resolved. This commit introduces another way to skip the
transaction in question, other than manually updating the subscriber's
database or using pg_replication_origin_advance().

The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX),
updating pg_subscription.subskiplsn field, telling the apply worker to
skip the transaction. The apply worker skips all data modification changes
within the specified transaction.

After successfully skipping the transaction or finishing the
transaction, the apply worker clears pg_subscription.subskiplsn.

Author: Masahiko Sawada
Reviewed-by: Vignesh C, Greg Nancarrow, Takamichi Osumi, Haiying Tang, Hou Zhijie, Peter Eisentraut, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                 |  10 +
 doc/src/sgml/logical-replication.sgml      |  18 +-
 doc/src/sgml/ref/alter_subscription.sgml   |  40 ++++
 src/backend/catalog/pg_subscription.c      |   1 +
 src/backend/catalog/system_views.sql       |   2 +-
 src/backend/commands/subscriptioncmds.c    |  67 +++++++
 src/backend/parser/gram.y                  |   9 +
 src/backend/replication/logical/worker.c   | 218 ++++++++++++++++++++-
 src/bin/pg_dump/pg_dump.c                  |   4 +
 src/bin/psql/describe.c                    |   8 +-
 src/bin/psql/tab-complete.c                |   5 +-
 src/include/catalog/pg_subscription.h      |   5 +
 src/include/nodes/parsenodes.h             |   3 +-
 src/test/regress/expected/subscription.out | 126 ++++++------
 src/test/regress/sql/subscription.sql      |  11 ++
 src/test/subscription/t/030_skip_xact.pl   | 182 +++++++++++++++++
 16 files changed, 635 insertions(+), 74 deletions(-)
 create mode 100644 src/test/subscription/t/030_skip_xact.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 7777d60514..eec06b90e8 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7779,6 +7779,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subskiplsn</structfield> <type>pg_lsn</type>
+      </para>
+      <para>
+       Finish LSN of the transaction whose changes are to be skipped, if a valid
+       LSN; otherwise <literal>0/0</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 6431d4796d..18e4e4b186 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -366,15 +366,19 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    transaction, the subscription needs to be disabled temporarily by
    <command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
    subscription can be used with the <literal>disable_on_error</literal> option.
-   Then, the transaction can be skipped by calling the
+   Then, the transaction can be skipped by using
+   <command>ALTER SUBSCRITPION ... SKIP</command> with the finish LSN
+   (i.e., LSN 0/14C0378). After that the replication
+   can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>.
+   Alternatively, the transaction can also be skipped by calling the
    <link linkend="pg-replication-origin-advance">
-   <function>pg_replication_origin_advance()</function></link> function with
-   the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
-   next LSN of the transaction's LSN (i.e., LSN 0/14C0379).  After that the replication
-   can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>.  The current
-   position of origins can be seen in the
-   <link linkend="view-pg-replication-origin-status">
+   <function>pg_replication_origin_advance()</function></link> function with the
+   <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the next
+   LSN of the finish LSN (i.e., 0/14C0379).  The current position of origins can
+   be seen in the <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
+   Please note that skipping the whole transaction include skipping changes that
+   might not violate any constraint.  This can easily make the subscriber inconsistent.
   </para>
  </sect1>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 58b78a94ea..266b5717d5 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
 </synopsis>
@@ -210,6 +211,45 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )</literal></term>
+    <listitem>
+     <para>
+      Skips applying all changes of the specified remote transaction.  If incoming
+      data violates any constraints, logical replication will stop until it is
+      resolved.  By using <command>ALTER SUBSCRIPTION ... SKIP</command> command,
+      the logical replication worker skips all data modification changes within
+      the specified transaction.  This option has no effect on the transactions
+      that are already prepared by enabling <literal>two_phase</literal> on
+      subscriber.
+      After logical replication worker successfully skips the transaction or
+      finishes the transaction, LSN (stored in
+      <structname>pg_subscription</structname>.<structfield>subskiplsn</structfield>)
+      is cleared.  See <xref linkend="logical-replication-conflicts"/> for
+      the details of logical replication conflicts.
+     </para>
+
+     <para>
+      <replaceable>skip_option</replaceable> specifies options for this operation.
+      The supported option is:
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>lsn</literal> (<type>pg_lsn</type>)</term>
+        <listitem>
+         <para>
+          Specifies the finish LSN of the remote transaction whose changes
+          are to be skipped by the logical replication worker.  Skipping individual
+          subtransactions is not supported.  Setting <literal>NONE</literal>
+          resets the LSN.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">new_owner</replaceable></term>
     <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a6304f5f81..0ff0982f7b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->skiplsn = subform->subskiplsn;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index bb1ac30cd1..bd48ee7bd2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subdisableonerr, subslotname,
+              substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname,
               subsynccommit, subpublications)
     ON pg_subscription TO public;
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3922658bbc..4d0bee0403 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -45,6 +45,7 @@
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/syscache.h"
 
 /*
@@ -62,6 +63,7 @@
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
+#define SUBOPT_LSN					0x00000800
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -84,6 +86,7 @@ typedef struct SubOpts
 	bool		streaming;
 	bool		twophase;
 	bool		disableonerr;
+	XLogRecPtr	lsn;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
 			opts->disableonerr = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_LSN) &&
+				 strcmp(defel->defname, "lsn") == 0)
+		{
+			char	   *lsn_str = defGetString(defel);
+			XLogRecPtr	lsn;
+
+			if (IsSet(opts->specified_opts, SUBOPT_LSN))
+				errorConflictingDefElem(defel, pstate);
+
+			/* Setting lsn = NONE is treated as resetting LSN */
+			if (strcmp(lsn_str, "none") == 0)
+				lsn = InvalidXLogRecPtr;
+			else
+			{
+				/* Parse the argument as LSN */
+				lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+													  CStringGetDatum(lsn_str)));
+
+				if (XLogRecPtrIsInvalid(lsn))
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("invalid WAL location (LSN): %s", lsn_str)));
+			}
+
+			opts->specified_opts |= SUBOPT_LSN;
+			opts->lsn = lsn;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+	values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1106,6 +1137,42 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SKIP:
+			{
+				parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
+
+				/* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
+				Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
+
+				/*
+				 * If the user set subskiplsn, we do a sanity check to make
+				 * sure that the specified LSN is a probable value.
+				 */
+				if (!XLogRecPtrIsInvalid(opts.lsn))
+				{
+					RepOriginId originid;
+					char		originname[NAMEDATALEN];
+					XLogRecPtr	remote_lsn;
+
+					snprintf(originname, sizeof(originname), "pg_%u", subid);
+					originid = replorigin_by_name(originname, false);
+					remote_lsn = replorigin_get_progress(originid, false);
+
+					/* Check the given LSN is at least a future LSN */
+					if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
+						ereport(ERROR,
+								(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+								 errmsg("skip WAL location (LSN) must be greater than origin LSN %X/%X",
+										LSN_FORMAT_ARGS(remote_lsn))));
+				}
+
+				values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
+				replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+				update_tuple = true;
+				break;
+			}
+
 		default:
 			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
 				 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a03b33b53b..0036c2f9e2 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9983,6 +9983,15 @@ AlterSubscriptionStmt:
 											(Node *)makeBoolean(false), @1));
 					$$ = (Node *)n;
 				}
+			| ALTER SUBSCRIPTION name SKIP definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_SKIP;
+					n->subname = $3;
+					n->options = $5;
+					$$ = (Node *)n;
+				}
 		;
 
 /*****************************************************************************
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 03e069c7cd..18419c9a60 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -189,6 +190,7 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/syscache.h"
@@ -259,6 +261,21 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
+/*
+ * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
+ * the subscription if the remote transaction's finish LSN matches the subskiplsn.
+ * Once we start skipping changes, we don't stop it until we skip all changes of
+ * the transaction even if pg_subscription is updated and MySubscription->skiplsn
+ * gets changed or reset during that.  Also, in streaming transaction cases, we
+ * don't skip receiving and spooling the changes since we decide whether or not
+ * to skip applying the changes when starting to apply changes.  The subskiplsn is
+ * cleared after successfully skipping the transaction or applying non-empty
+ * transaction. The latter prevents the mistakenly specified subskiplsn from
+ * being left.
+ */
+static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
+#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
 /* Common streaming function to apply all the spooled messages */
 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
+/* Functions for skipping changes */
+static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
+static void stop_skipping_changes(void);
+static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
+
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
@@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s)
 
 	remote_final_lsn = begin_data.final_lsn;
 
+	maybe_start_skipping_changes(begin_data.final_lsn);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s)
 
 	remote_final_lsn = begin_data.prepare_lsn;
 
+	maybe_start_skipping_changes(begin_data.prepare_lsn);
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s)
 
 	/*
 	 * Unlike commit, here, we always prepare the transaction even though no
-	 * change has happened in this transaction. It is done this way because at
-	 * commit prepared time, we won't know whether we have skipped preparing a
-	 * transaction because of no change.
+	 * change has happened in this transaction or all changes are skipped. It
+	 * is done this way because at commit prepared time, we won't know whether
+	 * we have skipped preparing a transaction because of no change.
 	 *
 	 * XXX, We can optimize such that at commit prepared time, we first check
 	 * whether we have prepared the transaction or not but that doesn't seem
@@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/*
+	 * Since we already have prepared the transaction, in a case where the
+	 * server crashes before clearing the subskiplsn, it will be left but the
+	 * transaction won't be resent.  But that's okay because it's a rare case
+	 * and the subskiplsn will be cleared when finishing the next transaction.
+	 */
+	stop_skipping_changes();
+	clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	clear_subscription_skip_lsn(prepare_data.end_lsn);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s)
 		FinishPreparedTransaction(gid, false);
 		end_replication_step();
 		CommitTransactionCommand();
+
+		clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
 	}
 
 	pgstat_report_stat(false);
@@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/*
+	 * Similar to prepare case, the subskiplsn could be left in a case of
+	 * server crash but it's okay. See the comments in apply_handle_prepare().
+	 */
+	stop_skipping_changes();
+	clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 	MemoryContext oldcxt;
 	BufFile    *fd;
 
+	maybe_start_skipping_changes(lsn);
+
 	/* Make sure we have an open transaction */
 	begin_replication_step();
 
@@ -1455,8 +1503,26 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 {
+	if (is_skipping_changes())
+	{
+		stop_skipping_changes();
+
+		/*
+		 * Start a new transaction to clear the subskipxid, if not started
+		 * yet. The transaction is committed below.
+		 */
+		if (!IsTransactionState())
+			StartTransactionCommand();
+	}
+
 	if (IsTransactionState())
 	{
+		/*
+		 * The transaction is either non-empty or skipped, so we clear the
+		 * subskiplsn.
+		 */
+		clear_subscription_skip_lsn(commit_data->commit_lsn);
+
 		/*
 		 * Update origin state so we can restart streaming from correct
 		 * position in case of crash.
@@ -1583,7 +1649,8 @@ apply_handle_insert(StringInfo s)
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
 
-	if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
 		return;
 
 	begin_replication_step();
@@ -1710,7 +1777,8 @@ apply_handle_update(StringInfo s)
 	RangeTblEntry *target_rte;
 	MemoryContext oldctx;
 
-	if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
 		return;
 
 	begin_replication_step();
@@ -1874,7 +1942,8 @@ apply_handle_delete(StringInfo s)
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
 
-	if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
 		return;
 
 	begin_replication_step();
@@ -2261,7 +2330,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
-	if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
 		return;
 
 	begin_replication_step();
@@ -3738,6 +3808,140 @@ IsLogicalWorker(void)
 	return MyLogicalRepWorker != NULL;
 }
 
+/*
+ * Start skipping changes of the transaction if the given LSN matches the
+ * LSN specified by subscription's skiplsn.
+ */
+static void
+maybe_start_skipping_changes(XLogRecPtr finish_lsn)
+{
+	Assert(!is_skipping_changes());
+	Assert(!in_remote_transaction);
+	Assert(!in_streamed_transaction);
+
+	/*
+	 * Quick return if it's not requested to skip this transaction. This
+	 * function is called every start of applying changes and we assume that
+	 * skipping the transaction is not used in many cases.
+	 */
+	if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
+			   MySubscription->skiplsn != finish_lsn))
+		return;
+
+	/* Start skipping all changes of this transaction */
+	skip_xact_finish_lsn = finish_lsn;
+
+	ereport(LOG,
+			errmsg("start skipping logical replication transaction which finished at %X/%X",
+				   LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
+}
+
+/*
+ * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
+ */
+static void
+stop_skipping_changes(void)
+{
+	if (!is_skipping_changes())
+		return;
+
+	ereport(LOG,
+			(errmsg("done skipping logical replication transaction which finished at %X/%X",
+					LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+
+	/* Stop skipping changes */
+	skip_xact_finish_lsn = InvalidXLogRecPtr;
+}
+
+/*
+ * Clear subskiplsn of pg_subscription catalog with origin state updated.
+ *
+ * finish_lsn is the transaction's finish LSN that is used to check if the
+ * subskiplsn matches it.  If not matched, we raise a warning when clearing the
+ * subskipxid in order to inform users for cases e.g., where the user mistakenly
+ * specified the wrong subskiplsn.
+ */
+static void
+clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
+{
+	Relation	rel;
+	Form_pg_subscription subform;
+	HeapTuple	tup;
+	XLogRecPtr	myskiplsn = MySubscription->skiplsn;
+	bool		started_tx = false;
+
+	if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+		return;
+
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+
+	/*
+	 * Protect subskiplsn of pg_subscription from being concurrently updated
+	 * while clearing it.
+	 */
+	LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+					 AccessShareLock);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+							  ObjectIdGetDatum(MySubscription->oid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+	/*
+	 * Update the subskiplsn of the tuple to InvalidXLogRecPtr.  If user has
+	 * already changed subskiplsn before clearing it we don't update the
+	 * catalog and don't advance the replication origin state.  So in the
+	 * worst case, if the server crashes before sending an acknowledgment of
+	 * the flush position the transaction will be sent again and the user
+	 * needs to set subskiplsn again.  We can reduce the possibility by
+	 * logging a replication origin WAL record to advance the origin LSN
+	 * instead but there is no way to advance the origin timestamp and it
+	 * doesn't seem to be worth doing anything about it since it's a very rare
+	 * case.
+	 */
+	if (subform->subskiplsn == myskiplsn)
+	{
+		bool		nulls[Natts_pg_subscription];
+		bool		replaces[Natts_pg_subscription];
+		Datum		values[Natts_pg_subscription];
+
+		memset(values, 0, sizeof(values));
+		memset(nulls, false, sizeof(nulls));
+		memset(replaces, false, sizeof(replaces));
+
+		/* reset subskiplsn */
+		values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+		replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+								replaces);
+		CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+		if (myskiplsn != finish_lsn)
+			ereport(WARNING,
+					errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name),
+					errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
+							  LSN_FORMAT_ARGS(finish_lsn),
+							  LSN_FORMAT_ARGS(myskiplsn)));
+	}
+
+	heap_freetuple(tup);
+	table_close(rel, NoLock);
+
+	if (started_tx)
+		CommitTransactionCommand();
+}
+
 /* Error callback to give more context info about the change being applied */
 static void
 apply_error_callback(void *arg)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 4dd24b8c89..202bca4b23 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4359,6 +4359,10 @@ getSubscriptions(Archive *fout)
 
 	ntups = PQntuples(res);
 
+	/*
+	 * Get subscription fields. We don't include subskiplsn in the dump as
+	 * after restoring the dump this value may no longer be relevant.
+	 */
 	i_tableoid = PQfnumber(res, "tableoid");
 	i_oid = PQfnumber(res, "oid");
 	i_subname = PQfnumber(res, "subname");
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 9229eacb6d..4c6c370b6f 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false};
+	false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6131,6 +6131,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 						  ",  subconninfo AS \"%s\"\n",
 						  gettext_noop("Synchronous commit"),
 						  gettext_noop("Conninfo"));
+
+		/* Skip LSN is only supported in v15 and higher */
+		if (pset.sversion >= 150000)
+			appendPQExpBuffer(&buf,
+							  ", subskiplsn AS \"%s\"\n",
+							  gettext_noop("Skip LSN"));
 	}
 
 	/* Only display subscriptions in current database. */
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 17172827a9..11cb41128e 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1819,7 +1819,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
 		COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
-					  "RENAME TO", "REFRESH PUBLICATION", "SET",
+					  "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (",
 					  "ADD PUBLICATION", "DROP PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
@@ -1835,6 +1835,9 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
+	/* ALTER SUBSCRIPTION <name> SKIP ( */
+	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
+		COMPLETE_WITH("lsn");
 	/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
 	{
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index e2befaf351..9275212d56 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -70,6 +70,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	XLogRecPtr	subskiplsn;		/* All changes which finished at this LSN are
+								 * skipped */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -109,6 +112,8 @@ typedef struct Subscription
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
+	XLogRecPtr	skiplsn;		/* All changes which finished at this LSN are
+								 * skipped */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1617702d9d..6f83a79a96 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3726,7 +3726,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
 	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
-	ALTER_SUBSCRIPTION_ENABLED
+	ALTER_SUBSCRIPTION_ENABLED,
+	ALTER_SUBSCRIPTION_SKIP
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index ad8003fae1..7fcfad1591 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -93,11 +93,25 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2
 ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
+\dRs+
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/12345
+(1 row)
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                   List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -129,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                     List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2
+                                                                                           List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +179,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +202,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +229,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +247,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +284,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +296,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +308,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -309,18 +323,18 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index a7c15b1daf..74c38ead5d 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -72,6 +72,17 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
 ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
+
+\dRs+
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+
 \dRs+
 
 BEGIN;
diff --git a/src/test/subscription/t/030_skip_xact.pl b/src/test/subscription/t/030_skip_xact.pl
new file mode 100644
index 0000000000..d631400673
--- /dev/null
+++ b/src/test/subscription/t/030_skip_xact.pl
@@ -0,0 +1,182 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Tests for skipping logical replication transactions
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $offset  = 0;
+my $relname = 'tap_tab';
+my $subname = 'tap_sub';
+
+# Test skipping the transaction. This function must be called after the caller
+# has inserted data that conflicts with the subscriber.  The finish LSN of the
+# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is
+# fetched from the server logs. After executing ALTER SUBSCRITPION ... SKIP, we
+# check if logical replication can continue working by inserting $nonconflict_data
+# on the publisher.
+sub test_skip_xact
+{
+	my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg)
+	  = @_;
+
+	# Wait until a conflict occurs on the subscriber.
+	$node_subscriber->poll_query_until(
+		'postgres',
+		qq[
+SELECT subenabled = FALSE FROM pg_subscription WHERE subname = '$subname'
+]);
+
+	# Get the finish LSN of the error transaction.
+	my $contents = slurp_file($node_subscriber->logfile, $offset);
+	$contents =~
+	  qr/processing remote data for replication origin \"pg_\d+\" during "INSERT" for replication target relation "public.$relname" in transaction \d+ finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/
+	  or die "could not get error-LSN";
+	my $lsn = $1;
+
+	# Set skip lsn.
+	$node_subscriber->safe_psql('postgres',
+		"ALTER SUBSCRIPTION $subname SKIP (lsn = '$lsn')");
+
+	# Re-enable the subscription.
+	$node_subscriber->safe_psql('postgres',
+		"ALTER SUBSCRIPTION $subname ENABLE");
+
+	# Wait for the failed transaction to be skipped
+	$node_subscriber->poll_query_until('postgres',
+		"SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = '$subname'"
+	);
+
+	# Check the log indicating that successfully skipped the transaction, and
+	# advance the offset of the log file for the next test.
+	$offset = $node_subscriber->wait_for_log(
+		qr/LOG:  done skipping logical replication transaction which finished at $lsn/,
+		$offset);
+
+	# Insert non-conflict data
+	$node_publisher->safe_psql('postgres',
+		"INSERT INTO $relname VALUES $nonconflict_data");
+
+	$node_publisher->wait_for_catchup($subname);
+
+	# Check replicated data
+	my $res = $node_subscriber->safe_psql('postgres',
+		"SELECT count(*) FROM $relname");
+	is($res, $expected, $msg);
+}
+
+# Create publisher node. Set a low value to logical_decoding_work_mem
+# so we can test streaming cases easily.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+	'postgresql.conf',
+	qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf(
+	'postgresql.conf',
+	qq[
+max_prepared_transactions = 10
+]);
+
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber, we
+# create the same tables but with primary keys. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+CREATE TABLE $relname (a int, b text);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+CREATE TABLE $relname (a int primary key, b text);
+INSERT INTO $relname VALUES (1);
+COMMIT;
+]);
+
+# Setup publications
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+CREATE PUBLICATION tap_pub FOR TABLE $relname;
+]);
+
+# Create subscriptions. Both subscription sets disable_on_error to on
+# so that they get disabled when a conflict occurs.
+$node_subscriber->safe_psql(
+	'postgres',
+	qq[
+CREATE SUBSCRIPTION $subname CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = on, two_phase = on, disable_on_error = on);
+]);
+
+$node_publisher->wait_for_catchup($subname);
+$node_subscriber->poll_query_until(
+	'postgres',
+	qq[
+SELECT COUNT(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r')
+]);
+
+# Insert data to test_tab1, raising an error on the subscriber due to violation
+# of the unique constraint on test_tab. Then skip the transaction.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO $relname VALUES (1);
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber,
+	"(2, NULL)", "2", "test skipping transaction");
+
+# Test for PREPARE and COMMIT PREPARED. Insert the same data to test_tab1 and
+# PREPARE the transaction, raising an error. Then skip the transaction.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO $relname VALUES (1);
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber,
+	"(3, NULL)", "3", "test skipping prepare and commit prepared ");
+
+# Test for STREAM COMMIT. Insert enough rows to test_tab to exceed the 64kB
+# limit, also raising an error on the subscriber during applying spooled
+# changes for the same reason. Then skip the transaction.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO $relname SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber, "(4, md5(4::text))",
+	"4", "test skipping stream-commit");
+
+my $res = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts");
+is($res, "0",
+	"check all prepared transactions are resolved on the subscriber");
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();
-- 
2.24.3 (Apple Git-128)

