From a4cb759a9f053fe66b8d1632a89d7880c2c6dfc2 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignesh21@gmail.com>
Date: Fri, 14 Jul 2023 09:59:53 +0530
Subject: [PATCH 4/4] Support worker pool for table sync.

Support worker pool for table sync.
---
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/launcher.c    |   4 +-
 src/backend/replication/logical/meson.build   |   1 +
 src/backend/replication/logical/tablesync.c   | 185 +++++----
 .../replication/logical/tablesyncpool.c       | 382 ++++++++++++++++++
 src/backend/replication/logical/worker.c      | 154 +++----
 src/include/replication/worker_internal.h     |  71 +++-
 src/include/storage/procsignal.h              |   1 +
 8 files changed, 641 insertions(+), 158 deletions(-)
 create mode 100644 src/backend/replication/logical/tablesyncpool.c

diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2dc25e37bb..251a28c4bd 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -27,6 +27,7 @@ OBJS = \
 	reorderbuffer.o \
 	snapbuild.o \
 	tablesync.o \
+	tablesyncpool.o \
 	worker.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 72e5ef8a78..6ef38fc68d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -315,7 +315,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	int			nsyncworkers;
 	int			nparallelapplyworkers;
 	TimestampTz now;
-	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+	bool		is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID) && !OidIsValid(relid);
 
 	/* Sanity check - tablesync worker cannot be a subworker */
 	Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
@@ -489,7 +489,7 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
-	if (is_parallel_apply_worker)
+	if (is_parallel_apply_worker || OidIsValid(relid))
 		memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
 
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index d48cd4c590..99726fe504 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -13,5 +13,6 @@ backend_sources += files(
   'reorderbuffer.c',
   'snapbuild.c',
   'tablesync.c',
+  'tablesyncpool.c',
   'worker.c',
 )
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f042d9ae00..28113108df 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -152,10 +152,17 @@ clean_sync_worker(void)
  * Exit routine for synchronization worker.
  */
 void
-pg_attribute_noreturn()
-finish_sync_worker(void)
+finish_sync_worker(bool reuse_worker)
 {
-	clean_sync_worker();
+	/*
+	 * Commit any outstanding transaction. This is the usual case, unless there
+	 * was nothing to do for the table.
+	 */
+	if (IsTransactionState())
+	{
+		CommitTransactionCommand();
+		pgstat_report_stat(true);
+	}
 
 	/*
 	 * Disconnect from publisher. Otherwise reused sync workers causes
@@ -167,17 +174,22 @@ finish_sync_worker(void)
 		LogRepWorkerWalRcvConn = NULL;
 	}
 
-	/* And flush all writes. */
-	XLogFlush(GetXLogWriteRecPtr());
+	if (!reuse_worker)
+	{
+		tsp_worker_cleanup();
 
-	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
-					MySubscription->name)));
-	CommitTransactionCommand();
+		/* And flush all writes. */
+		XLogFlush(GetXLogWriteRecPtr());
+
+		StartTransactionCommand();
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\" has finished",
+						MySubscription->name)));
+		CommitTransactionCommand();
 
-	/* Stop gracefully */
-	proc_exit(0);
+		/* Stop gracefully */
+		proc_exit(0);
+	}
 }
 
 /*
@@ -406,11 +418,59 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 						get_rel_name(MyLogicalRepWorker->relid),
 						MyLogicalRepWorker->relid)));
 		CommitTransactionCommand();
+		tsp_send_signal(tsp_get_apply_worker_pid());
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 }
 
+static void
+check_caughtup_update_state(XLogRecPtr current_lsn,
+							SubscriptionRelState *rstate, bool *started_tx)
+{
+	/*
+	 * Apply has caught up to the position where the table sync has
+	 * finished.  Mark the table as ready so that the apply will just
+	 * continue to replicate it normally.
+	 */
+	if (current_lsn >= rstate->lsn)
+	{
+		char		originname[NAMEDATALEN];
+
+		rstate->state = SUBREL_STATE_READY;
+		rstate->lsn = current_lsn;
+		if (!(*started_tx))
+		{
+			StartTransactionCommand();
+			*started_tx = true;
+		}
+
+		/*
+		 * Remove the tablesync origin tracking if exists.
+		 *
+		 * There is a chance that the user is concurrently performing
+		 * refresh for the subscription where we remove the table
+		 * state and its origin or the tablesync worker would have
+		 * already removed this origin. We can't rely on tablesync
+		 * worker to remove the origin tracking as if there is any
+		 * error while dropping we won't restart it to drop the
+		 * origin. So passing missing_ok = true.
+		 */
+		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+											rstate->relid,
+											originname,
+											sizeof(originname));
+		replorigin_drop_by_name(originname, true, false);
+
+		/*
+		 * Update the state to READY only after the origin cleanup.
+		 */
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+									rstate->relid, rstate->state,
+									rstate->lsn);
+	}
+}
+
 /*
  * Handle table synchronization cooperation from the apply worker.
  *
@@ -469,10 +529,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	 * Clean up the hash table when we're done with all tables (just to
 	 * release the bit of memory).
 	 */
