From e3d9ab01146c78f8c8e2954c11c0f0d72bf204f4 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Thu, 2 Jun 2022 17:39:37 +0300
Subject: [PATCH] Reuse Logical Replication Background worker

This commit allows tablesync workers to move to another table that needs synchronization,
when they're done with the current table in the tablesync phase of Logical Replication.

It reduces the overhead of launching/killing a new background worker for each table.
By reusing tablesync workers, replication slots created for tablesync can be reused as well.
Removing the burden of creating/dropping replication slot improves tablesync speed.

Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                  |  20 ++
 src/backend/catalog/pg_subscription.c       | 231 ++++++++++++-
 src/backend/commands/subscriptioncmds.c     | 169 ++++++----
 src/backend/replication/logical/launcher.c  |   9 +-
 src/backend/replication/logical/tablesync.c | 217 +++++++++---
 src/backend/replication/logical/worker.c    | 345 ++++++++++++--------
 src/include/catalog/pg_subscription.h       |   5 +
 src/include/catalog/pg_subscription_rel.h   |  10 +-
 src/include/replication/slot.h              |   3 +-
 src/include/replication/worker_internal.h   |  28 +-
 src/test/regress/expected/misc_sanity.out   |   3 +-
 11 files changed, 794 insertions(+), 246 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index cd2cc37aeb..8048710244 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7957,6 +7957,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        origin.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>sublastusedid</structfield> <type>int8</type>
+      </para>
+      <para>
+       The last used ID for tablesync workers. This ID is used to
+       create replication slots. The last used ID needs to be stored
+       so that logical replication can safely proceed after an interruption.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
@@ -8041,6 +8052,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        otherwise null
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>srrelslotname</structfield> <type>text</type>
+      </para>
+      <para>
+       The name of the replication slot used for synchronization of the relation.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a506fc3ec8..fc7973859d 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,7 +71,6 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
-
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
 							tup,
@@ -114,6 +113,14 @@ GetSubscription(Oid subid, bool missing_ok)
 	Assert(!isnull);
 	sub->origin = TextDatumGetCString(datum);
 
+	/* Get last used id */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_sublastusedid,
+							&isnull);
+	Assert(!isnull);
+	sub->lastusedid = DatumGetInt64(datum);
+
 	ReleaseSysCache(tup);
 
 	return sub;
@@ -205,6 +212,44 @@ DisableSubscription(Oid subid)
 	table_close(rel, NoLock);
 }
 
+/*
+ * Update the last used replication slot ID for the given subscription.
+ */
+void
+UpdateSubscriptionLastSlotId(Oid subid, int64 lastusedid)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+	HeapTuple	tup;
+
+	/* Look up the subscription in the catalog */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_sublastusedid - 1] = true;
+	values[Anum_pg_subscription_sublastusedid - 1] = Int64GetDatum(lastusedid);
+
+	/* Update the catalog */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+	heap_freetuple(tup);
+
+	table_close(rel, NoLock);
+}
+
 /*
  * Convert text array to list of strings.
  *
@@ -234,7 +279,7 @@ textarray_to_stringlist(ArrayType *textarray)
  */
 void
 AddSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn)
+						XLogRecPtr sublsn, char *relslotname)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -263,9 +308,17 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
 	else
 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (relslotname)
+	{
+		values[Anum_pg_subscription_rel_srrelslotname - 1] = CStringGetTextDatum(relslotname);
+	}
+	else
+	{
+		nulls[Anum_pg_subscription_rel_srrelslotname - 1] = true;
+	}
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
+
 	/* Insert tuple into catalog. */
 	CatalogTupleInsert(rel, tup);
 
@@ -275,6 +328,42 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * Internal function to modify columns for relation state update
+ */
+static void
+UpdateSubscriptionRelState_internal(Datum *values,
+									bool *nulls,
+									bool *replaces,
+									char state,
+						   			XLogRecPtr sublsn)
+{
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+}
+
+/*
+ * Internal function to modify columns for replication slot update
+ */
+static void
+UpdateSubscriptionRelReplicationSlot_internal(Datum *values,
+											bool *nulls,
+											bool *replaces,
+											char *relslotname)
+{
+	replaces[Anum_pg_subscription_rel_srrelslotname - 1] = true;
+	if (relslotname)
+		values[Anum_pg_subscription_rel_srrelslotname - 1] = CStringGetTextDatum(relslotname);
+	else
+		nulls[Anum_pg_subscription_rel_srrelslotname - 1] = true;
+}
+
 /*
  * Update the state of a subscription table.
  */
@@ -305,14 +394,94 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
-	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	UpdateSubscriptionRelState_internal(values, nulls, replaces, state, sublsn);
 
