From aeb37688ecb78212eec890b761b3d4fa1c56f6df Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Wed, 25 Jan 2023 05:23:18 +0000
Subject: [PATCH v21] Time-delayed logical replication subscriber

Similar to physical replication, a time-delayed copy of the data for
logical replication is useful for some scenarios (particularly to fix
errors that might cause data loss).

This patch implements a new subscription parameter called 'min_apply_delay'.

If the subscription sets min_apply_delay parameter, the logical
replication worker will delay the transaction apply for min_apply_delay
milliseconds.

The delay is calculated between the WAL time stamp and the current time
on the subscriber.

The delay occurs only on WAL records for transaction begins. The main
reason is to avoid keeping a transaction open for a long time. Regular
and prepared transactions are covered. Streamed transactions are also
covered.

The combination of parallel streaming mode and min_apply_delay is not
allowed. This is because we start applying the transaction stream as
soon as the first change arrives without knowing the transaction's
prepare/commit time. This means we cannot calculate the underlying
network/decoding lag between publisher and subscriber, and so always
waiting for the full 'min_apply_delay' period might include
unnecessary delay.

The other possibility is to apply the delay at the end of the parallel
apply transaction but that would cause issues related to resource
bloat and locks being held for a long time.

Note that this feature doesn't interact with skip transaction feature.
The skip transaction feature applies to one transaction with a specific LSN.
So, even if the skipped transaction and non-skipped transaction come
consecutively in a very short time, regardless of the order of which comes
first, the time-delayed feature gets balanced by delayed application
for other transactions before and after the skipped transaction.

Author: Euler Taveira, Takamichi Osumi, Kuroda Hayato
Reviewed-by: Amit Kapila, Peter Smith, Vignesh C, Shveta Malik,
             Kyotaro Horiguchi, Dilip Kumar, Melih Mutlu
Discussion: https://postgr.es/m/CAB-JLwYOYwL=XTyAXKiH5CtM_Vm8KjKh7aaitCKvmCh4rzr5pQ@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                    |   9 +
 doc/src/sgml/config.sgml                      |  11 ++
 doc/src/sgml/glossary.sgml                    |  14 ++
 doc/src/sgml/logical-replication.sgml         |   6 +
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  60 +++++-
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   7 +-
 src/backend/commands/subscriptioncmds.c       | 117 ++++++++++-
 .../replication/logical/applyparallelworker.c |   3 +-
 src/backend/replication/logical/worker.c      | 181 ++++++++++++++---
 src/bin/pg_dump/pg_dump.c                     |  15 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   9 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/worker_internal.h     |   2 +-
 src/test/regress/expected/subscription.out    | 181 ++++++++++-------
 src/test/regress/sql/subscription.sql         |  24 +++
 src/test/subscription/meson.build             |   1 +
 src/test/subscription/t/032_apply_delay.pl    | 187 ++++++++++++++++++
 21 files changed, 724 insertions(+), 117 deletions(-)
 create mode 100644 src/test/subscription/t/032_apply_delay.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..2b62beed59 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7873,6 +7873,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subminapplydelay</structfield> <type>int8</type>
+      </para>
+      <para>
+       Total time spent delaying the application of changes, in milliseconds.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subname</structfield> <type>name</type>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f985afc009..ee91a1fc02 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4787,6 +4787,17 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
        the <filename>postgresql.conf</filename> file or on the server
        command line.
       </para>
+      <para>
+       For time-delayed logical replication, the apply worker sends a feedback
+       message to the publisher every
+       <varname>wal_receiver_status_interval</varname> milliseconds. Make sure
+       to set <varname>wal_receiver_status_interval</varname> less than the
+       <varname>wal_sender_timeout</varname> on the publisher, otherwise, the
+       <literal>walsender</literal> will repeatedly terminate due to timeout
+       error. If <varname>wal_receiver_status_interval</varname> is set to
+       zero, the apply worker doesn't send any feedback messages during the
+       <literal>min_apply_delay</literal> period.
+      </para>
       </listitem>
      </varlistentry>
 
diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml
index 7c01a541fe..6ed6fa5853 100644
--- a/doc/src/sgml/glossary.sgml
+++ b/doc/src/sgml/glossary.sgml
@@ -1729,6 +1729,20 @@
    </glossdef>
   </glossentry>
 
