From c613be4949c211bac01c9fdc02490c9c8143fa56 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 1 Dec 2025 12:28:29 +0900
Subject: [PATCH v3 4/4] Parallel apply non-streaming transactions

--
Basic design
--

The leader worker assigns each non-streaming transaction to a parallel apply
worker. Before dispatching changes to a parallel worker, the leader verifies if
the current modification affects the same row (identified by replica identity
key) as another ongoing transaction. If so, the leader sends a list of dependent
transaction IDs to the parallel worker, indicating that the parallel apply
worker must wait for these transactions to commit before proceeding.

Each parallel apply worker records in shared memory the local end LSN of the
transaction it applies. Subsequently, the leader gathers these local end LSNs
and logs them in the local 'lsn_mapping' for verifying whether they have been
flushed to disk (following the logic in get_flush_position()).

If no parallel apply worker is available, the leader will apply the transaction
independently.

For further details, please refer to the following:

--
dependency tracking
--

The leader maintains a local hash table, using the remote change's replica
identity column values and relid as keys, with remote transaction IDs as values.
Before sending changes to the parallel apply worker, the leader computes a hash
using RI key values and the relid of the current change to search the hash
table. If an existing entry is found, the leader tells the parallel worker
to wait for the remote xid in the hash entry, after which the leader updates the
hash entry with the current xid.

If the remote relation lacks a replica identity (RI), it indicates that only
INSERT can be replicated for this table. In such cases, the leader skips
dependency checks, allowing the parallel apply worker to proceed with applying
changes without delay. This is because the only potential conflicts would
involve a local unique key or foreign key, handling for which is yet to be
implemented (see TODO - dependency on local unique key, foreign key).

In cases of TRUNCATE or remote schema changes affecting the entire table, the
leader retrieves all remote xids touching the same table (via sequential scans
of the hash table) and tells the parallel worker to wait for those transactions
to commit.

Hash entries are cleaned up once the transaction corresponding to the remote xid
in the entry has been committed. Clean-up typically occurs when collecting the
flush position of each transaction, but is forced if the hash table exceeds a
set threshold.
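
To illustrate the idea (this is only a simplified, self-contained model, not
code from this patch), the dependency tracking can be pictured as a small table
keyed by (relid, RI key values) whose value is the last remote xid that touched
that row; the names below (RIDepEntry, ri_dep_track, MAX_DEPS) are made up for
the sketch:

/*
 * Toy model of the leader's dependency tracking.  The real patch keys a local
 * hash table on the change's relid plus a hash of its replica identity column
 * values; here the RI key is a plain string and the "hash table" is a tiny
 * linear-scan array.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;

typedef struct RIDepEntry
{
	uint32_t	relid;			/* remote relation OID */
	char		ri_key[64];		/* replica identity column values */
	TransactionId last_xid;		/* last remote xid touching this row */
	int			used;
} RIDepEntry;

#define MAX_DEPS 128
static RIDepEntry ri_deps[MAX_DEPS];

/*
 * Record that "xid" modifies the row identified by (relid, ri_key).  Returns
 * the xid of a prior transaction on the same row, i.e. the transaction the
 * parallel apply worker must wait for, or 0 if there is no dependency.
 */
static TransactionId
ri_dep_track(uint32_t relid, const char *ri_key, TransactionId xid)
{
	for (int i = 0; i < MAX_DEPS; i++)
	{
		RIDepEntry *e = &ri_deps[i];

		if (e->used && e->relid == relid && strcmp(e->ri_key, ri_key) == 0)
		{
			TransactionId dep = e->last_xid;

			e->last_xid = xid;	/* the current change now owns the row */
			return dep;
		}
		if (!e->used)
		{
			e->used = 1;
			e->relid = relid;
			snprintf(e->ri_key, sizeof(e->ri_key), "%s", ri_key);
			e->last_xid = xid;
			return 0;			/* no dependency */
		}
	}
	return 0;					/* table full: real code forces a cleanup */
}

int
main(void)
{
	/* first touch of the row: no dependency; second touch: depends on 700 */
	printf("dep for xid 700: %u\n", ri_dep_track(16384, "id=1", 700));
	printf("dep for xid 701: %u\n", ri_dep_track(16384, "id=1", 701));
	return 0;
}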

--
dependency waiting
--

If a transaction is relied upon by others, the leader adds its xid to a shared
hash table. The shared hash table entry is cleared by the parallel apply worker
upon completing the transaction. Workers needing to wait for a transaction check
the shared hash table entry; if present, they lock the transaction ID (using
pa_lock_transaction). If absent, it indicates the transaction has already been
committed, so there is no need to wait.
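
A rough, self-contained model of this protocol (POSIX threads standing in for
the workers, and a condition variable standing in for the
pa_lock_transaction()-based wait; in PostgreSQL the wait uses heavyweight locks
so the deadlock detector can see it):

/* Compile with: cc -pthread sketch.c */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t TransactionId;

#define MAX_TXNS 16
static TransactionId in_progress[MAX_TXNS];		/* 0 means an empty slot */
static pthread_mutex_t txn_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t txn_done = PTHREAD_COND_INITIALIZER;

/* Leader: register a transaction that later transactions may depend on. */
static void
txn_register(TransactionId xid)
{
	pthread_mutex_lock(&txn_lock);
	for (int i = 0; i < MAX_TXNS; i++)
		if (in_progress[i] == 0)
		{
			in_progress[i] = xid;
			break;
		}
	pthread_mutex_unlock(&txn_lock);
}

/* Parallel apply worker: remove the entry at commit and wake any waiters. */
static void
txn_commit(TransactionId xid)
{
	pthread_mutex_lock(&txn_lock);
	for (int i = 0; i < MAX_TXNS; i++)
		if (in_progress[i] == xid)
			in_progress[i] = 0;
	pthread_cond_broadcast(&txn_done);
	pthread_mutex_unlock(&txn_lock);
}

/* Dependent worker: block until xid is absent, i.e. already committed. */
static void
txn_wait_for(TransactionId xid)
{
	pthread_mutex_lock(&txn_lock);
	for (;;)
	{
		int			running = 0;

		for (int i = 0; i < MAX_TXNS; i++)
			if (in_progress[i] == xid)
				running = 1;
		if (!running)
			break;				/* absent: no need to wait */
		pthread_cond_wait(&txn_done, &txn_lock);
	}
	pthread_mutex_unlock(&txn_lock);
}

static void *
committer(void *arg)
{
	txn_commit((TransactionId) (uintptr_t) arg);
	return NULL;
}

int
main(void)
{
	pthread_t	worker;

	txn_register(700);
	pthread_create(&worker, NULL, committer, (void *) (uintptr_t) 700);
	txn_wait_for(700);			/* returns once xid 700 has committed */
	pthread_join(worker, NULL);
	puts("xid 700 committed; dependent work can proceed");
	return 0;
}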

--
commit order
--

There is a case where tables have no foreign or primary keys, and integrity is
maintained at the application layer. In this case, the above RI mechanism cannot
detect any dependencies. For safety reasons, parallel apply workers preserve the
commit ordering used on the publisher side. This is done by the leader worker
caching the last dispatched transaction ID and adding a dependency between it
and the currently dispatched one.
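
A minimal sketch of that bookkeeping, with hypothetical record_dependency()
and dispatch_commit() helpers that only print what they would do:

#include <stdint.h>
#include <stdio.h>

typedef uint32_t TransactionId;

/* Stand-ins for the real dependency/dispatch machinery. */
static void
record_dependency(TransactionId xid, TransactionId wait_for)
{
	printf("xid %u must wait for xid %u\n", xid, wait_for);
}

static void
dispatch_commit(TransactionId xid)
{
	printf("dispatching commit of xid %u\n", xid);
}

static TransactionId last_dispatched_xid = 0;	/* 0 = InvalidTransactionId */

/*
 * Make each newly dispatched commit depend on the previously dispatched one,
 * so parallel apply workers preserve the publisher's commit order.
 */
static void
dispatch_with_commit_order(TransactionId xid)
{
	if (last_dispatched_xid != 0)
		record_dependency(xid, last_dispatched_xid);

	dispatch_commit(xid);
	last_dispatched_xid = xid;
}

int
main(void)
{
	dispatch_with_commit_order(700);	/* no dependency */
	dispatch_with_commit_order(701);	/* waits for 700 */
	return 0;
}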

--
TODO - dependency on local unique key, foreign key.
--

A transaction could conflict with another if modifying the same unique key.
While current patches don't address conflicts involving unique or foreign keys,
tracking these dependencies might be needed.

--
TODO - user defined trigger and constraints.
--

It would be challenging to check dependencies if the table has user-defined
triggers or constraints. The most viable solution might be to disallow parallel
apply for relations whose triggers and constraints are not marked as
parallel-safe or immutable.
---
 .../replication/logical/applyparallelworker.c | 332 ++++++++++++++++--
 src/backend/replication/logical/proto.c       |  38 ++
 src/backend/replication/logical/relation.c    |  31 ++
 src/backend/replication/logical/worker.c      | 309 ++++++++++++++--
 src/include/replication/logicalproto.h        |   2 +
 src/include/replication/logicalrelation.h     |   2 +
 src/include/replication/worker_internal.h     |  11 +-
 src/test/subscription/t/001_rep_changes.pl    |   2 +
 src/test/subscription/t/010_truncate.pl       |   2 +-
 src/test/subscription/t/015_stream.pl         |   8 +-
 src/test/subscription/t/026_stats.pl          |   1 +
 src/test/subscription/t/027_nosuperuser.pl    |   1 +
 src/tools/pgindent/typedefs.list              |   4 +
 13 files changed, 668 insertions(+), 75 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 40d57daf179..47b5bc3b48a 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -14,6 +14,9 @@
  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
  * apply workers can communicate with each other.
  *
+ * Streaming transactions
+ * ======================
+ *
  * The parallel apply workers are assigned (if available) as soon as xact's
  * first stream is received for subscriptions that have set their 'streaming'
  * option as parallel. The leader apply worker will send changes to this new
@@ -146,6 +149,33 @@
  * which will detect deadlock if any. See pa_send_data() and
  * enum TransApplyAction.
  *
+ * Non-streaming transactions
+ * ==========================
+ * The handling is similar to that of streaming transactions, with a few
+ * differences:
+ *
+ * Transaction dependency
+ * ----------------------
+ * Before dispatching changes to a parallel worker, the leader verifies if the
+ * current modification affects the same row (identified by replica identity
+ * key) as another ongoing transaction (see handle_dependency_on_change for
+ * details). If so, the leader sends a list of dependent transaction IDs to the
+ * parallel worker, indicating that the parallel apply worker must wait for
+ * these transactions to commit before proceeding.
+ *
+ * Commit order
+ * ------------
+ * There is a case where tables have no foreign or primary keys, and integrity
+ * is maintained at the application layer. In this case, the above RI mechanism
+ * cannot detect any dependencies. For safety reasons, parallel apply workers
+ * preserve the commit ordering used on the publisher side. This is done by the
+ * leader worker caching the last dispatched transaction ID and adding a
+ * dependency between it and the currently dispatched one.
+ * The parallel apply worker could be extended to allow out-of-order commits
+ * in the future. At a minimum, that would require a new mechanism to track
+ * replication progress under out-of-order commits; we could then stop caching
+ * the transaction ID and adding the dependency.
+ *
  * Lock types
  * ----------
  * Both the stream lock and the transaction lock mentioned above are
@@ -283,6 +313,7 @@ static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
 static void pa_attach_parallelized_txn_hash(dsa_handle *pa_dsa_handle,
 											dshash_table_handle *pa_dshash_handle);
+static void write_internal_relation(StringInfo s, LogicalRepRelation *rel);
 
 /*
  * Returns true if it is OK to start a parallel apply worker, false otherwise.
@@ -400,6 +431,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
 	shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
 	SpinLockInit(&shared->mutex);
 
+	shared->xid = InvalidTransactionId;
 	shared->xact_state = PARALLEL_TRANS_UNKNOWN;
 	pg_atomic_init_u32(&(shared->pending_stream_count), 0);
 	shared->last_commit_end = InvalidXLogRecPtr;
@@ -443,6 +475,8 @@ pa_launch_parallel_worker(void)
 	MemoryContext oldcontext;
 	bool		launched;
 	ParallelApplyWorkerInfo *winfo;
+	dsa_handle	pa_dsa_handle;
+	dshash_table_handle pa_dshash_handle;
 	ListCell   *lc;
 
 	/* Try to get an available parallel apply worker from the worker pool. */
@@ -450,10 +484,33 @@ pa_launch_parallel_worker(void)
 	{
 		winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
 
-		if (!winfo->in_use)
+		if (!winfo->stream_txn &&
+			pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED)
+		{
+			/*
+			 * Save the local commit LSN of the last transaction applied by
+			 * this worker before reusing it for another transaction. This WAL
+			 * position is crucial for determining the flush position in
+			 * responses to the publisher (see get_flush_position()).
+			 */
+			(void) pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+			return winfo;
+		}
+
+		if (winfo->stream_txn && !winfo->in_use)
 			return winfo;
 	}
 
+	pa_attach_parallelized_txn_hash(&pa_dsa_handle, &pa_dshash_handle);
+
+	/*
+	 * Return if the number of parallel apply workers has reached the maximum
+	 * limit.
+	 */
+	if (list_length(ParallelApplyWorkerPool) ==
+		max_parallel_apply_workers_per_subscription)
+		return NULL;
+
 	/*
 	 * Start a new parallel apply worker.
 	 *
@@ -481,18 +538,32 @@ pa_launch_parallel_worker(void)
 										dsm_segment_handle(winfo->dsm_seg),
 										false);
 
-	if (launched)
-	{
-		ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
-	}
-	else
+	if (!launched)
 	{
+		MemoryContextSwitchTo(oldcontext);
 		pa_free_worker_info(winfo);
-		winfo = NULL;
+		return NULL;
 	}
 
+	ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
+
 	MemoryContextSwitchTo(oldcontext);
 
+	/*
+	 * Send all existing remote relation information to the parallel apply
+	 * worker. This allows the parallel worker to initialize the
+	 * LogicalRepRelMapEntry locally before applying remote changes.
+	 */
+	if (logicalrep_get_num_rels())
+	{
+		StringInfoData out;
+
+		initStringInfo(&out);
+
+		write_internal_relation(&out, NULL);
+		pa_send_data(winfo, out.len, out.data);
+	}
+
 	return winfo;
 }
 
@@ -597,7 +668,8 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
 {
 	Assert(!am_parallel_apply_worker());
 	Assert(winfo->in_use);
-	Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
+	Assert(!winfo->stream_txn ||
+		   pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
 
 	if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
 		elog(ERROR, "hash table corrupted");
@@ -613,9 +685,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
 	 * been serialized and then letting the parallel apply worker deal with
 	 * the spurious message, we stop the worker.
 	 */
-	if (winfo->serialize_changes ||
-		list_length(ParallelApplyWorkerPool) >
-		(max_parallel_apply_workers_per_subscription / 2))
+	if (winfo->serialize_changes)
 	{
 		logicalrep_pa_worker_stop(winfo);
 		pa_free_worker_info(winfo);
@@ -812,6 +882,38 @@ pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write
 	return entry->local_end;
 }
 
+/*
+ * Wait for the remote transaction associated with the specified remote xid to
+ * complete.
+ */
+static void
+pa_wait_for_transaction(TransactionId wait_for_xid)
+{
+	if (!am_leader_apply_worker())
+		return;
+
+	if (!TransactionIdIsValid(wait_for_xid))
+		return;
+
+	elog(DEBUG1, "plan to wait for remote_xid %u to finish",
+		 wait_for_xid);
+
+	for (;;)
+	{
+		if (pa_transaction_committed(wait_for_xid))
+			break;
+
+		pa_lock_transaction(wait_for_xid, AccessShareLock);
+		pa_unlock_transaction(wait_for_xid, AccessShareLock);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	elog(DEBUG1, "finished wait for remote_xid %u to finish",
+		 wait_for_xid);
+}
+
 /*
  * Interrupt handler for main loop of parallel apply worker.
  */
@@ -887,21 +989,34 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 			 * parallel apply workers can only be PqReplMsg_WALData.
 			 */
 			c = pq_getmsgbyte(&s);
-			if (c != PqReplMsg_WALData)
-				elog(ERROR, "unexpected message \"%c\"", c);
-
-			/*
-			 * Ignore statistics fields that have been updated by the leader
-			 * apply worker.
-			 *
-			 * XXX We can avoid sending the statistics fields from the leader
-			 * apply worker but for that, it needs to rebuild the entire
-			 * message by removing these fields which could be more work than
-			 * simply ignoring these fields in the parallel apply worker.
-			 */
-			s.cursor += SIZE_STATS_MESSAGE;
+			if (c == PqReplMsg_WALData)
+			{
+				/*
+				 * Ignore statistics fields that have been updated by the
+				 * leader apply worker.
+				 *
+				 * XXX We can avoid sending the statistics fields from the
+				 * leader apply worker but for that, it needs to rebuild the
+				 * entire message by removing these fields which could be more
+				 * work than simply ignoring these fields in the parallel
+				 * apply worker.
+				 */
+				s.cursor += SIZE_STATS_MESSAGE;
 
-			apply_dispatch(&s);
+				apply_dispatch(&s);
+			}
+			else if (c == PARALLEL_APPLY_INTERNAL_MESSAGE)
+			{
+				apply_dispatch(&s);
+			}
+			else
+			{
+				/*
+				 * The first byte of messages sent from leader apply worker to
+				 * parallel apply workers can only be 'w' or 'i'.
+				 */
+				elog(ERROR, "unexpected message \"%c\"", c);
+			}
 		}
 		else if (shmq_res == SHM_MQ_WOULD_BLOCK)
 		{
@@ -918,6 +1033,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 
 				if (rc & WL_LATCH_SET)
 					ResetLatch(MyLatch);
+
+				if (!IsTransactionState())
+					pgstat_report_stat(true);
 			}
 		}
 		else
@@ -955,6 +1073,9 @@ pa_shutdown(int code, Datum arg)
 				   INVALID_PROC_NUMBER);
 
 	dsm_detach((dsm_segment *) DatumGetPointer(arg));
+
+	if (parallel_apply_dsa_area)
+		dsa_detach(parallel_apply_dsa_area);
 }
 
 /*
@@ -1267,7 +1388,6 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	shm_mq_result result;
 	TimestampTz startTime = 0;
 
-	Assert(!IsTransactionState());
 	Assert(!winfo->serialize_changes);
 
 	/*
@@ -1319,6 +1439,67 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 	}
 }
 
+/*
+ * Distribute remote relation information to all active parallel apply workers
+ * that require it.
+ */
+void
+pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel)
+{
+	List	   *workers_stopped = NIL;
+	StringInfoData out;
+
+	if (!ParallelApplyWorkerPool)
+		return;
+
+	initStringInfo(&out);
+
+	write_internal_relation(&out, rel);
+
+	foreach_ptr(ParallelApplyWorkerInfo, winfo, ParallelApplyWorkerPool)
+	{
+		/*
+		 * Skip the worker responsible for the current transaction, as the
+		 * relation information has already been sent to it.
+		 */
+		if (winfo == stream_apply_worker)
+			continue;
+
+		/*
+		 * Skip any worker that is in serialize mode, as it will soon stop
+		 * once it finishes applying its transaction.
+		 */
+		if (winfo->serialize_changes)
+			continue;
+
+		elog(DEBUG1, "distributing schema changes to pa workers");
+
+		if (pa_send_data(winfo, out.len, out.data))
+			continue;
+
+		elog(DEBUG1, "failed to distribute, will stop that worker instead");
+
+		/*
+		 * Distribution to this worker failed due to a sending timeout. Wait
+		 * for the worker to complete its transaction and then stop it. This
+		 * is consistent with the handling of workers in serialize mode (see
+		 * pa_free_worker() for details).
+		 */
+		pa_wait_for_transaction(winfo->shared->xid);
+
+		pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+
+		logicalrep_pa_worker_stop(winfo);
+
+		workers_stopped = lappend(workers_stopped, winfo);
+	}
+
+	pfree(out.data);
+
+	foreach_ptr(ParallelApplyWorkerInfo, winfo, workers_stopped)
+		pa_free_worker_info(winfo);
+}
+
 /*
  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
  * that the current data and any subsequent data for this transaction will be
@@ -1401,8 +1582,8 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
 
 	/*
 	 * Wait for the transaction lock to be released. This is required to
-	 * detect deadlock among leader and parallel apply workers. Refer to the
-	 * comments atop this file.
+	 * detect deadlock among leader and parallel apply workers. Refer to
+	 * the comments atop this file.
 	 */
 	pa_lock_transaction(winfo->shared->xid, AccessShareLock);
 	pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
@@ -1479,6 +1660,9 @@ pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
 void
 pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
 {
+	if (!TransactionIdIsValid(top_xid))
+		return;
+
 	if (current_xid != top_xid &&
 		!list_member_xid(subxactlist, current_xid))
 	{
@@ -1735,25 +1919,41 @@ pa_decr_and_wait_stream_block(void)
 void
 pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
 {
+	XLogRecPtr	local_lsn = InvalidXLogRecPtr;
+	TransactionId pa_remote_xid = winfo->shared->xid;
+
 	Assert(am_leader_apply_worker());
 
 	/*
-	 * Unlock the shared object lock so that parallel apply worker can
-	 * continue to receive and apply changes.
+	 * Unlock the shared object lock taken for streaming transactions so that
+	 * parallel apply worker can continue to receive and apply changes.
 	 */
-	pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
+	if (winfo->stream_txn)
+		pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
 
 	/*
-	 * Wait for that worker to finish. This is necessary to maintain commit
-	 * order which avoids failures due to transaction dependencies and
-	 * deadlocks.
+	 * For a streaming transaction, wait for the worker to finish. This is
+	 * necessary to maintain commit order which avoids failures due to
+	 * transaction dependencies and deadlocks.
+	 *
+	 * For a non-streaming transaction in partial serialize mode, also wait
+	 * for the worker to finish, since such a worker cannot be reused anymore
+	 * (see pa_free_worker() for details).
 	 */
-	pa_wait_for_xact_finish(winfo);
+	if (winfo->serialize_changes || winfo->stream_txn)
+	{
+		pa_wait_for_xact_finish(winfo);
+
+		local_lsn = winfo->shared->last_commit_end;
+		pa_remote_xid = InvalidTransactionId;
+
+		pa_free_worker(winfo);
+	}
 
 	if (XLogRecPtrIsValid(remote_lsn))
-		store_flush_position(remote_lsn, winfo->shared->last_commit_end);
+		store_flush_position(remote_lsn, local_lsn, pa_remote_xid);
 
-	pa_free_worker(winfo);
+	pa_set_stream_apply_worker(NULL);
 }
 
 bool
@@ -1852,6 +2052,22 @@ pa_record_dependency_on_transactions(List *depends_on_xids)
 	}
 }
 
+/*
+ * Mark the transaction state as finished and remove the shared hash entry.
+ */
+void
+pa_commit_transaction(void)
+{
+	TransactionId xid = MyParallelShared->xid;
+
+	SpinLockAcquire(&MyParallelShared->mutex);
+	MyParallelShared->xact_state = PARALLEL_TRANS_FINISHED;
+	SpinLockRelease(&MyParallelShared->mutex);
+
+	dshash_delete_key(parallelized_txns, &xid);
+	elog(DEBUG1, "depended xid %u committed", xid);
+}
+
 /*
  * Wait for the given transaction to finish.
  */
@@ -1880,3 +2096,45 @@ pa_wait_for_depended_transaction(TransactionId xid)
 
 	elog(DEBUG1, "finish waiting for depended xid %u", xid);
 }
+
+/*
+ * Write internal relation description to the output stream.
+ */
+static void
+write_internal_relation(StringInfo s, LogicalRepRelation *rel)
+{
+	pq_sendbyte(s, PARALLEL_APPLY_INTERNAL_MESSAGE);
+	pq_sendbyte(s, LOGICAL_REP_MSG_INTERNAL_RELATION);
+
+	if (rel)
+	{
+		pq_sendint(s, 1, 4);
+		logicalrep_write_internal_rel(s, rel);
+	}
+	else
+	{
+		pq_sendint(s, logicalrep_get_num_rels(), 4);
+		logicalrep_write_all_rels(s);
+	}
+}
+
+/*
+ * Register a transaction to the shared hash table.
+ *
+ * This function is intended to be called during the commit phase of
+ * non-streamed transactions. Workers that depend on this transaction will
+ * wait until the entry is removed at commit (see pa_commit_transaction()).
+ */
+void
+pa_add_parallelized_transaction(TransactionId xid)
+{
+	bool		found;
+	ParallelizedTxnEntry *txn_entry;
+
+	Assert(parallelized_txns);
+	Assert(TransactionIdIsValid(xid));
+
+	txn_entry = dshash_find_or_insert(parallelized_txns, &xid, &found);
+
+	dshash_release_lock(parallelized_txns, txn_entry);
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 72dedee3a43..73a1bd36963 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -691,6 +691,44 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_attrs(out, rel, columns, include_gencols_type);
 }
 
+/*
+ * Write internal relation description to the output stream.
+ */
+void
+logicalrep_write_internal_rel(StringInfo out, LogicalRepRelation *rel)
+{
+	pq_sendint32(out, rel->remoteid);
+
+	/* Write relation name */
+	pq_sendstring(out, rel->nspname);
+	pq_sendstring(out, rel->relname);
+
+	/* Write the replica identity. */
+	pq_sendbyte(out, rel->replident);
+
+	/* Write attribute description */
+	pq_sendint16(out, rel->natts);
+
+	for (int i = 0; i < rel->natts; i++)
+	{
+		uint8		flags = 0;
+
+		if (bms_is_member(i, rel->attkeys))
+			flags |= LOGICALREP_IS_REPLICA_IDENTITY;
+
+		pq_sendbyte(out, flags);
+
+		/* attribute name */
+		pq_sendstring(out, rel->attnames[i]);
+
+		/* attribute type id */
+		pq_sendint32(out, rel->atttyps[i]);
+
+		/* ignore attribute mode for now */
+		pq_sendint32(out, 0);
+	}
+}
+
 /*
  * Read the relation info from stream and return as LogicalRepRelation.
  */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 66c73ce34a1..001cf6a143f 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -960,6 +960,37 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
 	return InvalidOid;
 }
 
+/*
+ * Get the number of entries in the LogicalRepRelMap.
+ */
+int
+logicalrep_get_num_rels(void)
+{
+	if (LogicalRepRelMap == NULL)
+		return 0;
+
+	return hash_get_num_entries(LogicalRepRelMap);
+}
+
+/*
+ * Write all the remote relation information from the LogicalRepRelMapEntry to
+ * the output stream.
+ */
+void
+logicalrep_write_all_rels(StringInfo out)
+{
+	LogicalRepRelMapEntry *entry;
+	HASH_SEQ_STATUS status;
+
+	if (LogicalRepRelMap == NULL)
+		return;
+
+	hash_seq_init(&status, LogicalRepRelMap);
+
+	while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+		logicalrep_write_internal_rel(out, &entry->remoterel);
+}
+
 /*
  * Get the LogicalRepRelMapEntry corresponding to the given relid without
  * opening the local relation.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 269a3ac5804..8c871b205fc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -484,6 +484,8 @@ static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+static TransactionId remote_xid = InvalidTransactionId;
+static TransactionId last_remote_xid = InvalidTransactionId;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -602,11 +604,7 @@ static inline void cleanup_subxact_info(void);
 /*
  * Serialize and deserialize changes for a toplevel transaction.
  */
-static void stream_open_file(Oid subid, TransactionId xid,
-							 bool first_segment);
 static void stream_write_change(char action, StringInfo s);
-static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
-static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
@@ -676,6 +674,8 @@ static void replorigin_reset(int code, Datum arg);
 static bool send_internal_dependencies(ParallelApplyWorkerInfo *winfo,
 									   StringInfo s);
 
+static bool build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo);
+
 /*
  * Compute the hash value for entries in the replica_identity_table.
  */
@@ -1406,7 +1406,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 	TransApplyAction apply_action;
 	StringInfoData original_msg;
 
-	apply_action = get_transaction_apply_action(stream_xid, &winfo);
+	Assert(!in_streamed_transaction || TransactionIdIsValid(stream_xid));
+
+	apply_action = get_transaction_apply_action(in_streamed_transaction
+												? stream_xid : remote_xid,
+												&winfo);
 
 	/* not in streaming mode */
 	if (apply_action == TRANS_LEADER_APPLY)
@@ -1415,8 +1419,6 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 		return false;
 	}
 
-	Assert(TransactionIdIsValid(stream_xid));
-
 	/*
 	 * The parallel apply worker needs the xid in this message to decide
 	 * whether to define a savepoint, so save the original message that has
@@ -1427,15 +1429,28 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
 
 	/*
 	 * We should have received XID of the subxact as the first part of the
-	 * message, so extract it.
+	 * message in streaming transactions, so extract it.
 	 */
-	current_xid = pq_getmsgint(s, 4);
+	if (in_streamed_transaction)
+		current_xid = pq_getmsgint(s, 4);
+	else
+		current_xid = remote_xid;
 
 	if (!TransactionIdIsValid(current_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
 				 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
+	handle_dependency_on_change(action, s, current_xid, winfo);
+
+	/*
+	 * Re-fetch the latest apply action as it might have been changed during
+	 * dependency check.
+	 */
+	apply_action = get_transaction_apply_action(in_streamed_transaction
+												? stream_xid : remote_xid,
+												&winfo);
+
 	switch (apply_action)
 	{
 		case TRANS_LEADER_SERIALIZE:
@@ -1839,17 +1854,71 @@ static void
 apply_handle_begin(StringInfo s)
 {
 	LogicalRepBeginData begin_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
+
+	/* Save the message before it is consumed. */
+	StringInfoData original_msg = *s;
 
 	/* There must not be an active streaming transaction. */
 	Assert(!TransactionIdIsValid(stream_xid));
 
 	logicalrep_read_begin(s, &begin_data);
-	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
+
+	remote_xid = begin_data.xid;
+
+	set_apply_error_context_xact(remote_xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
+	pa_allocate_worker(remote_xid, false);
+
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+	elog(DEBUG1, "new remote_xid %u", remote_xid);
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+
+			if (pa_send_data(winfo, s->len, s->data))
+			{
+				pa_set_stream_apply_worker(winfo);
+				break;
+			}
+
+			/*
+			 * Switch to serialize mode when we are not able to send the
+			 * change to parallel apply worker.
+			 */
+			pa_switch_to_partial_serialize(winfo, true);
+
+/* fall through */
+		case TRANS_LEADER_PARTIAL_SERIALIZE:
+			Assert(winfo);
+
+			stream_write_change(LOGICAL_REP_MSG_BEGIN, &original_msg);
+
+			/* Cache the parallel apply worker for this transaction. */
+			pa_set_stream_apply_worker(winfo);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+			/* Hold the lock until the end of the transaction. */
+			pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
 	in_remote_transaction = true;
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
@@ -1882,6 +1951,37 @@ send_internal_dependencies(ParallelApplyWorkerInfo *winfo, StringInfo s)
 	return false;
 }
 
+/*
+ * Build a dependency between this and the previously committed transaction.
+ *
+ * This function ensures that the commit ordering handled by parallel apply
+ * workers is preserved. Returns false if we switched to serialize mode to
+ * send the message, true otherwise.
+ */
+static bool
+build_dependency_with_last_committed_txn(ParallelApplyWorkerInfo *winfo)
+{
+	StringInfoData dependency_msg;
+	bool		ret;
+
+	/* Skip if no transaction has been applied yet */
+	if (!TransactionIdIsValid(last_remote_xid))
+		return true;
+
+	/* Build the dependency message to send to the parallel apply worker */
+	initStringInfo(&dependency_msg);
+
+	pq_sendbyte(&dependency_msg, PARALLEL_APPLY_INTERNAL_MESSAGE);
+	pq_sendbyte(&dependency_msg, LOGICAL_REP_MSG_INTERNAL_DEPENDENCY);
+	pq_sendint32(&dependency_msg, 1);
+	pq_sendint32(&dependency_msg, last_remote_xid);
+
+	ret = send_internal_dependencies(winfo, &dependency_msg);
+
+	pfree(dependency_msg.data);
+	return ret;
+}
+
 /*
  * Handle COMMIT message.
  *
@@ -1891,6 +1991,11 @@ static void
 apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
+	ParallelApplyWorkerInfo *winfo;
+	TransApplyAction apply_action;
+
+	/* Save the message before it is consumed. */
+	StringInfoData original_msg = *s;
 
 	logicalrep_read_commit(s, &commit_data);
 
@@ -1901,7 +2006,84 @@ apply_handle_commit(StringInfo s)
 								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
 								 LSN_FORMAT_ARGS(remote_final_lsn))));
 
-	apply_handle_commit_internal(&commit_data);
+	apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+	switch (apply_action)
+	{
+		case TRANS_LEADER_APPLY:
+			apply_handle_commit_internal(&commit_data);
+			break;
+
+		case TRANS_LEADER_SEND_TO_PARALLEL:
+			Assert(winfo);
+
+			/*
+			 * Mark this transaction as parallelized. This ensures that
+			 * upcoming transactions wait until this transaction is committed.
+			 */
+			pa_add_parallelized_transaction(remote_xid);
+
+			/*
+			 * Build a dependency between this transaction and the previously
+			 * committed transaction to preserve the commit order. If that
+			 * succeeds, try to send the COMMIT message.
+			 */
+			if (build_dependency_with_last_committed_txn(winfo) &&
+				pa_send_data(winfo, s->len, s->data))
+			{
+				/* Finish processing the transaction. */
+				pa_xact_finish(winfo, commit_data.end_lsn);
+				break;
+			}
+
+			/*
+			 * Switch to serialize mode when we are not able to send the
+			 * change to parallel apply worker.
+			 */
+			pa_switch_to_partial_serialize(winfo, true);
+/* fall through */
+		case TRANS_LEADER_PARTIAL_SERIALIZE:
+			Assert(winfo);
+
+			stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
+										 &original_msg);
+
+			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+			/* Finish processing the transaction. */
+			pa_xact_finish(winfo, commit_data.end_lsn);
+			break;
+
+		case TRANS_PARALLEL_APPLY:
+
+			/*
+			 * If the parallel apply worker is applying spooled messages then
+			 * close the file before committing.
+			 */
+			if (stream_fd)
+				stream_close_file();
+
+			apply_handle_commit_internal(&commit_data);
+
+			MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+			pa_commit_transaction();
+
+			pa_unlock_transaction(remote_xid, AccessExclusiveLock);
+			break;
+
+		default:
+			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+			break;
+	}
+
+	/* Cache the remote_xid */
+	last_remote_xid = remote_xid;
+
+	remote_xid = InvalidTransactionId;
+	in_remote_transaction = false;
+
+	elog(DEBUG1, "reset remote_xid %u", remote_xid);
 
 	/*
 	 * Process any tables that are being synchronized in parallel, as well as
@@ -2024,7 +2206,8 @@ apply_handle_prepare(StringInfo s)
 	 * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
 	 * it.
 	 */
-	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+						 InvalidTransactionId);
 
 	in_remote_transaction = false;
 
@@ -2072,6 +2255,8 @@ apply_handle_commit_prepared(StringInfo s)
 	/* There is no transaction when COMMIT PREPARED is called */
 	begin_replication_step();
 
+	/* TODO wait for xid to finish */
+
 	/*
 	 * Update origin state so we can restart streaming from correct position
 	 * in case of crash.
@@ -2084,7 +2269,8 @@ apply_handle_commit_prepared(StringInfo s)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd,
+						 InvalidTransactionId);
 	in_remote_transaction = false;
 
 	/*
@@ -2153,7 +2339,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	 * transaction because we always flush the WAL record for it. See
 	 * apply_handle_prepare.
 	 */
-	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
+	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr,
+						 InvalidTransactionId);
 	in_remote_transaction = false;
 
 	/*
@@ -2215,7 +2402,8 @@ apply_handle_stream_prepare(StringInfo s)
 			 * It is okay not to set the local_end LSN for the prepare because
 			 * we always flush the prepare record. See apply_handle_prepare.
 			 */
-			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+								 InvalidTransactionId);
 
 			in_remote_transaction = false;
 
@@ -2467,6 +2655,11 @@ apply_handle_stream_start(StringInfo s)
 		case TRANS_LEADER_PARTIAL_SERIALIZE:
 			Assert(winfo);
 
+			/*
+			 * TODO: the parallel apply worker could start to wait too soon
+			 * when processing an old stream start.
+			 */
+
 			/*
 			 * Open the spool file unless it was already opened when switching
 			 * to serialize mode. The transaction started in
@@ -3084,7 +3277,20 @@ apply_handle_stream_commit(StringInfo s)
 		case TRANS_LEADER_SEND_TO_PARALLEL:
 			Assert(winfo);
 
-			if (pa_send_data(winfo, s->len, s->data))
+			/*
+			 * Unlike the non-streaming case, there is no need to mark this
+			 * transaction as parallelized, because the leader waits until
+			 * the streamed transaction is committed and thus commit ordering
+			 * is always preserved.
+			 */
+
+			/*
+			 * Build a dependency between this transaction and the previously
+			 * committed transaction to preserve the commit order. If that
+			 * succeeds, try to send the COMMIT message.
+			 */
+			if (build_dependency_with_last_committed_txn(winfo) &&
+				pa_send_data(winfo, s->len, s->data))
 			{
 				/* Finish processing the streaming transaction. */
 				pa_xact_finish(winfo, commit_data.end_lsn);
@@ -3140,6 +3346,9 @@ apply_handle_stream_commit(StringInfo s)
 			break;
 	}
 
+	/* Cache the remote xid */
+	last_remote_xid = xid;
+
 	/*
 	 * Process any tables that are being synchronized in parallel, as well as
 	 * any newly added tables or sequences.
@@ -3194,7 +3403,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 
 		pgstat_report_stat(false);
 
-		store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
+		store_flush_position(commit_data->end_lsn, XactLastCommitEnd,
+							 InvalidTransactionId);
 	}
 	else
 	{
@@ -3227,6 +3437,9 @@ apply_handle_relation(StringInfo s)
 
 	/* Also reset all entries in the partition map that refer to remoterel. */
 	logicalrep_partmap_reset_relmap(rel);
+
+	if (am_leader_apply_worker())
+		pa_distribute_schema_changes_to_workers(rel);
 }
 
 /*
@@ -4001,6 +4214,8 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
 
 /*
  * This handles insert, update, delete on a partitioned table.
+ *
+ * TODO, support parallel apply.
  */
 static void
 apply_handle_tuple_routing(ApplyExecutionData *edata,
@@ -4551,6 +4766,10 @@ apply_dispatch(StringInfo s)
  * check which entries on it are already locally flushed. Those we can report
  * as having been flushed.
  *
+ * For non-streaming transactions managed by a parallel apply worker, we will
+ * get the local commit end from the shared parallel apply worker info once the
+ * transaction has been committed by the worker.
+ *
  * The have_pending_txes is true if there are outstanding transactions that
  * need to be flushed.
  */
@@ -4560,6 +4779,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 {
 	dlist_mutable_iter iter;
 	XLogRecPtr	local_flush = GetFlushRecPtr(NULL);
+	List	   *committed_pa_xid = NIL;
 
 	*write = InvalidXLogRecPtr;
 	*flush = InvalidXLogRecPtr;
@@ -4569,6 +4789,36 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 		FlushPosition *pos =
 			dlist_container(FlushPosition, node, iter.cur);
 
+		if (TransactionIdIsValid(pos->pa_remote_xid) &&
+			XLogRecPtrIsInvalid(pos->local_end))
+		{
+			bool		skipped_write;
+
+			pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true,
+													&skipped_write);
+
+			elog(DEBUG1,
+				 "got commit end from parallel apply worker, "
+				 "txn: %u, remote_end %X/%X, local_end %X/%X",
+				 pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end),
+				 LSN_FORMAT_ARGS(pos->local_end));
+
+			/*
+			 * Break the loop if the worker has not finished applying the
+			 * transaction. There's no need to check subsequent transactions,
+			 * as they must commit after the current transaction being
+			 * examined and thus won't have their commit end available yet.
+			 */
+			if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end))
+				break;
+
+			committed_pa_xid = lappend_xid(committed_pa_xid, pos->pa_remote_xid);
+		}
+
+		/*
+		 * The worker has finished applying the transaction, or the
+		 * transaction was applied by the leader apply worker.
+		 */
 		*write = pos->remote_end;
 
 		if (pos->local_end <= local_flush)
@@ -4577,29 +4827,19 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 			dlist_delete(iter.cur);
 			pfree(pos);
 		}
-		else
-		{
-			/*
-			 * Don't want to uselessly iterate over the rest of the list which
-			 * could potentially be long. Instead get the last element and
-			 * grab the write position from there.
-			 */
-			pos = dlist_tail_element(FlushPosition, node,
-									 &lsn_mapping);
-			*write = pos->remote_end;
-			*have_pending_txes = true;
-			return;
-		}
 	}
 
 	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
+
+	cleanup_replica_identity_table(committed_pa_xid);
 }
 
 /*
  * Store current remote/local lsn pair in the tracking list.
  */
 void
-store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
+store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+					 TransactionId remote_xid)
 {
 	FlushPosition *flushpos;
 
@@ -4617,6 +4857,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 	flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
 	flushpos->local_end = local_lsn;
 	flushpos->remote_end = remote_lsn;
+	flushpos->pa_remote_xid = remote_xid;
 
 	dlist_push_tail(&lsn_mapping, &flushpos->node);
 	MemoryContextSwitchTo(ApplyMessageContext);
@@ -6064,7 +6305,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
  * changes for this transaction, create the buffile, otherwise open the
  * previously created file.
  */
-static void
+void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 {
 	char		path[MAXPGPATH];
@@ -6109,7 +6350,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
  * stream_close_file
  *	  Close the currently open file with streamed changes.
  */
-static void
+void
 stream_close_file(void)
 {
 	Assert(stream_fd != NULL);
@@ -6157,7 +6398,7 @@ stream_write_change(char action, StringInfo s)
  * target file if not already before writing the message and close the file at
  * the end.
  */
-static void
+void
 stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
 {
 	Assert(!in_streamed_transaction);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 5d91e2a4287..7d2aaf2d389 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -253,6 +253,8 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns,
 								 PublishGencolsType include_gencols_type);
+extern void logicalrep_write_internal_rel(StringInfo out,
+										  LogicalRepRelation *rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 4b321bd2ad2..34a7069e9e5 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -52,6 +52,8 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
 extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap);
 extern Oid	GetRelationIdentityOrPK(Relation rel);
+extern int	logicalrep_get_num_rels(void);
+extern void logicalrep_write_all_rels(StringInfo out);
 extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 78b5667cebe..5371ee767f1 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -314,6 +314,10 @@ extern void apply_dispatch(StringInfo s);
 extern void maybe_reread_subscription(void);
 
 extern void stream_cleanup_files(Oid subid, TransactionId xid);
+extern void stream_open_file(Oid subid, TransactionId xid, bool first_segment);
+extern void stream_close_file(void);
+extern void stream_open_and_write_change(TransactionId xid, char action,
+										 StringInfo s);
 
 extern void set_stream_options(WalRcvStreamOptions *options,
 							   char *slotname,
@@ -327,7 +331,8 @@ extern void SetupApplyOrSyncWorker(int worker_slot);
 
 extern void DisableSubscriptionAndExit(void);
 
-extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
+extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+								 TransactionId remote_xid);
 
 /* Function for apply error callback */
 extern void apply_error_callback(void *arg);
@@ -342,6 +347,7 @@ extern void pa_detach_all_error_mq(void);
 
 extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
 						 const void *data);
+extern void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel);
 extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
 										   bool stream_locked);
 
@@ -368,8 +374,9 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 extern bool pa_transaction_committed(TransactionId xid);
 extern void pa_record_dependency_on_transactions(List *depends_on_xids);
-
+extern void pa_commit_transaction(void);
 extern void pa_wait_for_depended_transaction(TransactionId xid);
+extern void pa_add_parallelized_transaction(TransactionId xid);
 
 #define isParallelApplyWorker(worker) ((worker)->in_use && \
 									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 430c1246d14..2caf798ee0a 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -16,6 +16,8 @@ $node_publisher->start;
 # Create subscriber node
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	"max_logical_replication_workers = 10");
 $node_subscriber->start;
 
 # Create some preexisting content on publisher
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index 3d16c2a800d..c2fba0b9a9c 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -17,7 +17,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_logical_replication_workers = 6));
+	qq(max_logical_replication_workers = 7));
 $node_subscriber->start;
 
 my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 03135b1cd6e..e79ddd9a41c 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -232,6 +232,12 @@ $node_subscriber->wait_for_log(
 
 $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
 
+# FIXME: Currently, non-streaming transactions are applied in parallel by
+# default. So, the first transaction is handled by a parallel apply worker. To
+# trigger the deadlock, initiate one more transaction to be applied by the
+# leader.
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
+
 $h->query_safe('COMMIT');
 $h->quit;
 
@@ -247,7 +253,7 @@ $node_publisher->wait_for_catchup($appname);
 
 $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
-is($result, qq(5001), 'data replicated to subscriber after dropping index');
+is($result, qq(5002), 'data replicated to subscriber after dropping index');
 
 # Clean up test data from the environment.
 $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index a430ab4feec..58e34839ab4 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,7 @@ $node_publisher->start;
 # Create subscriber node.
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
 $node_subscriber->start;
 
 
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 691731743df..e0c1d213800 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -86,6 +86,7 @@ $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_publisher->init(allows_streaming => 'logical');
 $node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
 $node_publisher->start;
 $node_subscriber->start;
 $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cf3f6a7dafd..c1bdd918df5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2088,6 +2088,7 @@ ParallelTransState
 ParallelVacuumState
 ParallelWorkerContext
 ParallelWorkerInfo
+ParallelizedTxnEntry
 Param
 ParamCompileHook
 ParamExecData
@@ -2558,6 +2559,8 @@ ReparameterizeForeignPathByChild_function
 ReplaceVarsFromTargetList_context
 ReplaceVarsNoMatchOption
 ReplaceWrapOption
+ReplicaIdentityEntry
+ReplicaIdentityKey
 ReplicaIdentityStmt
 ReplicationKind
 ReplicationSlot
@@ -4054,6 +4057,7 @@ remoteDep
 remove_nulling_relids_context
 rendezvousHashEntry
 rep
+replica_identity_hash
 replace_rte_variables_callback
 replace_rte_variables_context
 report_error_fn
-- 
2.47.3