-	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-	if (sublsn != InvalidXLogRecPtr)
-		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-	else
-		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/* Cleanup. */
+	table_close(rel, NoLock);
+}
+
+/*
+ * Update the replication slot name of a subscription table.
+ */
+void
+UpdateSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *relslotname)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	UpdateSubscriptionRelReplicationSlot_internal(values, nulls, replaces, relslotname);
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/* Cleanup. */
+	table_close(rel, NoLock);
+}
+
+/*
+ * Update replication slot name and state of a subscription table in one transaction.
+ */
+void
+UpdateSubscriptionRelStateAndSlot(Oid subid, 
+								Oid relid, 
+								char state,
+						   		XLogRecPtr sublsn,
+								char *relslotname)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	UpdateSubscriptionRelState_internal(values, nulls, replaces, state, sublsn);
+	UpdateSubscriptionRelReplicationSlot_internal(values, nulls, replaces, relslotname);
 
 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 							replaces);
@@ -324,6 +493,46 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * Get replication slot name of subscription table.
+ *
+ * Leaves 'slotname' unchanged if the table does not have a replication slot.
+ */
+void
+GetSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *slotname)
+{
+	HeapTuple	tup;
+	Relation	rel;
+	Datum 		d;
+	char		*relrepslot;
+	bool		isnull;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	/* Try finding the mapping. */
+	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
+						  ObjectIdGetDatum(relid),
+						  ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+	{
+		table_close(rel, AccessShareLock);
+		return;
+	}
+	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+						Anum_pg_subscription_rel_srrelslotname, &isnull);
+	if (!isnull)
+	{
+		relrepslot = TextDatumGetCString(d);
+		strlcpy(slotname, relrepslot, NAMEDATALEN);
+	}
+
+	/* Cleanup */
+	ReleaseSysCache(tup);
+
+	table_close(rel, AccessShareLock);
+}
+
 /*
  * Get state of subscription table.
  *
@@ -542,4 +751,4 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 	table_close(rel, AccessShareLock);
 
 	return res;
-}
+}
\ No newline at end of file
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f73dfb6067..b893700a84 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -646,6 +646,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		publicationListToArray(publications);
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
+	values[Anum_pg_subscription_sublastusedid - 1] = Int64GetDatum(1);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -704,7 +705,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 										 rv->schemaname, rv->relname);
 
 				AddSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr);
+										InvalidXLogRecPtr, NULL);
 			}
 
 			/*
@@ -794,6 +795,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	} SubRemoveRels;
 	SubRemoveRels *sub_remove_rels;
 	WalReceiverConn *wrconn;
+	List	   *sub_remove_slots = NIL;
+	LogicalRepWorker *worker;
 
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
@@ -866,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			{
 				AddSubscriptionRelState(sub->oid, relid,
 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-										InvalidXLogRecPtr);
+										InvalidXLogRecPtr, NULL);
 				ereport(DEBUG1,
 						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
 										 rv->schemaname, rv->relname, sub->name)));
@@ -916,7 +919,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 				RemoveSubscriptionRel(sub->oid, relid);
 
-				logicalrep_worker_stop(sub->oid, relid);
+				/*
+				 * Find the logical replication sync worker, if one exists. Store
+				 * the slot name so the associated replication slot can be dropped later.
+				 */
+				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+				worker = logicalrep_worker_find(sub->oid, relid, false);
+				if (worker)
+					sub_remove_slots = lappend(sub_remove_slots,
+											   pstrdup(worker->slot_name));
+				LWLockRelease(LogicalRepWorkerLock);
+				if (worker)
+					logicalrep_worker_stop(sub->oid, relid);
 
 				/*
 				 * For READY state, we would have already dropped the
@@ -950,31 +964,23 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		}
 
 		/*
-		 * Drop the tablesync slots associated with removed tables. This has
-		 * to be at the end because otherwise if there is an error while doing
+		 * Drop the replication slots associated with tablesync workers for removed tables.
+		 * This has to be at the end because otherwise if there is an error while doing
 		 * the database operations we won't be able to rollback dropped slots.
 		 */
-		for (off = 0; off < remove_rel_len; off++)
+		foreach(lc, sub_remove_slots)
 		{
-			if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
-				sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
-			{
-				char		syncslotname[NAMEDATALEN] = {0};
+			char		syncslotname[NAMEDATALEN] = {0};
+			strlcpy(syncslotname, (char *) lfirst(lc), sizeof(syncslotname));
 
-				/*
-				 * For READY/SYNCDONE states we know the tablesync slot has
-				 * already been dropped by the tablesync worker.
-				 *
-				 * For other states, there is no certainty, maybe the slot
-				 * does not exist yet. Also, if we fail after removing some of
-				 * the slots, next time, it will again try to drop already
-				 * dropped slots and fail. For these reasons, we allow
-				 * missing_ok = true for the drop.
-				 */
-				ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
-												syncslotname, sizeof(syncslotname));
-				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
-			}
+			/*
+			 * There is no certainty, maybe the slot
+			 * does not exist yet. Also, if we fail after removing some of
+			 * the slots, next time, it will again try to drop already
+			 * dropped slots and fail. For these reasons, we allow
+			 * missing_ok = true for the drop.
+			 */
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 		}
 	}
 	PG_FINALLY();