+  <glossentry id="glossary-time-delayed-replication">
+   <glossterm>Time-delayed replication</glossterm>
+   <glossdef>
+     <para>
+      Replication setup that applies time-delayed copy of the data.
+    </para>
+    <para>
+     For more information, see
+     <xref linkend="guc-recovery-min-apply-delay"/> for physical replication
+     and <xref linkend="sql-createsubscription"/> for logical replication.
+    </para>
+   </glossdef>
+  </glossentry>
+
   <glossentry id="glossary-toast">
    <glossterm>TOAST</glossterm>
    <glossdef>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..d8ae93f88d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -247,6 +247,12 @@
    target table.
   </para>
 
+  <para>
+   A logical replication subscription can delay the application of changes by
+   specifying the <literal>min_apply_delay</literal> subscription parameter.
+   See <xref linkend="sql-createsubscription"/> for details.
+  </para>
+
   <sect2 id="logical-replication-subscription-slot">
    <title>Replication Slot Management</title>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index ad93553a1d..1c6e9dd2d1 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -213,8 +213,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>,
+      <literal>origin</literal>, and
+      <literal>min_apply_delay</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index eba72c6af6..c4a615ee5c 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -349,7 +349,48 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
-      </variablelist></para>
+
+       <varlistentry>
+        <term><literal>min_apply_delay</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          By default, the subscriber applies changes as soon as possible. This
+          parameter allows the user to delay the application of changes by a
+          given time period. If the value is specified without units, it is
+          taken as milliseconds. The default is zero (no delay). See
+          <xref linkend="config-setting-names-values"/> for details on the
+          available valid time unites.
+         </para>
+         <para>
+          Any delay occurs only on WAL records for transaction begins after all
+          initial table synchronization has finished. The delay is calculated
+          as the difference between the WAL timestamp as written on the
+          publisher and the current time on the subscriber. Any overhead of
+          time spent in logical decoding and in transferring the transaction
+          may reduce the actual wait time. It is also possible that the overhead
+          already exceeds the requested <literal>min_apply_delay</literal> value,
+          in which case no additional wait is necessary. If the system clocks
+          on publisher and subscriber are not synchronized, this may lead to
+          apply changes earlier than expected, but this is not a major issue
+          because this parameter is typically much larger than the time
+          deviations between servers. Note that if this parameter is set to a
+          long delay, the replication will stop if the replication slot falls
+          behind the current LSN by more than
+          <link linkend="guc-max-slot-wal-keep-size"><literal>max_slot_wal_keep_size</literal></link>.
+         </para>
+         <warning>
+           <para>
+            Delaying the replication can mean there is a much longer time
+            between making a change on the publisher, and that change being
+            committed on the subscriber. This can impact the performance of
+            synchronous replication. See <xref linkend="guc-synchronous-commit"/>
+            parameter.
+           </para>
+         </warning>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
 
     </listitem>
    </varlistentry>
@@ -413,6 +454,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    published with different column lists are not supported.
   </para>
 
+  <para>
+   A non-zero <literal>min_apply_delay</literal> parameter is not allowed when
+   streaming in parallel mode.
+  </para>
+
   <para>
    We allow non-existent publications to be specified so that users can add
    those later. This means
@@ -472,6 +518,18 @@ CREATE SUBSCRIPTION mysub
         PUBLICATION insert_only
                WITH (enabled = false);
 </programlisting></para>
+
+  <para>
+   Create a subscription to a remote server that replicates tables in
+   the <literal>mypub</literal> publication and starts replicating immediately
+   on commit. Pre-existing data is not copied. The application of changes is
+   delayed by 4 hours.
+<programlisting>
+CREATE SUBSCRIPTION mysub
+         CONNECTION 'host=192.0.2.4 port=5432 user=foo dbname=foodb'
+        PUBLICATION mypub
+               WITH (copy_data = false, min_apply_delay = '4h');
+</programlisting></para>
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..c767cc1c3a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -64,6 +64,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->oid = subid;
 	sub->dbid = subform->subdbid;
 	sub->skiplsn = subform->subskiplsn;
