From d4b9a056139c8fccc7cdc73d26ef6dd804ae5564 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 16 Feb 2023 07:52:23 +0000
Subject: [PATCH v7] Time-delayed logical replication on publisher side

Similar to physical replication, a time-delayed copy of the data for
logical replication is useful for some scenarios (particularly to fix
errors that might cause data loss).

This patch implements a new subscription parameter called 'min_send_delay'.

If the subscription sets min_send_delay parameter, the apply worker
(via walrcv_startstreaming) passes the value to the publisher as an output plugin
option. The walsender will delay the transaction sending for given milliseconds.

The delay does not take into account the overhead of time spent in transferring
the transaction, which means that the arrival time at the subscriber may be
delayed more than the given time.

The delay occurs before we start to send the transaction on the publisher.
Regular and prepared transactions are covered. Streamed transactions are also
covered.

The combination of parallel streaming mode and min_send_delay is not allowed

1. This is because in parallel streaming mode, we start applying the transaction
stream as soon as the first change arrives without knowing the transaction's
prepare/commit time. Always waiting for the full 'min_send_delay' period might
include unnecessary delay.

2. Another reason is, for parallel streaming, the transaction will be opened
immediately by the parallel apply worker. So if walsender delayed to send the
final record of the transaction, the parallel worker must wait receiving with an
opened transaction. This would lead that locks acquired during the transaction
not getting released till min_send_delay elapsed.

Earlier versions were written by Euler Taveira, Takamichi Osumi, and Kuroda Hayato

Author: Kuroda Hayato, Takamichi Osumi
Reviewed-by: Amit Kapila, Peter Smith, Vignesh C, Shveta Malik,
             Kyotaro Horiguchi, Shi Yu, Wang Wei, Dilip Kumar, Melih Mutlu,
             Andres Freund
---
 doc/src/sgml/catalogs.sgml                    |  10 +
 doc/src/sgml/glossary.sgml                    |  15 ++
 doc/src/sgml/logical-replication.sgml         |   6 +
 doc/src/sgml/monitoring.sgml                  |   5 +
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  43 +++-
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   7 +-
 src/backend/commands/subscriptioncmds.c       | 119 ++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   5 +
 src/backend/replication/logical/decode.c      |  18 ++
 src/backend/replication/logical/logical.c     |  18 +-
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/worker.c      |  12 +-
 src/backend/replication/pgoutput/pgoutput.c   |  36 ++++
 src/backend/replication/slotfuncs.c           |   4 +-
 src/backend/replication/walsender.c           |  77 +++++++-
 src/backend/utils/activity/wait_event.c       |   3 +
 src/bin/pg_dump/pg_dump.c                     |  15 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   9 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/logical.h             |  18 +-
 src/include/replication/logicalproto.h        |   6 +-
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/include/utils/wait_event.h                |   3 +-
 src/test/regress/expected/subscription.out    | 185 +++++++++++-------
 src/test/regress/sql/subscription.sql         |  29 +++
 src/test/subscription/t/001_rep_changes.pl    |  27 +++
 31 files changed, 583 insertions(+), 105 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..ce6521fe6c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7892,6 +7892,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subminsenddelay</structfield> <type>int4</type>
+      </para>
+      <para>
+       The minimum delay, in milliseconds, by the publisher before sending all
+       the changes
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subenabled</structfield> <type>bool</type>
diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml
index 7c01a541fe..9ede9d05f6 100644
--- a/doc/src/sgml/glossary.sgml
+++ b/doc/src/sgml/glossary.sgml
@@ -1729,6 +1729,21 @@
    </glossdef>
   </glossentry>
 
+  <glossentry id="glossary-time-delayed-replication">
+   <glossterm>Time-delayed replication</glossterm>
+   <glossdef>
+    <para>
+     Replication setup that delays the application of changes by a specified
+     minimum time-delay period.
+    </para>
+    <para>
+     For more information, see
+     <xref linkend="guc-recovery-min-apply-delay"/> for physical replication
+     and <xref linkend="sql-createsubscription"/> for logical replication.
+    </para>
+   </glossdef>
+  </glossentry>
+
   <glossentry id="glossary-toast">
    <glossterm>TOAST</glossterm>
    <glossdef>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..fe9e7f7b26 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -247,6 +247,12 @@
    target table.
   </para>
 