@@ -1567,39 +1573,16 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	PG_TRY();
 	{
-		foreach(lc, rstates)
-		{
-			SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
-			Oid			relid = rstate->relid;
-
-			/* Only cleanup resources of tablesync workers */
-			if (!OidIsValid(relid))
-				continue;
-
-			/*
-			 * Drop the tablesync slots associated with removed tables.
-			 *
-			 * For SYNCDONE/READY states, the tablesync slot is known to have
-			 * already been dropped by the tablesync worker.
-			 *
-			 * For other states, there is no certainty, maybe the slot does
-			 * not exist yet. Also, if we fail after removing some of the
-			 * slots, next time, it will again try to drop already dropped
-			 * slots and fail. For these reasons, we allow missing_ok = true
-			 * for the drop.
-			 */
-			if (rstate->state != SUBREL_STATE_SYNCDONE)
-			{
-				char		syncslotname[NAMEDATALEN] = {0};
+		List *slots = NULL;
 
-				ReplicationSlotNameForTablesync(subid, relid, syncslotname,
-												sizeof(syncslotname));
-				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
-			}
+		
+		slots = GetReplicationSlotNamesBySubId(wrconn, subid, true);
+		foreach(lc, slots)
+		{
+			char *syncslotname = (char *) lfirst(lc);
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 		}
 
-		list_free(rstates);
-
 		/*
 		 * If there is a slot associated with the subscription, then drop the
 		 * replication slot at the publisher.
@@ -1622,6 +1605,69 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	table_close(rel, NoLock);
 }
 
+/*
+ * GetReplicationSlotNamesBySubId
+ *
+ * Get the replication slot names on the publisher associated with the given subscription.
+ */
+List *
+GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok){
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[1] = {NAMEOID};
+	List	   *tablelist = NIL;
+
+	Assert(wrconn);
+
+	load_file("libpqwalreceiver", false);
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT slot_name"
+						" FROM pg_replication_slots"
+						" WHERE slot_name LIKE 'pg_%u_sync_%%';",
+						 subid);
+	PG_TRY();
+	{
+		WalRcvExecResult *res;
+
+		res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+		{
+			ereport(ERROR,
+					 errmsg("could not receive list of replication slots"));
+		}
+
+		/* Process tables. */
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			char	   *repslotname;
+			char	   *slotattr;
+			bool		isnull;
+
+			slotattr = NameStr(*DatumGetName(slot_getattr(slot, 1, &isnull)));
+			Assert(!isnull);
+
+			repslotname = palloc(sizeof(char) * strlen(slotattr) + 1);
+			memcpy(repslotname, slotattr, sizeof(char) * strlen(slotattr));
+			repslotname[strlen(slotattr)] = '\0';
+			tablelist = lappend(tablelist, repslotname);
+
+			ExecClearTuple(slot);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(res);
+	}
+	PG_FINALLY();
+	{
+		pfree(cmd.data);
+	}
+	PG_END_TRY();
+	return tablelist;
+}
+
 /*
  * Drop the replication slot at the publisher node using the replication
  * connection.
@@ -1863,6 +1909,7 @@ static void
 ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 {
 	ListCell   *lc;
+	LogicalRepWorker *worker;
 
 	foreach(lc, rstates)
 	{
@@ -1873,18 +1920,20 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 		if (!OidIsValid(relid))
 			continue;
 
+		/* Check if there is a sync worker for the relation */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(subid, relid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+		
 		/*
 		 * Caller needs to ensure that relstate doesn't change underneath us.
 		 * See DropSubscription where we get the relstates.
 		 */
-		if (rstate->state != SUBREL_STATE_SYNCDONE)
+		if (worker &&
+			rstate->state != SUBREL_STATE_SYNCDONE)
 		{
-			char		syncslotname[NAMEDATALEN] = {0};
-
-			ReplicationSlotNameForTablesync(subid, relid, syncslotname,
-											sizeof(syncslotname));
 			elog(WARNING, "could not drop tablesync replication slot \"%s\"",
-				 syncslotname);
+				 worker->slot_name);
 		}
 	}
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3bbd522724..96cc38eeb5 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -263,7 +263,7 @@ logicalrep_workers_find(Oid subid, bool only_running)
  */
 void
 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid)
+						 Oid relid, int64 slotid)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
@@ -370,7 +370,11 @@ retry:
 	/* Prepare the worker slot. */
 	worker->launch_time = now;
 	worker->in_use = true;
+	worker->is_first_run = true;
 	worker->generation++;
+	worker->created_slot = false;
+	worker->rep_slot_id = slotid;
+	worker->slot_name = (char *) palloc(NAMEDATALEN);	/* XXX(review): backend-local allocation stored in shared worker slot; other processes cannot read it -- confirm */
 	worker->proc = NULL;
 	worker->dbid = dbid;
 	worker->userid = userid;
@@ -378,6 +382,7 @@ retry:
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
+	worker->move_to_next_rel = false;
 	worker->stream_fileset = NULL;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
@@ -868,7 +873,7 @@ ApplyLauncherMain(Datum main_arg)
 					wait_time = wal_retrieve_retry_interval;
 
 					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