+	sub->minapplydelay = subform->subminapplydelay;
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8608e3fa5b..317c2010cb 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1299,9 +1299,10 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+GRANT SELECT (oid, subdbid, subskiplsn, subminapplydelay, subname, subowner,
+              subenabled, subbinary, substream, subtwophasestate,
+              subdisableonerr, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..b4d075c931 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_MIN_APPLY_DELAY		0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	int			min_apply_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -100,7 +102,7 @@ static void check_publications_origin(WalReceiverConn *wrconn,
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
-
+static int	defGetMinApplyDelay(DefElem *def);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY))
+		opts->min_apply_delay = 0;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+				 strcmp(defel->defname, "min_apply_delay") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY;
+			opts->min_apply_delay = defGetMinApplyDelay(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -404,6 +417,31 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	/*
+	 * The combination of parallel streaming mode and min_apply_delay is not
+	 * allowed. This is because we start applying the transaction stream as
+	 * soon as the first change arrives without knowing the transaction's
+	 * prepare/commit time. This means we cannot calculate the underlying
+	 * network/decoding lag between publisher and subscriber, and so always
+	 * waiting for the full 'min_apply_delay' period might include unnecessary
+	 * delay.
+	 *
+	 * The other possibility is to apply the delay at the end of the parallel
+	 * apply transaction but that would cause issues related to resource bloat
+	 * and locks being held for a long time.
+	 */
+	if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+		opts->min_apply_delay > 0 && opts->streaming == LOGICALREP_STREAM_PARALLEL)
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+
+		/*
+		 * - translator: the first %s is a string of the form "option > value"
+		 * and the second one is "option = value".
+		 */
+				errmsg("%s and %s are mutually exclusive options",
+					   "min_apply_delay > 0", "streaming = parallel"));
 }
 
 /*
@@ -560,7 +598,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_MIN_APPLY_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -625,6 +664,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
 	values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
 	values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+	values[Anum_pg_subscription_subminapplydelay - 1] = Int64GetDatum(opts.min_apply_delay);
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
@@ -1054,7 +1094,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_MIN_APPLY_DELAY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1098,6 +1138,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
+					/*
+					 * The combination of parallel streaming mode and
+					 * min_apply_delay is not allowed. See
+					 * parse_subscription_options for details of the reason.
+					 */
+					if (opts.streaming == LOGICALREP_STREAM_PARALLEL)
+						if ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && opts.min_apply_delay > 0) ||
+							(!IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && sub->minapplydelay > 0))
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("cannot enable parallel streaming mode for subscription with %s",
+										   "min_apply_delay"));
+
 					values[Anum_pg_subscription_substream - 1] =
 						CharGetDatum(opts.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
@@ -1111,6 +1164,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				{
+					/*
+					 * The combination of parallel streaming mode and
+					 * min_apply_delay is not allowed.
+					 */
+					if (opts.min_apply_delay > 0)
+						if ((IsSet(opts.specified_opts, SUBOPT_STREAMING) && opts.streaming == LOGICALREP_STREAM_PARALLEL) ||
+							(!IsSet(opts.specified_opts, SUBOPT_STREAMING) && sub->stream == LOGICALREP_STREAM_PARALLEL))
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("cannot enable %s for subscription in %s mode",
+										   "min_apply_delay", "streaming = parallel"));
+
+					values[Anum_pg_subscription_subminapplydelay - 1] =
+						Int64GetDatum(opts.min_apply_delay);
+					replaces[Anum_pg_subscription_subminapplydelay - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
 					values[Anum_pg_subscription_suborigin - 1] =
@@ -2195,3 +2267,42 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Extract the min_apply_delay value from a DefElem. This is very similar to
+ * parse_and_validate_value() for integer values, because min_apply_delay
+ * accepts the same parameter format as recovery_min_apply_delay.
+ */
+static int
+defGetMinApplyDelay(DefElem *def)
+{
+	char	   *input_string;
+	int			result;
+	const char *hintmsg;
+
+	input_string = defGetString(def);
+
+	/*
+	 * Parse given string as parameter which has millisecond unit
+	 */
+	if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid value for parameter \"%s\": \"%s\"",
+						"min_apply_delay", input_string),
+				 hintmsg ? errhint("%s", _(hintmsg)) : 0));
+
+	/*
+	 * Check lower bound. parse_int() has already been confirmed that result
+	 * is less than or equal to INT_MAX.
+	 */
+	if (result < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)",
+						result,
+						"min_apply_delay",
+						0, INT_MAX)));
+
+	return result;
+}
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3579e704fe..7302bce7a0 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -704,7 +704,8 @@ pa_process_spooled_messages_if_required(void)
 	{
 		apply_spooled_messages(&MyParallelShared->fileset,
 							   MyParallelShared->xid,
-							   InvalidXLogRecPtr);
+							   InvalidXLogRecPtr,
+							   0);
 		pa_set_fileset_state(MyParallelShared, FS_EMPTY);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..d040efbaaf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -319,6 +319,17 @@ static List *on_commit_wakeup_workers_subids = NIL;
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
+/*
+ * In order to avoid walsender timeout for time-delayed logical replication the
+ * apply worker keeps sending feedback messages during the delay period.
+ * Meanwhile, the feature delays the apply before the start of the
+ * transaction and thus we don't write WAL records for the suspended changes
+ * during the wait. When the apply worker sends a feedback message during the
+ * delay, we should not make positions of the flushed and apply LSN overwritten
+ * by the last received latest LSN. See send_feedback() for details.
+ */
+static XLogRecPtr last_received = InvalidXLogRecPtr;
+
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
 