+  <para>
+   A subscription can delay the receipt of changes by specifying the
+   <literal>min_send_delay</literal> subscription parameter. See
+   <xref linkend="sql-createsubscription"/> for details.
+  </para>
+
   <sect2 id="logical-replication-subscription-slot">
    <title>Replication Slot Management</title>
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index b0b997f092..6158587644 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2349,6 +2349,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting to acquire an exclusive lock to truncate off any
        empty pages at the end of a table vacuumed.</entry>
      </row>
+     <row>
+      <entry><literal>WalSenderSendDelay</literal></entry>
+      <entry>Waiting while sending changes for time-delayed logical replication
+      in the WAL sender process.</entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 964fcbb8ff..3f238b958b 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -213,8 +213,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>,
+      <literal>origin</literal>, and
+      <literal>min_send_delay</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 51c45f17c7..9d08740ba2 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -349,7 +349,43 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
-      </variablelist></para>
+
+       <varlistentry>
+        <term><literal>min_send_delay</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          By default, the publisher sends changes as soon as possible. This
+          parameter allows the user to delay changes by the given time period.
+          If the value is specified without units, it is taken as milliseconds.
+          The default is zero (no delay). See <xref linkend="config-setting-names-values"/>
+          for details on the available valid time units.
+         </para>
+         <para>
+          The delay is effective only when the initial table synchronization
+          has been finished. However, there is a possibility that the table
+          status updated in <link linkend="catalog-pg-subscription-rel"><structname>pg_subscription_rel</structname></link>
+          could be delayed in getting to the "ready" state, and also two-phase
+          (if specified) could be delayed in getting to "enabled".
+         </para>
+         <para>
+          The delay does not take into account the overhead of time spent
+          transferring the transaction. Therefore, the arrival time at the
+          subscriber may be delayed more than the specified
+          <literal>min_send_delay</literal> time.
+         </para>
+         <warning>
+           <para>
+            Delaying the replication means there is a much longer time between
+            making a change on the publisher, and that change being committed
+            on the subscriber. This can impact the performance of synchronous
+            replication. See <xref linkend="guc-synchronous-commit"/>
+            parameter.
+           </para>
+         </warning>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
 
     </listitem>
    </varlistentry>
@@ -420,6 +456,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    published with different column lists are not supported.
   </para>
 
+  <para>
+   A non-zero <literal>min_send_delay</literal> parameter is not allowed when
+   streaming in parallel mode.
+  </para>
+
   <para>
    We allow non-existent publications to be specified so that users can add
    those later. This means
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..63a10b06d1 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->skiplsn = subform->subskiplsn;
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
+	sub->minsenddelay = subform->subminsenddelay;
 	sub->enabled = subform->subenabled;
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 34ca0e739f..54a705d71b 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1314,9 +1314,10 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subminsenddelay,
+              subenabled, subbinary, substream, subtwophasestate,
+              subdisableonerr, subslotname, subsynccommit, subpublications,
+              suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..4a8cd47171 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_MIN_SEND_DELAY		0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	int32		min_send_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -100,7 +102,7 @@ static void check_publications_origin(WalReceiverConn *wrconn,
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
-
+static int32 defGetMinSendDelay(DefElem *def);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY))
+		opts->min_send_delay = 0;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY) &&
+				 strcmp(defel->defname, "min_send_delay") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_MIN_SEND_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MIN_SEND_DELAY;
+			opts->min_send_delay = defGetMinSendDelay(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -404,6 +417,29 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 								"slot_name = NONE", "create_slot = false")));
 		}
 	}