+											 sub->owner, InvalidOid, 0);
 				}
 			}
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6a01ffd273..a561e6f29d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -126,12 +126,8 @@ static bool FetchTableStates(bool *started_tx);
 
 static StringInfo copybuf = NULL;
 
-/*
- * Exit routine for synchronization worker.
- */
 static void
-pg_attribute_noreturn()
-finish_sync_worker(void)
+clean_sync_worker(void)
 {
 	/*
 	 * Commit any outstanding transaction. This is the usual case, unless
@@ -143,18 +139,27 @@ finish_sync_worker(void)
 		pgstat_report_stat(true);
 	}
 
-	/* And flush all writes. */
-	XLogFlush(GetXLogWriteRecPtr());
-
-	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
-	CommitTransactionCommand();
+	/* Disconnect from the publisher. Otherwise, reused sync workers
+	 * would cause max_wal_senders to be exceeded.
+	 */
+	walrcv_disconnect(LogRepWorkerWalRcvConn);
+	LogRepWorkerWalRcvConn = NULL;
 
 	/* Find the main apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+}
+
+/*
+ * Exit routine for synchronization worker.
+ */
+static void
+pg_attribute_noreturn()
+finish_sync_worker(void)
+{
+	clean_sync_worker();
+	
+	/* And flush all writes. */
+	XLogFlush(GetXLogWriteRecPtr());
 
 	/* Stop gracefully */
 	proc_exit(0);
@@ -180,7 +185,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 		LogicalRepWorker *worker;
 		XLogRecPtr	statelsn;
 
-		CHECK_FOR_INTERRUPTS();
+		CHECK_FOR_INTERRUPTS();		
 
 		InvalidateCatalogSnapshot();
 		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
@@ -284,6 +289,10 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
+	List	   *rstates;
+	SubscriptionRelState *rstate;
+	ListCell   *lc;
+
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -299,7 +308,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		/*
 		 * UpdateSubscriptionRelState must be called within a transaction.
-		 * That transaction will be ended within the finish_sync_worker().
 		 */
 		if (!IsTransactionState())
 			StartTransactionCommand();
@@ -308,6 +316,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
+		CommitTransactionCommand();
 
 		/*
 		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -317,24 +326,88 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		/*
 		 * Cleanup the tablesync slot.
+		 * If the slot name used by this worker differs from the default
+		 * slot name for the worker, the current table had started being
+		 * synchronized by another worker with a different replication slot,
+		 * and this worker is reusing a replication slot from a previous
+		 * attempt. We do not need that replication slot anymore.
 		 *
 		 * This has to be done after updating the state because otherwise if
 		 * there is an error while doing the database operations we won't be
 		 * able to rollback dropped slot.
 		 */
 		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
+										MyLogicalRepWorker->rep_slot_id,
 										syncslotname,
 										sizeof(syncslotname));
 
+
+		/* This transaction will be ended within the clean_sync_worker(). */
+		StartTransactionCommand();
+		if (MyLogicalRepWorker->slot_name && strcmp(syncslotname, MyLogicalRepWorker->slot_name) != 0)
+		{
+			ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, MyLogicalRepWorker->slot_name, false);
+			UpdateSubscriptionRelReplicationSlot(MyLogicalRepWorker->subid,
+												 MyLogicalRepWorker->relid,
+												 NULL);
+		}
+
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid))));
+
 		/*
-		 * It is important to give an error if we are unable to drop the slot,
-		 * otherwise, it won't be dropped till the corresponding subscription
-		 * is dropped. So passing missing_ok = false.
+		 * Check whether there is any table whose relation state is still INIT.
+		 * If a table in INIT state is found, the worker will not be finished;
+		 * it will be reused instead.
 		 */
-		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
+		rstates = GetSubscriptionRelations(MySubscription->oid, true);
+		
+		foreach (lc, rstates)
+		{
+			rstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+
+			/*
+			 * Pick the table for the next run if no other worker has
+			 * already picked it.
+			 */
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			if (rstate->state != SUBREL_STATE_SYNCDONE && 
+				!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+			{
+				/* Update worker state for the next table */
+				MyLogicalRepWorker->is_first_run = false;
+				MyLogicalRepWorker->relid = rstate->relid;
+				MyLogicalRepWorker->relstate = rstate->state;
+				MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+				MyLogicalRepWorker->move_to_next_rel = true;
+				LWLockRelease(LogicalRepWorkerLock);
+				break;
+			}
+			LWLockRelease(LogicalRepWorkerLock);
+		}
+
+		/* Cleanup before next run or ending the worker. */
+		if (!MyLogicalRepWorker->move_to_next_rel)
+		{
+		   /*
+			* It is important to give an error if we are unable to drop the slot,
+			* otherwise, it won't be dropped till the corresponding subscription
+			* is dropped. So passing missing_ok = false.
+			*/
+			if (MyLogicalRepWorker->created_slot)
+			{
+				ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
+			}
 
-		finish_sync_worker();
+			finish_sync_worker();
+		}
+		else
+		{
+			clean_sync_worker();
+		}
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -473,9 +546,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				/*
 				 * Update the state to READY only after the origin cleanup.
 				 */
-				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+				UpdateSubscriptionRelStateAndSlot(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
-										   rstate->lsn);
+										   rstate->lsn, NULL);
 			}
 		}
 		else