@@ -389,7 +400,8 @@ static void stream_write_change(char action, StringInfo s);
 static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
 static void stream_close_file(void);
 
-static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void send_feedback(bool force, bool requestReply,
+						  bool has_unprocessed_change);
 
 static void DisableSubscriptionAndExit(void);
 
@@ -999,6 +1011,108 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+/*
+ * When min_apply_delay parameter is set on the subscriber, we wait long enough
+ * to make sure a transaction is applied at least that period behind the
+ * publisher.
+ *
+ * While the physical replication applies the delay at commit time, this
+ * feature applies the delay for the next transaction but before starting the
+ * transaction. This is mainly because keeping a transaction that conducted
+ * write operations open for a long time results in some issues such as bloat
+ * and locks.
+ *
+ * The min_apply_delay parameter will take effect only after all tables are in
+ * READY state.
+ *
+ * xid is the transaction id where we apply the delay.
+ *
+ * finish_ts is the commit/prepare time of both regular (non-streamed) and
+ * streamed transactions. Unlike the regular (non-streamed) cases, the delay
+ * is applied in a STREAM COMMIT/STREAM PREPARE message for streamed
+ * transactions. The STREAM START message does not contain a commit/prepare
+ * time (it will be available when the in-progress transaction finishes).
+ * Hence, it's not appropriate to apply a delay at the STREAM START time.
+ */
+static void
+maybe_apply_delay(TransactionId xid, TimestampTz finish_ts)
+{
+	Assert(finish_ts > 0);
+
+	/* Nothing to do if no delay set */
+	if (!MySubscription->minapplydelay)
+		return;
+
+	/*
+	 * The min_apply_delay parameter is ignored until all tablesync workers
+	 * have reached READY state. This is because if we allowed the delay
+	 * during the catchup phase, then once we reached the limit of tablesync
+	 * workers it would impose a delay for each subsequent worker. That would
+	 * cause initial table synchronization completion to take a long time.
+	 */
+	if (!AllTablesyncsReady())
+		return;
+
+	/* Apply the delay by the latch mechanism */
+	while (true)
+	{
+		long		diffms;
+
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* This might change wal_receiver_status_interval */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/*
+		 * Before calculating the time duration, reload the catalog if needed.
+		 */
+		if (!in_remote_transaction && !in_streamed_transaction)
+		{
+			AcceptInvalidationMessages();
+			maybe_reread_subscription();
+		}
+
+		diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(),
+												 TimestampTzPlusMilliseconds(finish_ts, MySubscription->minapplydelay));
+
+		/*
+		 * Exit without arming the latch if it's already past time to apply
+		 * this transaction.
+		 */
+		if (diffms <= 0)
+			break;
+
+		elog(DEBUG2, "time-delayed replication for txid %u, min_apply_delay = " INT64_FORMAT " ms, remaining wait time: %ld ms",
+			 xid, MySubscription->minapplydelay, diffms);
+
+		/*
+		 * Call send_feedback() to prevent the publisher from exiting by
+		 * timeout during the delay, when wal_receiver_status_interval is
+		 * available.
+		 */
+		if (wal_receiver_status_interval > 0 &&
+			diffms > wal_receiver_status_interval * 1000L)
+		{
+			WaitLatch(MyLatch,
+					  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					  wal_receiver_status_interval * 1000L,
+					  WAIT_EVENT_RECOVERY_APPLY_DELAY);
+			send_feedback(true, false, true);
+		}
+		else
+			WaitLatch(MyLatch,
+					  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					  diffms,
+					  WAIT_EVENT_RECOVERY_APPLY_DELAY);
+	}
+}
+
 /*
  * Handle BEGIN message.
  */