+
+	/*
+	 * The combination of parallel streaming mode and min_send_delay is not
+	 * allowed. This is because in parallel streaming mode, the walsender
+	 * starts sending the transaction stream without knowing the prepare/commit
+	 * time of the transaction. Always waiting for the full 'min_send_delay'
+	 * time to send may introduce unnecessary delay.
+	 *
+	 * The other possibility was to wait sending COMMIT record of the parallel
+	 * apply transaction but that would cause issues related to resource bloat
+	 * and locks being held for a long time.
+	 */
+	if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY) &&
+		opts->min_send_delay > 0 &&
+		opts->streaming == LOGICALREP_STREAM_PARALLEL)
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+		/*
+		 * translator: the first %s is a string of the form "parameter > 0"
+		 * and the second one is "option = value".
+		 */
+				errmsg("%s and %s are mutually exclusive options",
+					   "min_send_delay > 0", "streaming = parallel"));
 }
 
 /*
@@ -560,7 +596,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_MIN_SEND_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -628,6 +665,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
+	values[Anum_pg_subscription_subminsenddelay - 1] = Int32GetDatum(opts.min_send_delay);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
@@ -1054,7 +1092,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_MIN_SEND_DELAY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1098,6 +1136,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
+					/*
+					 * The combination of parallel streaming mode and
+					 * min_send_delay is not allowed. See
+					 * parse_subscription_options.
+					 */
+					if (opts.streaming == LOGICALREP_STREAM_PARALLEL &&
+						!IsSet(opts.specified_opts, SUBOPT_MIN_SEND_DELAY) &&
+						sub->minsenddelay > 0)
+						ereport(ERROR,
+								errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								errmsg("cannot set parallel streaming mode for subscription with %s",
+									   "min_send_delay"));
+
 					values[Anum_pg_subscription_substream - 1] =
 						CharGetDatum(opts.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
@@ -1111,6 +1162,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MIN_SEND_DELAY))
+				{
+					/*
+					 * The combination of parallel streaming mode and
+					 * min_send_delay is not allowed. See
+					 * parse_subscription_options.
+					 */
+					if (opts.min_send_delay > 0 &&
+						!IsSet(opts.specified_opts, SUBOPT_STREAMING) &&
+						sub->stream == LOGICALREP_STREAM_PARALLEL)
+						ereport(ERROR,
+								errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								errmsg("cannot set %s for subscription in parallel streaming mode",
+									   "min_send_delay"));
+
+					values[Anum_pg_subscription_subminsenddelay - 1] =
+						Int32GetDatum(opts.min_send_delay);
+					replaces[Anum_pg_subscription_subminsenddelay - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
 				{
 					values[Anum_pg_subscription_suborigin - 1] =
@@ -2195,3 +2266,45 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Extract the min_send_delay value from a DefElem. This is very similar to
+ * parse_and_validate_value() for integer values, because min_send_delay
+ * accepts the same parameter format as recovery_min_apply_delay.
+ */
+static int32
+defGetMinSendDelay(DefElem *def)
+{
+	char	   *input_string;
+	int			result;
+	const char *hintmsg;
+
+	input_string = defGetString(def);
+
+	/*
+	 * Parse given string as parameter which has millisecond unit
+	 */
+	if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid value for parameter \"%s\": \"%s\"",
+						"min_send_delay", input_string),
+				 hintmsg ? errhint("%s", _(hintmsg)) : 0));
+
+	/*
+	 * Check both the lower boundary for the valid min_send_delay range and
+	 * the upper boundary as the safeguard for some platforms where INT_MAX is
+	 * wider than int32 respectively. Although parse_int() has confirmed that
+	 * the result is less than or equal to INT_MAX, the value will be stored
+	 * in a catalog column of int32.
+	 */
+	if (result < 0 || result > PG_INT32_MAX)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)",
+						result,
+						"min_send_delay",
+						0, PG_INT32_MAX)));
+
+	return result;
+}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 560ec974fa..89a72c1abe 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -443,6 +443,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 140000)
 			appendStringInfoString(&cmd, ", binary 'true'");
 
+		if (options->proto.logical.min_send_delay > 0 &&
+			PQserverVersion(conn->streamConn) >= 160000)
+			appendStringInfo(&cmd, ", min_send_delay '%d'",
+							 options->proto.logical.min_send_delay);
+
 		appendStringInfoChar(&cmd, ')');
 	}
 	else
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..31bf43bd63 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -689,6 +689,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 	else
 	{
+		/*
+		 * Delay sending the changes if required. For streaming transactions,
+		 * this means a delay in sending the last stream but that is OK
+		 * because on the downstream the changes will be applied only after
+		 * receiving the last stream.
+		 */
+		if (ctx->min_send_delay > 0 && ctx->delay_send)
+			ctx->delay_send(ctx, xid, commit_time);
+
 		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
 							commit_time, origin_id, origin_lsn);
 	}