@@ -564,11 +637,21 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
+						if (IsTransactionState())
+							CommitTransactionCommand();
+						StartTransactionCommand();
+						started_tx = true;
+
+						MySubscription->lastusedid++;
+						UpdateSubscriptionLastSlotId(MyLogicalRepWorker->subid,
+													MySubscription->lastusedid);
+
 						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
-												 rstate->relid);
+												 rstate->relid,
+												 MySubscription->lastusedid);
 						hentry->last_start_time = now;
 					}
 				}
@@ -1140,8 +1223,8 @@ copy_table(Relation rel)
  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
  * on slot name length. We append system_identifier to avoid slot_name
  * collision with subscriptions in other clusters. With the current scheme
- * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
- * length of slot_name will be 50.
+ * pg_%u_sync_%lu_UINT64_FORMAT (3 + 10 + 6 + 20 + 20 + '\0'), the maximum
+ * length of slot_name will be 60.
  *
  * The returned slot name is stored in the supplied buffer (syncslotname) with
  * the given size.
@@ -1152,11 +1235,10 @@ copy_table(Relation rel)
  * had changed.
  */
 void
-ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
-								char *syncslotname, int szslot)
+ReplicationSlotNameForTablesync(Oid suboid, int64 slotid, char *syncslotname, int szslot)
 {
-	snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
-			 relid, GetSystemIdentifier());
+	snprintf(syncslotname, szslot, "pg_%u_sync_" INT64_FORMAT "_" UINT64_FORMAT,
+			 suboid, slotid, GetSystemIdentifier());
 }
 
 /*
@@ -1191,6 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
 	RepOriginId originid;
+	char  		*prev_slotname;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -1219,7 +1302,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Calculate the name of the tablesync slot. */
 	slotname = (char *) palloc(NAMEDATALEN);
 	ReplicationSlotNameForTablesync(MySubscription->oid,
-									MyLogicalRepWorker->relid,
+									MyLogicalRepWorker->rep_slot_id,
 									slotname,
 									NAMEDATALEN);
 
@@ -1245,6 +1328,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 									  originname,
 									  sizeof(originname));
 
+
+	/*
+	 * See if tablesync of the current relation has been started with
+	 * another replication slot.
+	 *
+	 * Read the previous slot name from the catalog, if it exists.
+	 */
+	prev_slotname = (char *) palloc0(NAMEDATALEN);
+	StartTransactionCommand();
+	GetSubscriptionRelReplicationSlot(MyLogicalRepWorker->subid,
+									  MyLogicalRepWorker->relid,
+									  prev_slotname);
+	CommitTransactionCommand();
+
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
 	{
 		/*
@@ -1258,10 +1355,34 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * breakdown then it wouldn't have succeeded so trying it next time
 		 * seems like a better bet.
 		 */
-		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
+		if (strcmp(prev_slotname, ""))
+		{
+			ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, prev_slotname, true);
+		
+			StartTransactionCommand();
+			UpdateSubscriptionRelReplicationSlot(MyLogicalRepWorker->subid,
+												  MyLogicalRepWorker->relid,
+												  NULL);
+			CommitTransactionCommand();
+		}
 	}
 	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
 	{
+		/*
+		 * At this point, the table currently being synchronized should have
+		 * its replication slot name filled in the catalog. The tablesync
+		 * process was started by another sync worker with a different
+		 * replication slot; continue using that same slot in this worker.
+		 */
+		if (!strcmp(prev_slotname, ""))
+		{
+			elog(ERROR, "replication slot could not be found for relation %u",
+				 MyLogicalRepWorker->relid);
+		}
+
+		/* Proceed with the correct replication slot. */
+		slotname = prev_slotname;
+
 		/*
 		 * The COPY phase was previously done, but tablesync then crashed
 		 * before it was able to finish normally.
@@ -1289,10 +1410,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	/* Update the state and make it visible to others. */
 	StartTransactionCommand();
-	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+	UpdateSubscriptionRelStateAndSlot(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   slotname);
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1351,16 +1473,31 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 *
+	 * The replication slot will only be created if either this is the first
+	 * run of the worker or we're not reusing a previous replication slot.
+	 *
 	 * Prevent cancel/die interrupts while creating slot here because it is
 	 * possible that before the server finishes this command, a concurrent
 	 * drop subscription happens which would complete without removing this
 	 * slot leading to a dangling slot on the server.
 	 */
-	HOLD_INTERRUPTS();
-	walrcv_create_slot(LogRepWorkerWalRcvConn,
-					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
-	RESUME_INTERRUPTS();
+	if (MyLogicalRepWorker->is_first_run ||
+		(!strcmp(prev_slotname, "") && !MyLogicalRepWorker->created_slot))
+	{
+		HOLD_INTERRUPTS();
+		walrcv_create_slot(LogRepWorkerWalRcvConn,
+						slotname, false /* permanent */ , false /* two_phase */ ,
+						CRS_USE_SNAPSHOT, origin_startpos);
+		RESUME_INTERRUPTS();
+
+		/*
+		 * Remember that we created the slot so that
+		 * we will not try to create it again.
+		 */
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		MyLogicalRepWorker->created_slot = true;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+	}
 
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5f8c541763..163c0ea921 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -315,6 +315,7 @@ static void stream_cleanup_files(Oid subid, TransactionId xid);
 static void stream_open_file(Oid subid, TransactionId xid, bool first);
 static void stream_write_change(char action, StringInfo s);
 static void stream_close_file(void);
+static void stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
@@ -2814,6 +2815,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 			/* Process any table synchronization changes. */
 			process_syncing_tables(last_received);
+			if (MyLogicalRepWorker->move_to_next_rel)
+			{
+				endofstream = true;
+			}
 		}
 
 		/* Cleanup the memory. */
@@ -2915,8 +2920,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 
-	/* All done */
-	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	/*
+	 * If it's moving to the next relation, this is a sync worker.  Sync
+	 * workers end the streaming during process_syncing_tables_for_sync;
+	 * calling endstreaming twice causes "no COPY in progress" errors.
+	 */
+	if (!MyLogicalRepWorker->move_to_next_rel)
+	{
+		/* All done */
+		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	}
 }
 
 /*
@@ -3458,6 +3471,35 @@ stream_write_change(char action, StringInfo s)
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
 }
 
+/*
+ * stream_build_options
+ * 		Build logical replication streaming options.
+ *
+ * This function sets streaming options including the replication slot name
+ * and origin start position.  Workers need these options to start streaming.
+ */
+static void
+stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
+{
+	int server_version;
+
+	options->logical = true;
+	options->startpoint = *origin_startpos;
+	options->slotname = slotname;
+
+	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
+	options->proto.logical.proto_version =
+		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+		LOGICALREP_PROTO_VERSION_NUM;
+
+	options->proto.logical.publication_names = MySubscription->publications;
+	options->proto.logical.binary = MySubscription->binary;
+	options->proto.logical.streaming = MySubscription->stream;
+	options->proto.logical.twophase = false;
+	options->proto.logical.origin = pstrdup(MySubscription->origin);
+}
+
 /*
  * Cleanup the memory for subxacts and reset the related variables.
  */