@@ -1013,6 +1127,9 @@ apply_handle_begin(StringInfo s)
 	logicalrep_read_begin(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
+	/* Should we delay the current transaction? */
+	maybe_apply_delay(begin_data.xid, begin_data.committime);
+
 	remote_final_lsn = begin_data.final_lsn;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
@@ -1070,6 +1187,9 @@ apply_handle_begin_prepare(StringInfo s)
 	logicalrep_read_begin_prepare(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
+	/* Should we delay the current prepared transaction? */
+	maybe_apply_delay(begin_data.xid, begin_data.prepare_time);
+
 	remote_final_lsn = begin_data.prepare_lsn;
 
 	maybe_start_skipping_changes(begin_data.prepare_lsn);
@@ -1317,7 +1437,8 @@ apply_handle_stream_prepare(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
-								   prepare_data.xid, prepare_data.prepare_lsn);
+								   prepare_data.xid, prepare_data.prepare_lsn,
+								   prepare_data.prepare_time);
 
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
@@ -2011,10 +2132,13 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
 
 /*
  * Common spoolfile processing.
+ *
+ * The commit/prepare time (finish_ts) for streamed transactions is required
+ * for time-delayed logical replication.
  */
 void
 apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-					   XLogRecPtr lsn)
+					   XLogRecPtr lsn, TimestampTz finish_ts)
 {
 	StringInfoData s2;
 	int			nchanges;
@@ -2025,6 +2149,10 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 	int			fileno;
 	off_t		offset;
 
+	/* Should we delay the current transaction? */
+	if (finish_ts)
+		maybe_apply_delay(xid, finish_ts);
+
 	if (!am_parallel_apply_worker())
 		maybe_start_skipping_changes(lsn);
 
@@ -2174,7 +2302,7 @@ apply_handle_stream_commit(StringInfo s)
 			 * spooled operations.
 			 */
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
-								   commit_data.commit_lsn);
+								   commit_data.commit_lsn, commit_data.committime);
 
 			apply_handle_commit_internal(&commit_data);
 
@@ -3447,7 +3575,7 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
  * Apply main loop.
  */
 static void
-LogicalRepApplyLoop(XLogRecPtr last_received)
+LogicalRepApplyLoop(void)
 {
 	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 	bool		ping_sent = false;
@@ -3568,7 +3696,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 						if (last_received < end_lsn)
 							last_received = end_lsn;
 
-						send_feedback(last_received, reply_requested, false);
+						send_feedback(reply_requested, false, false);
 						UpdateWorkerStats(last_received, timestamp, true);
 					}
 					/* other message types are purposefully ignored */
@@ -3581,7 +3709,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 		}
 
 		/* confirm all writes so far */
-		send_feedback(last_received, false, false);
+		send_feedback(false, false, false);
 
 		if (!in_remote_transaction && !in_streamed_transaction)
 		{
@@ -3678,7 +3806,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 				}
 			}
 
-			send_feedback(last_received, requestReply, requestReply);
+			send_feedback(requestReply, requestReply, false);
 
 			/*
 			 * Force reporting to ensure long idle periods don't lead to
@@ -3703,17 +3831,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 /*
  * Send a Standby Status Update message to server.
- *
- * 'recvpos' is the latest LSN we've received data to, force is set if we need
- * to send a response to avoid timeouts.
  */
 static void