@@ -773,6 +782,15 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	/*
+	 * Delay sending the changes if required. For streaming transactions, this
+	 * means a delay in sending the last stream but that is OK because on the
+	 * downstream the changes will be applied only after receiving the last
+	 * stream.
+	 */
+	if (ctx->min_send_delay > 0 && ctx->delay_send)
+		ctx->delay_send(ctx, xid, prepare_time);
+
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c3ec97a0a6..e4dd822cdc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -156,7 +156,8 @@ StartupDecodingContext(List *output_plugin_options,
 					   XLogReaderRoutine *xl_routine,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
-					   LogicalOutputPluginWriterUpdateProgress update_progress)
+					   LogicalOutputPluginWriterUpdateProgress update_progress,
+					   LogicalOutputPluginWriterDelay delay_send)
 {
 	ReplicationSlot *slot;
 	MemoryContext context,
@@ -293,6 +294,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
 	ctx->update_progress = update_progress;
+	ctx->delay_send = delay_send;
 
 	ctx->output_plugin_options = output_plugin_options;
 
@@ -316,7 +318,7 @@ StartupDecodingContext(List *output_plugin_options,
  *		marking WAL reserved beforehand.  In that scenario, it's up to the
  *		caller to guarantee that WAL remains available.
  * xl_routine -- XLogReaderRoutine for underlying XLogReader
- * prepare_write, do_write, update_progress --
+ * prepare_write, do_write, update_progress, delay_send --
  *		callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
@@ -334,7 +336,8 @@ CreateInitDecodingContext(const char *plugin,
 						  XLogReaderRoutine *xl_routine,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
-						  LogicalOutputPluginWriterUpdateProgress update_progress)
+						  LogicalOutputPluginWriterUpdateProgress update_progress,
+						  LogicalOutputPluginWriterDelay delay_send)
 {
 	TransactionId xmin_horizon = InvalidTransactionId;
 	ReplicationSlot *slot;
@@ -435,7 +438,7 @@ CreateInitDecodingContext(const char *plugin,
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
 								 xl_routine, prepare_write, do_write,
-								 update_progress);
+								 update_progress, delay_send);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
@@ -475,7 +478,7 @@ CreateInitDecodingContext(const char *plugin,
  * xl_routine
  *		XLogReaderRoutine used by underlying xlogreader
  *
- * prepare_write, do_write, update_progress
+ * prepare_write, do_write, update_progress, delay_send
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -493,7 +496,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
-					  LogicalOutputPluginWriterUpdateProgress update_progress)
+					  LogicalOutputPluginWriterUpdateProgress update_progress,
+					  LogicalOutputPluginWriterDelay delay_send)
 {
 	LogicalDecodingContext *ctx;
 	ReplicationSlot *slot;
@@ -547,7 +551,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
 								 fast_forward, xl_routine, prepare_write,
-								 do_write, update_progress);
+								 do_write, update_progress, delay_send);
 
 	/* call output plugin initialization callback */
 	old_context = MemoryContextSwitchTo(ctx->context);
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fa1b641a2b..960025197f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -212,7 +212,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
 									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+									LogicalOutputWrite, NULL, NULL);
 
 		/*
 		 * After the sanity checks in CreateDecodingContext, make sure the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..e68902ae34 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3898,7 +3898,8 @@ maybe_reread_subscription(void)
 		newsub->stream != MySubscription->stream ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		newsub->minsenddelay != MySubscription->minsenddelay)
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -4617,9 +4618,18 @@ ApplyWorkerMain(Datum main_arg)
 
 	options.proto.logical.twophase = false;
 	options.proto.logical.origin = pstrdup(MySubscription->origin);
+	options.proto.logical.min_send_delay = 0;
 
 	if (!am_tablesync_worker())
 	{
+		/*
+		 * Time-delayed logical replication does not support tablesync
+		 * workers, so only the leader apply worker can request walsenders to
+		 * delay on the publisher side.
+		 */
+		if (server_version >= 160000 && MySubscription->minsenddelay > 0)
+			options.proto.logical.min_send_delay = MySubscription->minsenddelay;
+
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
 		 * as the tri-state PENDING until all tablesyncs have reached READY
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 98377c094b..df6a87b0ba 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -285,6 +285,7 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
 	bool		origin_option_given = false;
+	bool		min_send_delay_option_given = false;
 
 	data->binary = false;
 	data->streaming = LOGICALREP_STREAM_OFF;