@@ -3532,6 +3574,9 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 
 	/* allocate slot name in long-lived context */
 	*myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+
+	/* Keep the slot name for this sync (NB: points into this backend's
+	 * ApplyContext; not readable from other backends — verify) */
+	MyLogicalRepWorker->slot_name = *myslotname;
 	pfree(syncslotname);
 }
 
@@ -3569,6 +3614,136 @@ start_apply(XLogRecPtr origin_startpos)
 	PG_END_TRY();
 }
 
+/*
+ * Runs the tablesync worker.
+ * It starts table sync.  After a successful sync it builds streaming
+ * options and starts streaming.
+ */
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,
+					 char *slotname,
+					 char *originname,
+					 int originname_size,
+					 XLogRecPtr *origin_startpos)
+{
+	/* Set this to false for safety, in case we're already reusing the worker */
+	MyLogicalRepWorker->move_to_next_rel = false;
+
+	start_table_sync(origin_startpos, &slotname);
+
+	/*
+	 * Allocate the origin name in long-lived context for error context
+	 * message.
+	 */
+	ReplicationOriginNameForTablesync(MySubscription->oid,
+									  MyLogicalRepWorker->relid,
+									  originname,
+									  originname_size);
+	apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+															   originname);
+
+	stream_build_options(options, slotname, origin_startpos);
+
+	/* Start normal logical streaming replication. */
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+}
+
+/*
+ * Runs the apply worker.
+ * It sets up the replication origin, the streaming options
+ * and then starts streaming.
+ */
+static void
+run_apply_worker(WalRcvStreamOptions *options,
+				 char *slotname,
+				 char *originname,
+				 int originname_size,
+				 XLogRecPtr *origin_startpos)
+{
+	RepOriginId originid;
+	TimeLineID	startpointTLI;
+	char	   *err;
+
+	slotname = MySubscription->slotname;
+
+	/*
+	 * This shouldn't happen if the subscription is enabled, but guard
+	 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
+	 * crash if slot is NULL.)
+	 */
+	if (!slotname)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("subscription has no replication slot set")));
+
+	/* Setup replication origin tracking. */
+	StartTransactionCommand();
+	snprintf(originname, originname_size, "pg_%u", MySubscription->oid);
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+		originid = replorigin_create(originname);
+	replorigin_session_setup(originid);
+	replorigin_session_origin = originid;
+	*origin_startpos = replorigin_session_get_progress(false);
+	CommitTransactionCommand();
+
+	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+											MySubscription->name, &err);
+	if (LogRepWorkerWalRcvConn == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not connect to the publisher: %s", err)));
+
+	/*
+	 * We don't really use the output identify_system for anything but it
+	 * does some initializations on the upstream so let's still call it.
+	 */
+	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+	/*
+	 * Allocate the origin name in long-lived context for error context
+	 * message.
+	 */
+	apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+															   originname);
+
+	stream_build_options(options, slotname, origin_startpos);
+
+	/*
+	 * Even when the two_phase mode is requested by the user, it remains
+	 * as the tri-state PENDING until all tablesyncs have reached READY
+	 * state. Only then, can it become ENABLED.
+	 *
+	 * Note: If the subscription has no tables then leave the state as
+	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+	 * work.
+	 */
+	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+		AllTablesyncsReady())
+	{
+		/* Start streaming with two_phase enabled */
+		options->proto.logical.twophase = true;
+		walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+
+		StartTransactionCommand();
+		UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+		MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+		CommitTransactionCommand();
+	}
+	else
+	{
+		walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+	}
+
+	ereport(DEBUG1,
+			(errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s",
+					MySubscription->name,
+					MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+					MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+					MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+					"?")));
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -3579,7 +3754,6 @@ ApplyWorkerMain(Datum main_arg)
 	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
 	char	   *myslotname = NULL;
 	WalRcvStreamOptions options;