-	else if (table_states_not_ready == NIL && last_start_times)
+	else if (table_states_not_ready == NIL)
 	{
-		hash_destroy(last_start_times);
-		last_start_times = NULL;
+		if (last_start_times)
+		{
+			hash_destroy(last_start_times);
+			last_start_times = NULL;
+		}
+
+		tsp_free_all_workers();
 	}
 
 	/*
@@ -483,49 +548,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
 
 		if (rstate->state == SUBREL_STATE_SYNCDONE)
-		{
-			/*
-			 * Apply has caught up to the position where the table sync has
-			 * finished.  Mark the table as ready so that the apply will just
-			 * continue to replicate it normally.
-			 */
-			if (current_lsn >= rstate->lsn)
-			{
-				char		originname[NAMEDATALEN];
-
-				rstate->state = SUBREL_STATE_READY;
-				rstate->lsn = current_lsn;
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
-
-				/*
-				 * Remove the tablesync origin tracking if exists.
-				 *
-				 * There is a chance that the user is concurrently performing
-				 * refresh for the subscription where we remove the table
-				 * state and its origin or the tablesync worker would have
-				 * already removed this origin. We can't rely on tablesync
-				 * worker to remove the origin tracking as if there is any
-				 * error while dropping we won't restart it to drop the
-				 * origin. So passing missing_ok = true.
-				 */
-				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
-												   rstate->relid,
-												   originname,
-												   sizeof(originname));
-				replorigin_drop_by_name(originname, true, false);
-
-				/*
-				 * Update the state to READY only after the origin cleanup.
-				 */
-				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   rstate->relid, rstate->state,
-										   rstate->lsn);
-			}
-		}
+			check_caughtup_update_state(current_lsn, rstate, &started_tx);
 		else
 		{
 			LogicalRepWorker *syncworker;
@@ -578,6 +601,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 					wait_for_relation_state_change(rstate->relid,
 												   SUBREL_STATE_SYNCDONE);
+					check_caughtup_update_state(current_lsn, rstate,
+												&started_tx);
 				}
 				else
 					LWLockRelease(LogicalRepWorkerLock);
@@ -589,38 +614,36 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * running sync workers for this subscription, while we have
 				 * the lock.
 				 */
-				int			nsyncworkers =
-					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				int nsyncworkers;
+				TimestampTz now = GetCurrentTimestamp();
+				struct tablesync_start_time_mapping *hentry;
+				bool		found;
+
+				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
 
 				/* Now safe to release the LWLock */
 				LWLockRelease(LogicalRepWorkerLock);
 
+				hentry = hash_search(last_start_times, &rstate->relid,
+										HASH_ENTER, &found);
+
 				/*
-				 * If there are free sync worker slot(s), start a new sync
-				 * worker for the table.
+				 * If free any workers are available don't launch it,
+				 * reuse the worker.
 				 */
-				if (nsyncworkers < max_sync_workers_per_subscription)
+				if (!tsp_allocate_free_worker(rstate->relid, rstate->state,
+											  rstate->lsn))
 				{
-					TimestampTz now = GetCurrentTimestamp();
-					struct tablesync_start_time_mapping *hentry;
-					bool		found;
-
-					hentry = hash_search(last_start_times, &rstate->relid,
-										 HASH_ENTER, &found);
-
-					if (!found ||
-						TimestampDifferenceExceeds(hentry->last_start_time, now,
-												   wal_retrieve_retry_interval))
+					if (nsyncworkers < max_sync_workers_per_subscription)
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-												 MySubscription->oid,
-												 MySubscription->name,
-												 MyLogicalRepWorker->userid,
-												 rstate->relid,
-												 DSM_HANDLE_INVALID);
-						hentry->last_start_time = now;
+						if (!found ||
+							TimestampDifferenceExceeds(hentry->last_start_time, now,
+													wal_retrieve_retry_interval))
+							tsp_launch_worker(rstate->relid);
 					}
 				}