@@ -396,6 +397,32 @@ parse_output_parameters(List *options, PGOutputData *data)
 						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						errmsg("unrecognized origin value: \"%s\"", data->origin));
 		}
+		else if (strcmp(defel->defname, "min_send_delay") == 0)
+		{
+			unsigned long delay_val;
+			char	   *endptr;
+
+			if (min_send_delay_option_given)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("conflicting or redundant options"));
+			min_send_delay_option_given = true;
+
+			errno = 0;
+			delay_val = strtoul(strVal(defel->arg), &endptr, 10);
+			if (errno != 0 || *endptr != '\0')
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid min_send_delay")));
+
+			if (delay_val > PG_INT32_MAX)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("min_send_delay \"%s\" out of range",
+								strVal(defel->arg))));
+
+			data->min_send_delay = (int32) delay_val;
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -501,6 +528,15 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		else
 			ctx->twophase_opt_given = true;
 
+		if (data->min_send_delay &&
+			data->protocol_version < LOGICALREP_PROTO_MIN_SEND_DELAY_VERSION_NUM)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("requested proto_version=%d does not support delay sending data, need %d or higher",
+							data->protocol_version, LOGICALREP_PROTO_MIN_SEND_DELAY_VERSION_NUM)));
+		else
+			ctx->min_send_delay = data->min_send_delay;
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c964824..522f7600a1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -148,7 +148,7 @@ create_logical_replication_slot(char *name, char *plugin,
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 	/*
 	 * If caller needs us to determine the decoding start point, do so now.
@@ -481,7 +481,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 									XL_ROUTINE(.page_read = read_local_xlog_page,
 											   .segment_open = wal_segment_open,
 											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+									NULL, NULL, NULL, NULL);
 
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e24..8cefd8cd0a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -252,6 +252,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 								 bool skipped_xact);
+static void WalSndDelay(LogicalDecodingContext *ctx, TransactionId xid, TimestampTz delay_start);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1126,7 +1127,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 												   .segment_open = WalSndSegmentOpen,
 												   .segment_close = wal_segment_close),
 										WalSndPrepareWrite, WalSndWriteData,
-										WalSndUpdateProgress);
+										WalSndUpdateProgress, WalSndDelay);
 
 		/*
 		 * Signal that we don't need the timeout mechanism. We're just
@@ -1285,7 +1286,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 										 .segment_open = WalSndSegmentOpen,
 										 .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
-							  WalSndUpdateProgress);
+							  WalSndUpdateProgress, WalSndDelay);
 	xlogreader = logical_decoding_ctx->reader;
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -3849,3 +3850,75 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * LogicalDecodingContext 'delay' callback.
+ *
+ * Wait long enough to make sure a transaction is applied at least that
+ * period behind the publisher.
+ */
+static void
+WalSndDelay(LogicalDecodingContext *ctx, TransactionId xid, TimestampTz delay_start)
+{
+	/* Wait till delayUntil by the latch mechanism */
+	while (true)
+	{
+		TimestampTz delayUntil;
+		long		remaining_wait_time_ms;
+		long		timeout_sleeptime_ms;
+
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* This might change wal_sender_timeout */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/* Check for input from the client */
+		ProcessRepliesIfAny();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut();
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary();
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+
+		/*
+		 * If we've requested to shut down, exit the process.
+		 *
+		 * Note that WalSndDone() cannot be used here because the delaying
+		 * changes will be sent in the function.
+		 */
+		if (got_STOPPING)
+			WalSndShutdown();
+
+		delayUntil = TimestampTzPlusMilliseconds(delay_start, ctx->min_send_delay);
+		remaining_wait_time_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), delayUntil);
+
+		/*
+		 * Exit without arming the latch if it's already past time to send
+		 * this transaction.
+		 */
+		if (remaining_wait_time_ms <= 0)
+			break;
+
+		/* Sleep until appropriate time. */
+		timeout_sleeptime_ms = WalSndComputeSleeptime(GetCurrentTimestamp());
+
+		elog(DEBUG2, "time-delayed replication for txid %u, delay_time = %d ms, remaining wait time: %ld ms",
+			 xid, (int) ctx->min_send_delay, remaining_wait_time_ms);
+
+		/* Sleep until we get reply from worker or we time out */
+		WalSndWait(WL_SOCKET_READABLE,
+				   Min(timeout_sleeptime_ms, remaining_wait_time_ms),
+				   WAIT_EVENT_WALSENDER_SEND_DELAY);
+	}
+}
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index cb99cc6339..76c19fe11d 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -515,6 +515,9 @@ pgstat_get_wait_timeout(WaitEventTimeout w)
 		case WAIT_EVENT_VACUUM_TRUNCATE:
 			event_name = "VacuumTruncate";
 			break;