-	int			server_version;
 
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
@@ -3670,142 +3844,55 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
-	{
-		start_table_sync(&origin_startpos, &myslotname);
-
-		/*
-		 * Allocate the origin name in long-lived context for error context
-		 * message.
-		 */
-		ReplicationOriginNameForTablesync(MySubscription->oid,
-										  MyLogicalRepWorker->relid,
-										  originname,
-										  sizeof(originname));
-		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
-																   originname);
-	}
-	else
-	{
-		/* This is main apply worker */
-		RepOriginId originid;
-		TimeLineID	startpointTLI;
-		char	   *err;
-
-		myslotname = MySubscription->slotname;
-
-		/*
-		 * This shouldn't happen if the subscription is enabled, but guard
-		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
-		 * crash if slot is NULL.)
-		 */
-		if (!myslotname)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("subscription has no replication slot set")));
-
-		/* Setup replication origin tracking. */
-		StartTransactionCommand();
-		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
-		originid = replorigin_by_name(originname, true);
-		if (!OidIsValid(originid))
-			originid = replorigin_create(originname);
-		replorigin_session_setup(originid);
-		replorigin_session_origin = originid;
-		origin_startpos = replorigin_session_get_progress(false);
-		CommitTransactionCommand();
-
-		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
-												MySubscription->name, &err);
-		if (LogRepWorkerWalRcvConn == NULL)
-			ereport(ERROR,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("could not connect to the publisher: %s", err)));
-
-		/*
-		 * We don't really use the output identify_system for anything but it
-		 * does some initializations on the upstream so let's still call it.
-		 */
-		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
-
-		/*
-		 * Allocate the origin name in long-lived context for error context
-		 * message.
-		 */
-		apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
-																   originname);
-	}
-
 	/*
-	 * Setup callback for syscache so that we know when something changes in
-	 * the subscription relation state.
-	 */
+	 * Setup callback for syscache so that we know when something changes in
+	 * the subscription relation state.
+	 * Do this outside the loop to avoid exceeding MAX_SYSCACHE_CALLBACKS.
+	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
-								  invalidate_syncing_table_states,
-								  (Datum) 0);
-
-	/* Build logical replication streaming options. */
-	options.logical = true;
-	options.startpoint = origin_startpos;
-	options.slotname = myslotname;
-
-	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
-	options.proto.logical.proto_version =
-		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
-		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
-		LOGICALREP_PROTO_VERSION_NUM;
-
-	options.proto.logical.publication_names = MySubscription->publications;
-	options.proto.logical.binary = MySubscription->binary;
-	options.proto.logical.streaming = MySubscription->stream;
-	options.proto.logical.twophase = false;
-	options.proto.logical.origin = pstrdup(MySubscription->origin);
+								  invalidate_syncing_table_states,
+								  (Datum) 0);
 