+
+				hentry->last_start_time = now;
 			}
 		}
 	}
@@ -1310,7 +1333,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot)
 		case SUBREL_STATE_SYNCDONE:
 		case SUBREL_STATE_READY:
 		case SUBREL_STATE_UNKNOWN:
-			finish_sync_worker();	/* doesn't return */
+			finish_sync_worker(false);	/* doesn't return */
 	}
 
 	/* Calculate the name of the tablesync slot. */
diff --git a/src/backend/replication/logical/tablesyncpool.c b/src/backend/replication/logical/tablesyncpool.c
new file mode 100644
index 0000000000..24acfc9a99
--- /dev/null
+++ b/src/backend/replication/logical/tablesyncpool.c
@@ -0,0 +1,382 @@
+/*-------------------------------------------------------------------------
+ * tablesyncpool.c
+ *	   Support routines for syncronizing table using worker pool
+ *
+ * Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/tablesyncpool.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "pgstat.h"
+#include "postmaster/interrupt.h"
+#include "replication/logicalworker.h"
+#include "replication/worker_internal.h"
+#include "tcop/tcopprot.h"
+#include "utils/inval.h"
+#include "utils/resowner.h"
+#include "utils/syscache.h"
+
+/*
+ * A list (pool) of table sync workers. The information for
+ * the new worker is added to the list after successfully launching it.
+ */
+static List *TableSyncPool = NIL;
+TablesyncPoolShared *MyTablesyncPoolShared = NULL;
+
+static void tsp_free_worker_info(TablesyncPoolInfo *winfo);
+
+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a fixed-size worker info of
+ * TablesyncPoolShared.
+ *
+ * Returns true on success, false on failure.
+ */
+static bool
+tsp_setup_dsm(TablesyncPoolInfo *winfo)
+{
+	shm_toc_estimator e;
+	Size		segsize;
+	dsm_segment *seg;
+	shm_toc    *toc;
+	TablesyncPoolShared *shared;
+
+	/*
+	 * Estimate how much shared memory we need.
+	 *
+	 * Because the TOC machinery may choose to insert padding of oddly-sized
+	 * requests, we must estimate each chunk separately.
+	 *
+	 * We need one key to register the location of the header.
+	 */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(TablesyncPoolShared));
+
+	shm_toc_estimate_keys(&e, 1);
+	segsize = shm_toc_estimate(&e);
+
+	/* Create the shared memory segment and establish a table of contents. */
+	seg = dsm_create(shm_toc_estimate(&e), 0);
+	if (!seg)
+		return false;
+
+	toc = shm_toc_create(PG_LOGICAL_TABLESYNC_POOL_SHM_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* Set up the header region. */
+	shared = shm_toc_allocate(toc, sizeof(TablesyncPoolShared));
+	SpinLockInit(&shared->mutex);
+
+	shared->exec_state = TS_FREE;
+	shared->parent_pid = MyProcPid;
+	shared->relstate = 'i';
+	shared->relstate_lsn = 0;
+
+	shm_toc_insert(toc, TABLESYNC_POOL_KEY_SHARED, shared);
+
+	/* Return results to caller. */
+	winfo->dsm_seg = seg;
+	winfo->shared = shared;
+
+	return true;
+}
+
+/*
+ * Start a new tablesync worker and add it to the tablesync pool.
+ */
+TablesyncPoolInfo *
+tsp_launch_worker(Oid relid)
+{
+	MemoryContext oldcontext;
+	bool launched;
+	TablesyncPoolInfo *winfo;
+	ResourceOwner saveResourceOwner;
+
+	/*
+	 * The worker info can be used for the lifetime of the worker process, so
+	 * create it in a permanent context.
+	 */
+	oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+	winfo = (TablesyncPoolInfo *)palloc0(sizeof(TablesyncPoolInfo));
+
+	saveResourceOwner = CurrentResourceOwner;
+	CurrentResourceOwner = NULL;
+
+	/* Setup shared memory. */
+	if (!tsp_setup_dsm(winfo))
+	{
+		CurrentResourceOwner = saveResourceOwner;
+		MemoryContextSwitchTo(oldcontext);
+		pfree(winfo);
+		return NULL;
+	}
+
+	CurrentResourceOwner = saveResourceOwner;
+	winfo->shared->relid = relid;
+
+	/* Start a new table sync worker. */
+	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+										MySubscription->oid,
+										MySubscription->name,
+										MyLogicalRepWorker->userid,
+										relid,
+										dsm_segment_handle(winfo->dsm_seg));
+
+	if (launched)
+		TableSyncPool = lappend(TableSyncPool, winfo);
+	else
+	{
+		tsp_free_worker_info(winfo);
+		winfo = NULL;
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return winfo;
+}
+
+/*
+ * Wait until the tablesync worker/apply worker changes the state to the
+ * expected one.
+ *
+ * Returns false if the tablesync worker/apply worker has disappeared.
+ */
+bool
+wait_for_tsp_worker_state_change(TablesyncPoolState expected_state, int pid,
+								 Oid relid, int wait_time,
+								 TablesyncPoolShared *shared)
+{
+	int			rc;
+	bool	proc_exited = false;
+
+	for (;;)
+	{
+		LogicalRepWorker *worker;
+		TablesyncPoolState state = tsp_get_exec_state(shared);
+
+		CHECK_FOR_INTERRUPTS();
+
+		if (shared->exited)
+			break;
+
+		/* It is applicable only for apply worker. */
+		if (!OidIsValid(relid) && state == TS_INIT_STOP)
+			break;
+
+		/* Done if already in correct state. */
+		if (state == expected_state)
+			return true;
+
+		if (tsp_send_signal(pid))
+		{
+			/* Bail out if the tablesync worker/apply worker has died. */
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+											relid, false);
+			if (!worker || !worker->proc || (worker->in_use && pid != worker->proc->pid))
+				proc_exited = true;
+
+			LWLockRelease(LogicalRepWorkerLock);
+
+			if (proc_exited)
+				break;
+		}
+
+		/*
+		 * Wait. We expect to get a latch signal back from the tablesync
+		 * worker/apply worker, but use a timeout in case it dies without
+		 * sending one.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   wait_time, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	return false;
+}
+
+/*
+ * Stop the given logical replication parallel tablesync worker.
+ */
+static void
+logicalrep_tsp_worker_stop(TablesyncPoolInfo *winfo)
+
+{
+	TablesyncPoolShared *shared = winfo->shared;
+
+	tsp_set_exec_state(shared, TS_INIT_STOP);
+	wait_for_tsp_worker_state_change(TS_STOP, shared->child_pid,
+									 shared->relid, 1000L, shared);
+}
+
+/*
+ * Allocate a free tablesync worker which is available.
+ */
+bool
+tsp_allocate_free_worker(Oid relid, char state, XLogRecPtr lsn)
+{
+	TablesyncPoolInfo *winfo;
+	ListCell *lc;
+	MemoryContext oldcontext;
+
+	if (!list_length(TableSyncPool))
+		return false;
+
+	oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+retry:
+
+	/* Try to get an available table sync worker from the worker pool. */
+	foreach (lc, TableSyncPool)
+	{
+		TablesyncPoolState exec_state;
+
+		winfo = (TablesyncPoolInfo *)lfirst(lc);
+
+		/* worker might have exited due to an error */
+		if (winfo->shared->exited)
+		{
+			tsp_free_worker_info(winfo);
+
+			/* TableSyncPool has changed, repeate the loop again */
+			goto retry;
+		}
+
+		exec_state = tsp_get_exec_state(winfo->shared);
+
+		if (exec_state != TS_DONE)
+			continue;
+
+		winfo->shared->relid = relid;
+		winfo->shared->relstate = state;
+		winfo->shared->relstate_lsn = lsn;
+
+		tsp_set_exec_state(winfo->shared, TS_INIT);
+		tsp_send_signal(winfo->shared->child_pid);
+
+		MemoryContextSwitchTo(oldcontext);
+		return true;
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return false;
+}
+
+/*
+ * Set the execution state for a given table sync worker.
+ */
+void
+tsp_set_exec_state(TablesyncPoolShared *wshared,
+				   TablesyncPoolState exec_state)
+{
+	SpinLockAcquire(&wshared->mutex);
+	wshared->exec_state = exec_state;
+	SpinLockRelease(&wshared->mutex);
+}
+
+/*
+ * Get the execution state of a given table sync worker.
+ */
+TablesyncPoolState
+tsp_get_exec_state(TablesyncPoolShared *wshared)
+{
+	TablesyncPoolState exec_state;
+
+	SpinLockAcquire(&wshared->mutex);
+	exec_state = wshared->exec_state;
+	SpinLockRelease(&wshared->mutex);
+
+	return exec_state;
+}
+
+/*
+ * Free the table sync worker.
+ */
+static void
+tsp_free_worker(TablesyncPoolInfo *winfo)
+{
+	logicalrep_tsp_worker_stop(winfo);
+	tsp_free_worker_info(winfo);
+}
+
+/*
+ * Free the table sync worker information.
+ */
+static void
+tsp_free_worker_info(TablesyncPoolInfo *winfo)
+{
+	Assert(winfo);
+
+	if (winfo->dsm_seg)
+		dsm_detach(winfo->dsm_seg);
+
+	/* Remove from the worker pool. */
+	TableSyncPool = list_delete_ptr(TableSyncPool, winfo);
+
+	pfree(winfo);
+}
+
+/*
+ * Free all the table sync workers as there is no more work to allocate.
+ */
+void
+tsp_free_all_workers()
+{
+	TablesyncPoolInfo *winfo = NULL;
+
+	while (list_length(TableSyncPool) > 0)
+	{
+		winfo = (TablesyncPoolInfo *)lfirst(list_head(TableSyncPool));
+		tsp_free_worker(winfo);
+	}
+
+	TableSyncPool = NIL;
+}
+
+/*
+ * Stop the tablesync worker.
+ */
+void
+tsp_worker_cleanup(void)
+{
+	tsp_set_exec_state(MyTablesyncPoolShared, TS_STOP);
+	tsp_send_signal(MyTablesyncPoolShared->parent_pid);
+}
+
+/*
+ * Set the tablesync shared information global variable.
+ */
+void
+tsp_set_shared(TablesyncPoolShared *shared)
+{
+	MyTablesyncPoolShared = shared;
+}
+
+/*
+ * Get the apply worker pid.
+ */
+int
+tsp_get_apply_worker_pid(void)
+{
+	return MyTablesyncPoolShared->parent_pid;
+}
+
+/*
+ * Send signal to tsp worker or apply worker.
+ */
+int
+tsp_send_signal(int pid)
+{
+	return SendProcSignal(pid, PROCSIG_TABLESYNC_WORKER_POOL, InvalidBackendId);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f77bc55e34..6c2bcfe584 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -438,6 +438,20 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+/*
+ * Return the name of the logical replication worker.
+ */
+const char *
+get_worker_name(void)
+{
+	if (am_tablesync_worker())
+		return _("logical replication table synchronization worker");
+	else if (am_parallel_apply_worker())
+		return _("logical replication parallel apply worker");
+	else
+		return _("logical replication apply worker");
+}
+
 /*
  * Form the origin name for the subscription.
  *
@@ -4530,7 +4544,7 @@ start_apply(XLogRecPtr origin_startpos)
  * It starts syncing tables. After a successful sync, sets streaming options
  * and starts streaming to catchup.
  */
-static void
+void
 run_tablesync_worker(WalRcvStreamOptions *options,
 					 char *slotname,
 					 char *originname,
@@ -4788,17 +4802,47 @@ ApplyWorkerMain(Datum main_arg)
 void
 TablesyncWorkerMain(Datum main_arg)
 {
-	int			worker_slot = DatumGetInt32(main_arg);
-	char		originname[NAMEDATALEN];
-	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
-	char	   *myslotname = NULL;
+	int worker_slot = DatumGetInt32(main_arg);
+	char originname[NAMEDATALEN];
+	dsm_handle	handle;
+	dsm_segment *seg;
+	shm_toc    *toc;
+	TablesyncPoolShared *shared;
+
+	XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+	char *myslotname = NULL;
 	WalRcvStreamOptions options;
 
+	/*
+	 * Attach to the dynamic shared memory segment for the table sync pool, and
+	 * find its table of contents.
+	 */
+	memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
+	seg = dsm_attach(handle);
+	if (!seg)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(PG_LOGICAL_TABLESYNC_POOL_SHM_MAGIC, dsm_segment_address(seg));
+	if (!toc)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("invalid magic number in dynamic shared memory segment")));
+
+	/* Look up the shared information. */
+	shared = shm_toc_lookup(toc, TABLESYNC_POOL_KEY_SHARED, false);
+	tsp_set_shared(shared);
+
+	shared->child_pid = MyProcPid;
+	tsp_set_exec_state(shared, TS_INIT);
+
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
 
 	/* Setup signal handling */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
 	pqsignal(SIGTERM, die);
 	BackgroundWorkerUnblockSignals();
 
@@ -4816,17 +4860,13 @@ TablesyncWorkerMain(Datum main_arg)
 
 	InitializeLogRepWorker();
 
-	/* Connect to the origin and start the replication. */
-	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
-		 MySubscription->conninfo);
-
 	/*
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
 	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
 								  invalidate_syncing_table_states,
-								  (Datum) 0);
+								  (Datum)0);
 
 	/*
 	 * The loop where worker does its job. It loops until there is no relation
@@ -4834,81 +4874,47 @@ TablesyncWorkerMain(Datum main_arg)
 	 */
 	for (;;)
 	{
-		List	   *rstates;
-		ListCell   *lc;
-		bool 	is_table_found = false;
+		/* Update worker state for the next table */
+		MyLogicalRepWorker->relid = shared->relid;
+		MyLogicalRepWorker->relstate = shared->relstate;
+		MyLogicalRepWorker->relstate_lsn = shared->relstate_lsn;
+
+		tsp_set_exec_state(shared, TS_IN_PROGRESS);
 
-		run_tablesync_worker(&options,
-							 myslotname,
-							 originname,
-							 sizeof(originname),
-							 &origin_startpos,
-							 worker_slot);
+		PG_TRY();
+		{
+			run_tablesync_worker(&options,
+								myslotname,
+								originname,
+								sizeof(originname),
+								&origin_startpos,
+								worker_slot);
+		}
+		PG_CATCH();
+		{
+			shared->exited = true;
+			PG_RE_THROW();
+		}
+
+		PG_END_TRY();
 
 		if (IsTransactionState())
+		{
 			CommitTransactionCommand();
+			pgstat_report_stat(true);
+		}
 
 		if (MyLogicalRepWorker->is_sync_completed)
 		{
-			/* This transaction will be committed by clean_sync_worker. */
-			StartTransactionCommand();
-
-			/*
-			 * Check if there is any table whose relation state is still INIT.
-			 * If a table in INIT state is found, the worker will not be
-			 * finished, it will be reused instead.
-			 */
-			rstates = GetSubscriptionRelations(MySubscription->oid, true);
-
-			foreach(lc, rstates)
-			{
-				SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
-
-				if (rstate->state == SUBREL_STATE_SYNCDONE)
-					continue;
-
-				/*
-				 * Take exclusive lock to prevent any other sync worker from
-				 * picking the same table.
-				 */
-				LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
-
-				/*
-				 * Pick the table for the next run if it is not already picked
-				 * up by another worker.
-				 */
-				if (!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
-				{
-					/* Update worker state for the next table */
-					MyLogicalRepWorker->relid = rstate->relid;
-					MyLogicalRepWorker->relstate = rstate->state;
-					MyLogicalRepWorker->relstate_lsn = rstate->lsn;
-					LWLockRelease(LogicalRepWorkerLock);
-
-					/* Found a table for next iteration */
-					is_table_found = true;
-					clean_sync_worker();
-
-					StartTransactionCommand();
-					ereport(LOG,
-							(errmsg("%s for subscription \"%s\" has moved to sync table \"%s\" with relid %u.",
-									get_worker_name(),
-									MySubscription->name,
-									get_rel_name(MyLogicalRepWorker->relid),
-									MyLogicalRepWorker->relid)));
-					CommitTransactionCommand();
-
-					break;
-				}
-				LWLockRelease(LogicalRepWorkerLock);
-			}
-
-			if (!is_table_found)
+			tsp_set_exec_state(shared, TS_DONE);
+			/* wait for apply worker to assign a new table with INIT state. */
+			if (!wait_for_tsp_worker_state_change(TS_INIT, shared->parent_pid,
+												  InvalidOid, 1L, shared))
 				break;
 		}
 	}
 
-	finish_sync_worker();
+	finish_sync_worker(false);
 }
 
 /*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index af6fd339f7..1484b7b0b9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -19,6 +19,7 @@
 #include "datatype/timestamp.h"
 #include "miscadmin.h"
 #include "replication/logicalrelation.h"
+#include "replication/walreceiver.h"
 #include "storage/buffile.h"
 #include "storage/fileset.h"
 #include "storage/lock.h"
@@ -213,6 +214,51 @@ typedef struct ParallelApplyWorkerInfo
 	ParallelApplyWorkerShared *shared;
 } ParallelApplyWorkerInfo;
 
+typedef enum TablesyncPoolState
+{
+	TS_FREE,
+	TS_INIT,
+	TS_IN_PROGRESS,
+	TS_DONE,
+	TS_INIT_STOP,
+	TS_STOP
+} TablesyncPoolState;
+
+/*
+ * Struct for sharing information between leader apply worker and parallel
+ * apply workers.
+ */
+typedef struct TablesyncPoolShared
+{
+	slock_t		mutex;
+
+	Oid 		relid;
+	char		relstate;
+	XLogRecPtr	relstate_lsn;
+	bool		exited;
+	pid_t			parent_pid;
+	pid_t			child_pid;
+
+	TablesyncPoolState exec_state;
+} TablesyncPoolShared;
+
+/*
+ * Information which is used to manage the parallel apply worker.
+ */
+typedef struct TablesyncPoolInfo
+{
+	dsm_segment *dsm_seg;
+	TablesyncPoolShared *shared;
+} TablesyncPoolInfo;
+
+
+#define PG_LOGICAL_TABLESYNC_POOL_SHM_MAGIC 0x5450d157
+
+/*
+ * DSM keys for table sync pool.
+ */
+#define TABLESYNC_POOL_KEY_SHARED	1
+
 /* Main memory context for apply worker. Permanent during worker lifetime. */
 extern PGDLLIMPORT MemoryContext ApplyContext;
 
@@ -312,9 +358,16 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 						   XLogRecPtr remote_lsn);
 
+extern void run_tablesync_worker(WalRcvStreamOptions *options,
+								 char *slotname,
+								 char *originname,
+								 int originname_size,
+								 XLogRecPtr *origin_startpos,
+								 int worker_slot);
+
 #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
 
-extern void finish_sync_worker(void);
+extern void finish_sync_worker(bool reuse);
 extern void clean_sync_worker(void);
 
 static inline bool
@@ -336,4 +389,20 @@ am_parallel_apply_worker(void)
 	return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
+extern bool tsp_allocate_free_worker(Oid relid, char state, XLogRecPtr lsn);
+extern TablesyncPoolInfo *tsp_launch_worker(Oid relid);
+extern void tsp_free_all_workers(void);
+extern TablesyncPoolState tsp_get_exec_state(TablesyncPoolShared *wshared);
+extern void tsp_set_exec_state(TablesyncPoolShared *wshared,
+							   TablesyncPoolState exec_state);
+extern void tsp_worker_cleanup(void);
+extern int tsp_get_apply_worker_pid(void);
+extern int tsp_send_signal(int pid);
+extern const char *get_worker_name(void);
+extern void tsp_set_shared(TablesyncPoolShared *shared);
+extern bool wait_for_tsp_worker_state_change(TablesyncPoolState expected_state,
+											 int pid,
+											 Oid relid, int wait_time,
+											 TablesyncPoolShared *shared);
+
 #endif							/* WORKER_INTERNAL_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 2f52100b00..55967c3d02 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -36,6 +36,7 @@ typedef enum
 	PROCSIG_BARRIER,			/* global barrier interrupt  */
 	PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
 	PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
+	PROCSIG_TABLESYNC_WORKER_POOL, /* parallel tablesync worker pool interrupt */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_DATABASE,
-- 
2.34.1