+		case WAIT_EVENT_WALSENDER_SEND_DELAY:
+			event_name = "WalSenderSendDelay";
+			break;
 			/* no default case, so that compiler will warn */
 	}
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 1a06eeaf6a..9754487921 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4494,6 +4494,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subminsenddelay;
 	int			i,
 				ntups;
 
@@ -4546,9 +4547,13 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+		appendPQExpBufferStr(query,
+							 " s.suborigin,\n"
+							 " s.subminsenddelay\n");
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n"
+						  " 0 AS subminsenddelay\n",
+						  LOGICALREP_ORIGIN_ANY);
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4576,6 +4581,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subminsenddelay = PQfnumber(res, "subminsenddelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4606,6 +4612,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subminsenddelay =
+			atoi(PQgetvalue(res, i, i_subminsenddelay));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4687,6 +4695,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subminsenddelay > 0)
+		appendPQExpBuffer(query, ", min_send_delay = '%d ms'", subinfo->subminsenddelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index cdca0b993d..4c55f8efc4 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -660,6 +660,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *suborigin;
 	char	   *subsynccommit;
+	int			subminsenddelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..c7d303a168 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6527,10 +6527,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* Origin and min_send_delay are only supported in v16 and higher */
 		if (pset.sversion >= 160000)
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", subminsenddelay AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Min send delay"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5e1882eaea..6643db6f55 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1925,7 +1925,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
+		COMPLETE_WITH("binary", "disable_on_error", "min_send_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
@@ -3268,7 +3268,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "origin", "slot_name",
+					  "disable_on_error", "enabled", "min_send_delay", "origin", "slot_name",
 					  "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..69ae4314b4 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -74,6 +74,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	Oid			subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
 
+	int32		subminsenddelay;	/* Replication send delay (ms) */
+
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
@@ -122,6 +124,7 @@ typedef struct Subscription
 								 * skipped */
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
+	int32		minsenddelay;	/* Replication send delay (ms) */
 	bool		enabled;		/* Indicates if the subscription is enabled */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea0..c389dc17e5 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -30,6 +30,11 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
 														 bool skipped_xact
 );
 
+typedef void (*LogicalOutputPluginWriterDelay) (struct LogicalDecodingContext *lr,
+												TransactionId xid,
+												TimestampTz start_time
+);
+
 typedef struct LogicalDecodingContext
 {
 	/* memory context this is all allocated in */
@@ -64,6 +69,7 @@ typedef struct LogicalDecodingContext
 	LogicalOutputPluginWriterPrepareWrite prepare_write;
 	LogicalOutputPluginWriterWrite write;
 	LogicalOutputPluginWriterUpdateProgress update_progress;
+	LogicalOutputPluginWriterDelay delay_send;
 
 	/*
 	 * Output buffer.
@@ -100,6 +106,12 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
+	/*
+	 * The minimum delay, in milliseconds, by the publisher before sending all
+	 * the changes.
+	 */
+	int32		min_send_delay;
+
 	/*
 	 * State for writing output.
 	 */
@@ -121,14 +133,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
 														 XLogReaderRoutine *xl_routine,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
-														 LogicalOutputPluginWriterUpdateProgress update_progress);
+														 LogicalOutputPluginWriterUpdateProgress update_progress,
+														 LogicalOutputPluginWriterDelay delay_send);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
 													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
-													 LogicalOutputPluginWriterUpdateProgress update_progress);
+													 LogicalOutputPluginWriterUpdateProgress update_progress,
+													 LogicalOutputPluginWriterDelay delay_send);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0ea2df5088..46faadbd7a 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -36,13 +36,17 @@
  * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
  * where we support applying large streaming transactions in parallel.
  * Introduced in PG16.
+ *
+ * LOGICALREP_PROTO_MIN_SEND_DELAY_VERSION_NUM is the minimum protocol version
+ * with support for delaying to send transactions. Introduced in PG16.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
 #define LOGICALREP_PROTO_VERSION_NUM 1
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
 #define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