-	if (!am_tablesync_worker())
+	/*
+	 * The loop where the worker does its job.  It loops for as long as
+	 * the worker keeps being reused.
+	 */
+	while (MyLogicalRepWorker->is_first_run ||
+		   MyLogicalRepWorker->move_to_next_rel)
 	{
-		/*
-		 * Even when the two_phase mode is requested by the user, it remains
-		 * as the tri-state PENDING until all tablesyncs have reached READY
-		 * state. Only then, can it become ENABLED.
-		 *
-		 * Note: If the subscription has no tables then leave the state as
-		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-		 * work.
-		 */
-		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-			AllTablesyncsReady())
+		if (am_tablesync_worker())
+		{
+			/*
+			 * This is a tablesync worker.
+			 * Start syncing tables before starting the apply loop.
+			 */
+			run_tablesync_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos);
+		}
+		else
+		{
+			/* This is main apply worker */
+			run_apply_worker(&options, myslotname, originname, sizeof(originname), &origin_startpos);
+		}
+
+		/* Run the main loop. */
+		start_apply(origin_startpos);
+
+		if (MyLogicalRepWorker->move_to_next_rel)
 		{
-			/* Start streaming with two_phase enabled */
-			options.proto.logical.twophase = true;
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+			/* Reset the current replication origin session.
+			 * Since we'll use the same process for another relation, it needs
+			 * to be reset and will be created again later while syncing the
+			 * new relation. */
+			replorigin_session_origin = InvalidRepOriginId;
+			replorigin_session_reset();
 
 			StartTransactionCommand();
-			UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
-			MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+			ereport(LOG,
+					(errmsg("logical replication table synchronization worker for subscription \"%s\" has moved to sync table \"%s\"",
+							MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
 			CommitTransactionCommand();
 		}
-		else
-		{
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-		}
-
-		ereport(DEBUG1,
-				(errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s",
-						MySubscription->name,
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
-						"?")));
 	}
-	else
-	{
-		/* Start normal logical streaming replication. */
-		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-	}
-
-	/* Run the main loop. */
-	start_apply(origin_startpos);
-
 	proc_exit(0);
 }
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 7b98714f30..2d391615e8 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -102,6 +102,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	/* Only publish data originating from the specified origin */
 	text		suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
+
+	/* The last used ID to create a replication slot for tablesync */
+	int64		sublastusedid BKI_DEFAULT(0);
 #endif
 } FormData_pg_subscription;
 
@@ -135,11 +138,13 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	int64		lastusedid;		/* Last used unique ID to create replication slots in tablesync */
 } Subscription;
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
+extern void UpdateSubscriptionLastSlotId(Oid subid, int64 lastusedid);
 
 extern int	CountDBSubscriptions(Oid dbid);
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 8e88de7b2b..7373df40ab 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -44,6 +44,8 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
 											 * used for synchronization
 											 * coordination, or NULL if not
 											 * valid */
+	text		srrelslotname BKI_FORCE_NULL;	/* name of the replication slot for relation in subscription */
+
 #endif
 } FormData_pg_subscription_rel;
 
@@ -81,12 +83,18 @@ typedef struct SubscriptionRelState
 } SubscriptionRelState;
 
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
-									XLogRecPtr sublsn);
+									XLogRecPtr sublsn, char *relslotname);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 									   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
+extern void UpdateSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *relslotname);
+extern void GetSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *slotname);
+
+extern void UpdateSubscriptionRelStateAndSlot(Oid subid, Oid relid, char state,
+									XLogRecPtr sublsn, char *relslotname);
+
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8c9f3321d5..4b3e2993e3 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -217,8 +217,9 @@ extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
-extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
+extern void ReplicationSlotNameForTablesync(Oid suboid, int64 slotid, char *syncslotname, int szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+extern List* GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 901845abc2..5fabd1a829 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -30,6 +30,27 @@ typedef struct LogicalRepWorker
 	/* Indicates if this slot is used or free. */
 	bool		in_use;
 
+	/*
+	 * Indicates if the worker is running for the first time
+	 * or is being reused.
+	 */
+	bool		is_first_run;
+
+	/*
+	 * Indicates if the sync worker created a replication slot or if it
+	 * reuses an existing one created by another worker.
+	 */
+	bool		created_slot;
+
+	/*
+	 * Unique identifier for the replication slot to be created by
+	 * tablesync workers, if needed.
+	 */
+	int64		rep_slot_id;
+
+	/* Replication slot name used by the worker. */
+	char	   *slot_name;
+
 	/* Increased every time the slot is taken by new worker. */
 	uint16		generation;
 
@@ -51,6 +72,11 @@ typedef struct LogicalRepWorker
 	XLogRecPtr	relstate_lsn;
 	slock_t		relmutex;
 
+	/*
+	 * Used to indicate whether the sync worker will be reused for another relation.
+	 */
+	bool		move_to_next_rel;
+
 	/*
 	 * Used to create the changes and subxact files for the streaming
 	 * transactions.  Upon the arrival of the first streaming transaction, the
@@ -85,7 +111,7 @@ extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
-									 Oid userid, Oid relid);
+									 Oid userid, Oid relid, int64 slotid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
diff --git a/src/test/regress/expected/misc_sanity.out b/src/test/regress/expected/misc_sanity.out
index a57fd142a9..a2a6f14944 100644
--- a/src/test/regress/expected/misc_sanity.out
+++ b/src/test/regress/expected/misc_sanity.out
@@ -60,7 +60,8 @@ ORDER BY 1, 2;
  pg_index                | indpred       | pg_node_tree
  pg_largeobject          | data          | bytea
  pg_largeobject_metadata | lomacl        | aclitem[]
-(11 rows)
+ pg_subscription_rel     | srrelslotname | text
+(12 rows)
 
 -- system catalogs without primary keys
 --
-- 
2.25.1