-send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
+send_feedback(bool force, bool requestReply, bool has_unprocessed_change)
 {
 	static StringInfo reply_message = NULL;
 	static TimestampTz send_time = 0;
 
-	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
 	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
 	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
 
@@ -3729,18 +3853,21 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 	if (!force && wal_receiver_status_interval <= 0)
 		return;
 
-	/* It's legal to not pass a recvpos */
-	if (recvpos < last_recvpos)
-		recvpos = last_recvpos;
-
 	get_flush_position(&writepos, &flushpos, &have_pending_txes);
 
 	/*
 	 * No outstanding transactions to flush, we can report the latest received
 	 * position. This is important for synchronous replication.
+	 *
+	 * If the logical replication subscription has unprocessed changes then do
+	 * not inform the publisher that the received latest LSN is already
+	 * applied and flushed, otherwise, the publisher will make a wrong
+	 * assumption about the logical replication progress. Instead, it just
+	 * sends a feedback message to avoid a replication timeout during the
+	 * delay.
 	 */
-	if (!have_pending_txes)
-		flushpos = writepos = recvpos;
+	if (!have_pending_txes && !has_unprocessed_change)
+		flushpos = writepos = last_received;
 
 	if (writepos < last_writepos)
 		writepos = last_writepos;
@@ -3770,23 +3897,22 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 		resetStringInfo(reply_message);
 
 	pq_sendbyte(reply_message, 'r');
-	pq_sendint64(reply_message, recvpos);	/* write */
+	pq_sendint64(reply_message, last_received); /* write */
 	pq_sendint64(reply_message, flushpos);	/* flush */
 	pq_sendint64(reply_message, writepos);	/* apply */
 	pq_sendint64(reply_message, now);	/* sendTime */
 	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
 
-	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+	elog(DEBUG2, "sending feedback (force %d, has_unprocessed_change %s) to recv %X/%X, write %X/%X, flush %X/%X",
 		 force,
-		 LSN_FORMAT_ARGS(recvpos),
+		 has_unprocessed_change ? "yes" : "no",
+		 LSN_FORMAT_ARGS(last_received),
 		 LSN_FORMAT_ARGS(writepos),
 		 LSN_FORMAT_ARGS(flushpos));
 
 	walrcv_send(LogRepWorkerWalRcvConn,
 				reply_message->data, reply_message->len);
 
-	if (recvpos > last_recvpos)
-		last_recvpos = recvpos;
 	if (writepos > last_writepos)
 		last_writepos = writepos;
 	if (flushpos > last_flushpos)
@@ -4367,11 +4493,11 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
  * of system resource error and are not repeatable.
  */
 static void
-start_apply(XLogRecPtr origin_startpos)
+start_apply(void)
 {
 	PG_TRY();
 	{
-		LogicalRepApplyLoop(origin_startpos);
+		LogicalRepApplyLoop();
 	}
 	PG_CATCH();
 	{
@@ -4661,7 +4787,8 @@ ApplyWorkerMain(Datum main_arg)
 	}
 
 	/* Run the main loop. */
-	start_apply(origin_startpos);
+	last_received = origin_startpos;
+	start_apply();
 
 	proc_exit(0);
 }
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c7651ab..c0f69cb43b 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4494,6 +4494,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subminapplydelay;
 	int			i,
 				ntups;
 
@@ -4546,9 +4547,13 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+		appendPQExpBufferStr(query,
+							 " s.suborigin,\n"
+							 " s.subminapplydelay\n");
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n"
+						  " 0 AS subminapplydelay\n",
+						  LOGICALREP_ORIGIN_ANY);
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4576,6 +4581,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subminapplydelay = PQfnumber(res, "subminapplydelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4606,6 +4612,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subminapplydelay =
+			strtoi64(PQgetvalue(res, i, i_subminapplydelay), NULL, 10);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4687,6 +4695,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subminapplydelay > 0)
+		appendPQExpBuffer(query, ", min_apply_delay = '" INT64_FORMAT " ms'", subinfo->subminapplydelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..e2525f70ab 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *suborigin;
 	char	   *subsynccommit;
+	int64		subminapplydelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..81d4607a1c 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6527,10 +6527,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* Origin and min_apply_delay are only supported in v16 and higher */
 		if (pset.sversion >= 160000)
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", subminapplydelay AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Min apply delay"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5e1882eaea..e8b9a43a47 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1925,7 +1925,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+		COMPLETE_WITH("binary", "disable_on_error", "min_apply_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
@@ -3268,7 +3268,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "origin", "slot_name",
+					  "disable_on_error", "enabled", "min_apply_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..e06f35c037 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	XLogRecPtr	subskiplsn;		/* All changes finished at this LSN are
 								 * skipped */
 
+	int64		subminapplydelay;	/* Replication apply delay (ms) */
+
 	NameData	subname;		/* Name of the subscription */
 
 	Oid			subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
@@ -120,6 +122,7 @@ typedef struct Subscription
 								 * in */
 	XLogRecPtr	skiplsn;		/* All changes finished at this LSN are
 								 * skipped */
+	int64		minapplydelay;	/* Replication apply delay (ms) */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dc87a4edd1..3dc09d1a4c 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -255,7 +255,7 @@ extern void stream_stop_internal(TransactionId xid);
 
 /* Common streaming function to apply all the spooled messages */
 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
-								   XLogRecPtr lsn);
+								   XLogRecPtr lsn, TimestampTz finish_ts);
 
 extern void apply_dispatch(StringInfo s);
 
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..5ccce39986 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                        List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |               0 | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                          List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,20 +404,57 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    |               0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+ERROR:  invalid value for parameter "min_apply_delay": "foo"
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+ERROR:  -1 ms is outside the valid range for parameter "min_apply_delay" (0 .. 2147483647)
+-- fail - utilizing streaming = parallel with time-delayed replication is not supported
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = 123);
+ERROR:  min_apply_delay > 0 and streaming = parallel are mutually exclusive options
+-- success -- min_apply_delay value without unit is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |             123 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+-- success -- min_apply_delay value with unit is converted into ms and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d');
+\dRs+
+                                                                                                  List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min apply delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |        86400000 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+-- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+ERROR:  cannot enable parallel streaming mode for subscription with min_apply_delay
+-- fail - alter subscription with min_apply_delay should fail when streaming = parallel is set
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 0, streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123);
+ERROR:  cannot enable min_apply_delay for subscription in streaming = parallel mode
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7281f5fee2..7317b140f5 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -286,6 +286,30 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+
+-- fail - utilizing streaming = parallel with time-delayed replication is not supported
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = 123);
+
+-- success -- min_apply_delay value without unit is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+\dRs+
+
+-- success -- min_apply_delay value with unit is converted into ms and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d');
+\dRs+
+
+-- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+
+-- fail - alter subscription with min_apply_delay should fail when streaming = parallel is set
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 0, streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 3db0fdfd96..a186876eb4 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -38,6 +38,7 @@ tests += {
       't/029_on_error.pl',
       't/030_origin.pl',
       't/031_column_list.pl',
+      't/032_apply_delay.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/032_apply_delay.pl b/src/test/subscription/t/032_apply_delay.pl
new file mode 100644
index 0000000000..7d499ab664
--- /dev/null
+++ b/src/test/subscription/t/032_apply_delay.pl
@@ -0,0 +1,187 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test replication apply delay
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Confirm the time-delayed replication has been effective from the server log
+# message where the apply worker emits for applying delay. Moreover, verifies
+# that the current worker's remaining wait time is sufficiently bigger than the
+# expected value, in order to check any update of the min_apply_delay.
+sub check_apply_delay_log
+{
+	my ($node_subscriber, $offset, $expected) = @_;
+
+	my $log_location = $node_subscriber->wait_for_log(
+		qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, remaining wait time: (\d+) ms/,
+		$offset);
+
+	cmp_ok($log_location, '>', $offset,
+		"logfile contains triggered logical replication apply delay");
+
+	# Get the remaining wait time from the server log
+	my $contents = slurp_file($node_subscriber->logfile, $offset);
+	$contents =~
+	  qr/time-delayed replication for txid (\d+), min_apply_delay = (\d+) ms, remaining wait time: (\d+) ms/,
+	  or die "could not get the apply worker wait time";
+	my $logged_delay = $3;
+
+	# Is it larger than expected?
+	cmp_ok($logged_delay, '>', $expected,
+		"The apply worker wait time has expected duration");
+}
+
+# Compare inserted time on the publisher with applied time on the subscriber to
+# confirm the latter is applied after expected time. The time is automatically
+# generated and stored in the table column 'c'.
+sub check_apply_delay_time
+{
+	my ($node_publisher, $node_subscriber, $primary_key, $expected_diffs) =
+	  @_;
+
+	my $inserted_time_on_pub = $node_publisher->safe_psql(
+		'postgres', qq[
+		SELECT extract(epoch from c) FROM test_tab WHERE a = $primary_key;
+	]);
+
+	my $inserted_time_on_sub = $node_subscriber->safe_psql(
+		'postgres', qq[
+		SELECT extract(epoch from c) FROM test_tab WHERE a = $primary_key;
+	]);
+
+	cmp_ok(
+		$inserted_time_on_sub - $inserted_time_on_pub,
+		'>',
+		$expected_diffs,
+		"The tuple on the subscriber was modified later than the publisher");
+}
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug2");
+$node_subscriber->start;
+
+# Setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar, c timestamptz (6) DEFAULT now())"
+);
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz (6) DEFAULT now())"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# The column 'c' must not be published because we want to compare the time
+# difference.
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab (a, b)");
+
+my $appname = 'tap_sub';
+
+# Create a subscription that applies the transaction after 1 second delay
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (copy_data = off, min_apply_delay = '1s', streaming = 'on')"
+);
+
+# Check log starting now for logical replication apply delay
+my $offset = -s $node_subscriber->logfile;
+
+# New row to trigger apply delay
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (2, 'bar')");
+
+$node_publisher->wait_for_catchup($appname);
+
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(2|1|2), 'check if the new rows were applied to subscriber');
+
+# Make sure the apply worker knows to wait for more than 500ms
+check_apply_delay_log($node_subscriber, $offset, "0.5");
+
+# Verify that the subscriber lags the publisher by at least 1 second
+check_apply_delay_time($node_publisher, $node_subscriber, '2', '1');
+
+# Setup for streaming case
+$node_publisher->append_conf('postgresql.conf',
+	'logical_decoding_mode = immediate');
+$node_publisher->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_publisher->safe_psql('postgres', q{SELECT 1});
+
+# Check log starting now for logical replication apply delay
+$offset = -s $node_subscriber->logfile;
+
+# Test streamed transaction by insert
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(5|1|5), 'check if the new rows were applied to subscriber');
+
+# Make sure the apply worker knows to wait for more than 500ms
+check_apply_delay_log($node_subscriber, $offset, "0.5");
+
+# Verify that the subscriber lags the publisher by at least 1 second
+check_apply_delay_time($node_publisher, $node_subscriber, '5', '1');
+
+# Test whether ALTER SUBSCRIPTION changes the delayed time of the apply worker
+# (1 day 5 minutes). Note that the extra 5 minute is to account for any
+# decoding/network overhead.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET (min_apply_delay = 86700000)");
+
+# Check log starting now for logical replication apply delay
+$offset = -s $node_subscriber->logfile;
+
+# New row to trigger apply delay
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (0, 'foobar')");
+
+# Make sure the apply worker knows to wait for more than 1 day
+check_apply_delay_log($node_subscriber, $offset, "86400000");
+
+# Disable subscription and the worker should die immediately
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub DISABLE;");
+
+# Wait until worker dies
+my $sub_query =
+  "SELECT count(1) = 0 FROM pg_stat_subscription WHERE subname = 'tap_sub' AND pid IS NOT NULL;";
+$node_subscriber->poll_query_until('postgres', $sub_query)
+  or die "Timed out while waiting for subscriber to die";
+
+# Confirm disabling the subscription by ALTER SUBSCRIPTION DISABLE did not cause
+# the delayed transaction to be applied.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(a) FROM test_tab WHERE a = 0;");
+is($result, qq(0), "check the delayed transaction was not applied");
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();
-- 
2.30.0