+#define LOGICALREP_PROTO_MIN_SEND_DELAY_VERSION_NUM 4
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_MIN_SEND_DELAY_VERSION_NUM
 
 /*
  * Logical message types
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b4a8015403..d2fde09e00 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -30,6 +30,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	char	   *origin;
+	int32		min_send_delay;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..c20969aed7 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -187,6 +187,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			int32		min_send_delay; /* The minimum send delay */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 9ab23e1c4a..cc3a234eba 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -150,7 +150,8 @@ typedef enum
 	WAIT_EVENT_REGISTER_SYNC_REQUEST,
 	WAIT_EVENT_SPIN_DELAY,
 	WAIT_EVENT_VACUUM_DELAY,
-	WAIT_EVENT_VACUUM_TRUNCATE
+	WAIT_EVENT_VACUUM_TRUNCATE,
+	WAIT_EVENT_WALSENDER_SEND_DELAY
 } WaitEventTimeout;
 
 /* ----------
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..2027316233 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                  List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                        List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    |              0 | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,20 +404,61 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    |              0 | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail -- min_send_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = foo);
+ERROR:  invalid value for parameter "min_send_delay": "foo"
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = -1);
+ERROR:  -1 ms is outside the valid range for parameter "min_send_delay" (0 .. 2147483647)
+-- fail - specifying streaming = parallel with time-delayed replication is not
+-- supported
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_send_delay = 123);
+ERROR:  min_send_delay > 0 and streaming = parallel are mutually exclusive options
+-- success -- min_send_delay value without units is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = 123);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo          | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |            123 | off                | dbname=regress_doesnotexit | 0/0
+(1 row)
+
+-- success -- min_send_delay value with units other than ms is converted to ms
+-- and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = '1 d');
+\dRs+
+                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit |          Conninfo          | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    |       86400000 | off                | dbname=regress_doesnotexit | 0/0
 (1 row)
 
+-- fail - alter subscription with streaming = parallel should fail when
+-- time-delayed replication is set
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+ERROR:  cannot set parallel streaming mode for subscription with min_send_delay
+-- fail - alter subscription with min_send_delay should fail when
+-- streaming = parallel is set
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 0, streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 123);
+ERROR:  cannot set min_send_delay for subscription in parallel streaming mode
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7281f5fee2..46bf4a27d9 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -286,6 +286,35 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -- min_send_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = foo);
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = -1);
+
+-- fail - specifying streaming = parallel with time-delayed replication is not
+-- supported
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_send_delay = 123);
+
+-- success -- min_send_delay value without units is taken as milliseconds
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = 123);
+\dRs+
+
+-- success -- min_send_delay value with units other than ms is converted to ms
+-- and stored as an integer
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = '1 d');
+\dRs+
+
+-- fail - alter subscription with streaming = parallel should fail when
+-- time-delayed replication is set
+ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
+
+-- fail - alter subscription with min_send_delay should fail when
+-- streaming = parallel is set
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 0, streaming = parallel);
+ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = 123);
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 91aa068c95..063a98fde9 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -515,6 +515,33 @@ $node_publisher->poll_query_until('postgres',
   or die
   "Timed out while waiting for apply to restart after renaming SUBSCRIPTION";
 
+# Test time-delayed logical replication
+#
+# If the subscription sets min_send_delay parameter, the walsender will delay
+# the transaction send for min_send_delay milliseconds. We verify this by
+# looking at the time difference between a) when tuples are inserted on the
+# publisher, and b) when those changes are replicated on the subscriber. Even
+# on slow machines, this strategy will give predictable behavior.
+
+# Set min_send_delay parameter to 3 seconds
+my $delay = 3;
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_renamed SET (min_send_delay = '${delay}s')");
+
+# Before doing the insertion, get the current timestamp that will be
+# used as a comparison base.
+my $publisher_insert_time = time();
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins VALUES (generate_series(1101, 1120))");
+
+# The publisher waits for the replication to complete
+$node_publisher->wait_for_catchup('tap_sub_renamed');
+
+# This test is successful only if at least the configured delay has elapsed.
+ok( time() - $publisher_insert_time >= $delay,
+	"subscriber applies WAL only after replication delay for non-streaming transaction"
+);
+
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
 
-- 
2.27.0

