From 3ebb31e3b5e822b8c2ef3e87827e9269650fb46e Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Thu, 17 Aug 2023 12:05:54 +0530
Subject: [PATCH v12 2/2] Add logical slot sync capability to physical standby

A new GUC, max_slot_sync_workers, controls the maximum number of
slot-synchronization workers; for this PoC patch its default and maximum values
are kept at 2 and 50 respectively. This is not a run-time modifiable GUC.

For the slots to be synchronized, another GUC is added:
synchronize_slot_names. This is a run-time modifiable GUC.
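
For example, a standby could be configured as follows (the slot names are only
illustrative):

  synchronize_slot_names = 'slot1, slot2'   # or '*' to synchronize all slots
  max_slot_sync_workers = 4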

The replication launcher on the physical standby now queries the primary to get
the list of dbids that own the slots mentioned in GUC 'synchronize_slot_names'.
Once it has the dbids, if their number is less than max_slot_sync_workers, it
starts only that many workers; if it is greater, it starts max_slot_sync_workers
workers and divides the work equally among them. Each worker is then responsible
for continuously syncing all the slots belonging to the DBs assigned to it.
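
To fetch this list of dbids, the launcher issues a new replication command to
the primary, for instance (slot names illustrative):

  LIST_DBID_FOR_LOGICAL_SLOTS slot1,slot2

The walsender answers with one row per distinct database OID owning any of the
listed logical slots (or of all slots, when no slot names are given).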

For example, say the slots mentioned in 'synchronize_slot_names' on the primary
belong to 10 DBs and the new GUC is at its default value of 2; then each worker
will manage 5 DBs and keep syncing their slots. If the replication launcher
later finds a new DB, it assigns it to the worker currently handling the
minimum number of DBs (or to the first worker in case of a tie).

Each worker slot has its own dbids list. Since the upper limit of this
dbid count is not known, it is kept in dynamic shared memory (DSM). We initially
allocate memory to hold 100 dbids for each worker. If this limit is exhausted,
we reallocate the memory with the size incremented by another 100 and relaunch
the worker.
---
 src/backend/postmaster/bgworker.c             |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       |  74 ++
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/launcher.c    | 833 ++++++++++++++++--
 src/backend/replication/logical/meson.build   |   1 +
 src/backend/replication/logical/slotsync.c    | 579 ++++++++++++
 src/backend/replication/logical/tablesync.c   |   1 +
 src/backend/replication/repl_gram.y           |  32 +-
 src/backend/replication/repl_scanner.l        |   2 +
 src/backend/replication/walsender.c           |  98 +++
 src/backend/storage/lmgr/lwlocknames.txt      |   1 +
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  18 +-
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/commands/subscriptioncmds.h       |   3 +
 src/include/nodes/replnodes.h                 |   9 +
 src/include/postmaster/bgworker_internals.h   |   1 +
 src/include/replication/logicallauncher.h     |   4 +
 src/include/replication/logicalworker.h       |   1 +
 src/include/replication/slot.h                |   2 -
 src/include/replication/walreceiver.h         |  20 +
 src/include/replication/worker_internal.h     |  41 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/051_slot_sync.pl          | 132 +++
 24 files changed, 1787 insertions(+), 72 deletions(-)
 create mode 100644 src/backend/replication/logical/slotsync.c
 mode change 100644 => 100755 src/backend/replication/walsender.c
 create mode 100644 src/test/recovery/t/051_slot_sync.pl

diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 505e38376c..216287d56a 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
 	},
+	{
+		"ReplSlotSyncMain", ReplSlotSyncMain
+	},
 	{
 		"ParallelApplyWorkerMain", ParallelApplyWorkerMain
 	},
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 60d5c1fc40..d1e71bac0a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -34,6 +34,7 @@
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/tuplestore.h"
+#include "utils/varlena.h"
 
 PG_MODULE_MAGIC;
 
@@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
 									  TimeLineID *primary_tli);
+static List *libpqrcv_list_db_for_logical_slots(WalReceiverConn *conn, const char *slot_names);
 static int	libpqrcv_server_version(WalReceiverConn *conn);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 											 TimeLineID tli, char **filename,
@@ -96,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_receive = libpqrcv_receive,
 	.walrcv_send = libpqrcv_send,
 	.walrcv_create_slot = libpqrcv_create_slot,
+	.walrcv_list_db_for_logical_slots = libpqrcv_list_db_for_logical_slots,
 	.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
 	.walrcv_exec = libpqrcv_exec,
 	.walrcv_disconnect = libpqrcv_disconnect
@@ -409,6 +412,77 @@ libpqrcv_server_version(WalReceiverConn *conn)
 	return PQserverVersion(conn->streamConn);
 }
 
+/*
+ * Get list of DBs for logical slots from primary.
+ */
+static List *
+libpqrcv_list_db_for_logical_slots(WalReceiverConn *conn, const char *slot_names)
+{
+	PGresult   *res;
+	List	   *slotlist = NIL;
+	int			ntuples;
+	StringInfoData s;
+	WalRecvReplicationSlotDbData *slot_data;
+
+	initStringInfo(&s);
+	appendStringInfoString(&s, "LIST_DBID_FOR_LOGICAL_SLOTS");
+
+	if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		appendStringInfoChar(&s, ' ');
+		rawname = pstrdup(slot_names);
+		SplitIdentifierString(rawname, ',', &namelist);
+		foreach (lc, namelist)
+		{
+			if (lc != list_head(namelist))
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_identifier(lfirst(lc)));
+		}
+	}
+
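+	/*
+	 * The constructed command looks like, e.g.:
+	 * LIST_DBID_FOR_LOGICAL_SLOTS slot1,slot2
+	 */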
+	res = libpqrcv_PQexec(conn->streamConn, s.data);
+	pfree(s.data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("could not receive list of slots from the primary server: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+	}
+	if (PQnfields(res) < 1)
+	{
+		int			nfields = PQnfields(res);
+
+		PQclear(res);
+		ereport(ERROR,
+				(errmsg("invalid response from primary server"),
+				 errdetail("Could not get list of slots: got %d fields, "
+							"expected %d or more fields.",
+						   nfields, 1)));
+	}
+
+	ntuples = PQntuples(res);
+	for (int i = 0; i < ntuples; i++)
+	{
+
+		slot_data = palloc0(sizeof(WalRecvReplicationSlotDbData));
+		if (!PQgetisnull(res, i, 0))
+			slot_data->database = atooid(PQgetvalue(res, i, 0));
+
+		slot_data->last_sync_time = 0;
+		slotlist = lappend(slotlist, slot_data);
+	}
+
+	PQclear(res);
+
+	return slotlist;
+}
+
 /*
  * Start streaming WAL data from given streaming options.
  *
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2dc25e37bb..ba03eeff1c 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -25,6 +25,7 @@ OBJS = \
 	proto.o \
 	relation.o \
 	reorderbuffer.o \
+	slotsync.o \
 	snapbuild.o \
 	tablesync.o \
 	worker.o
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7cc0a16d3b..231b1dfcc9 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -22,6 +22,7 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
@@ -57,6 +58,16 @@
 int			max_logical_replication_workers = 4;
 int			max_sync_workers_per_subscription = 2;
 int			max_parallel_apply_workers_per_subscription = 2;
+int			max_slot_sync_workers = 2;
+
+/*
+ * Initial and incremental allocation size for dbids array for each
+ * SlotSyncWorker in dynamic shared memory. Once it is exhausted, dbids will
+ * be reallocated with the size incremented by ALLOC_DB_PER_WORKER.
+ */
+#define	ALLOC_DB_PER_WORKER 100
+
+SlotSyncWorker *MySlotSyncWorker = NULL;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
@@ -70,6 +81,7 @@ typedef struct LogicalRepCtxStruct
 	dshash_table_handle last_start_dsh;
 
 	/* Background workers. */
+	SlotSyncWorker   *ss_workers;                      /* slot sync workers */
 	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
 } LogicalRepCtxStruct;
 
@@ -107,7 +119,6 @@ static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
 
-
 /*
  * Load the list of subscriptions.
  *
@@ -925,7 +936,7 @@ ApplyLauncherRegister(void)
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
 		BGWORKER_BACKEND_DATABASE_CONNECTION;
-	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 	snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -947,6 +958,7 @@ void
 ApplyLauncherShmemInit(void)
 {
 	bool		found;
+	bool		foundSlotSync;
 
 	LogicalRepCtx = (LogicalRepCtxStruct *)
 		ShmemInitStruct("Logical Replication Launcher Data",
@@ -971,6 +983,24 @@ ApplyLauncherShmemInit(void)
 			SpinLockInit(&worker->relmutex);
 		}
 	}
+
+	/* Allocate shared-memory for slot-sync workers pool now */
+	LogicalRepCtx->ss_workers = (SlotSyncWorker *)
+		ShmemInitStruct("Replication slot synchronization workers",
+				mul_size(max_slot_sync_workers, sizeof(SlotSyncWorker)),
+				&foundSlotSync);
+
+	if (!foundSlotSync)
+	{
+		int	slot;
+
+		for (slot = 0; slot < max_slot_sync_workers; slot++)
+		{
+			SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[slot];
+
+			memset(worker, 0, sizeof(SlotSyncWorker));
+		}
+	}
 }
 
 /*
@@ -1108,6 +1138,734 @@ ApplyLauncherWakeup(void)
 		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
 }
 
+/*
+ * Clean up slot-sync worker info.
+ */
+static void
+slotsync_worker_cleanup(SlotSyncWorker *worker)
+{
+	Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_EXCLUSIVE));
+
+	worker->in_use = false;
+	worker->proc = NULL;
+	worker->dbid = InvalidOid;
+	worker->userid = InvalidOid;
+	worker->slot = -1;
+
+	if (worker->dsm_seg)
+		dsm_detach(worker->dsm_seg);
+
+	worker->dsm_seg = NULL;
+	worker->dbcount = 0;
+	worker->dbids = NULL;
+}
+
+/*
+ * Attach Slot-sync worker to worker-slot assigned by launcher.
+ */
+void
+slotsync_worker_attach(int slot)
+{
+	/* Block concurrent access. */
+	LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+
+	Assert(slot >= 0 && slot < max_slot_sync_workers);
+	MySlotSyncWorker = &LogicalRepCtx->ss_workers[slot];
+	MySlotSyncWorker->slot = slot;
+
+	if (!MySlotSyncWorker->in_use)
+	{
+		LWLockRelease(SlotSyncWorkerLock);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot-sync worker slot %d is empty, cannot attach",
+						slot)));
+	}
+
+	if (MySlotSyncWorker->proc)
+	{
+		LWLockRelease(SlotSyncWorkerLock);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("replication slot-sync worker slot %d is already used by "
+						"another worker, cannot attach", slot)));
+	}
+
+	MySlotSyncWorker->proc = MyProc;
+
+	LWLockRelease(SlotSyncWorkerLock);
+}
+
+/*
+ * Wait for a background worker to start up and attach to the shmem context.
+ *
+ * This is only needed for cleaning up the shared memory in case the worker
+ * fails to attach.
+ *
+ * Returns whether the attach was successful.
+ */
+static bool
+WaitForSlotSyncWorkerAttach(SlotSyncWorker *worker,
+							   uint16 generation,
+							   BackgroundWorkerHandle *handle)
+{
+	BgwHandleStatus status;
+	int			rc;
+
+	for (;;)
+	{
+		pid_t		pid;
+
+		CHECK_FOR_INTERRUPTS();
+
+		LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+
+		/* Worker either died or has started. Return false if died. */
+		if (!worker->in_use || worker->proc)
+		{
+			LWLockRelease(SlotSyncWorkerLock);
+			return worker->in_use;
+		}
+
+		LWLockRelease(SlotSyncWorkerLock);
+
+		/* Check if worker has died before attaching, and clean up after it. */
+		status = GetBackgroundWorkerPid(handle, &pid);
+
+		if (status == BGWH_STOPPED)
+		{
+			LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+			/* Ensure that this was indeed the worker we waited for. */
+			if (generation == worker->generation)
+				slotsync_worker_cleanup(worker);
+			LWLockRelease(SlotSyncWorkerLock);
+			return false;
+		}
+
+		/*
+		 * We need timeout because we generally don't get notified via latch
+		 * about the worker attach.  But we don't expect to have to wait long.
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   10L, WAIT_EVENT_BGWORKER_STARTUP);
+
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
+}
+
+/*
+ * Find the slot-sync worker managing the given dbid.
+ *
+ * Walks the slot-sync worker pool and searches for one that manages the
+ * given dbid. Since one worker can manage multiple dbs, it walks the dbids
+ * array in each worker to find a match.
+ */
+static SlotSyncWorker *
+slotsync_worker_find(Oid dbid)
+{
+	int			i;
+	SlotSyncWorker *res = NULL;
+
+	Assert(LWLockHeldByMe(SlotSyncWorkerLock));
+
+	/* Search for attached worker for a given dbid */
+	for (i = 0; i < max_slot_sync_workers; i++)
+	{
+		SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+		int             cnt;
+
+		if (!w->in_use)
+			continue;
+
+		SpinLockAcquire(&w->mutex);
+		for (cnt = 0; cnt < w->dbcount; cnt++)
+		{
+			Oid  wdbid = w->dbids[cnt];
+			if (wdbid == dbid)
+			{
+				res = w;
+				break;
+			}
+		}
+		SpinLockRelease(&w->mutex);
+
+		/* if worker is found, break the outer loop */
+		if (res)
+			break;
+	}
+
+	return res;
+}
+
+/*
+ * Set up DSM for the slot-sync worker.
+ *
+ * This is needed for the dbids array. Since the maximum number of dbs a worker
+ * can manage is not known, start with 'ALLOC_DB_PER_WORKER' entries. If this
+ * size is exhausted, we can re-allocate a bigger chunk later, with the size
+ * incremented by another 'ALLOC_DB_PER_WORKER'.
+ */
+static void
+slot_sync_dsm_setup(SlotSyncWorker *worker, int alloc_db_count)
+{
+	shm_toc    *toc;
+	shm_toc_estimator e;
+	Size            segsize;
+	dsm_segment *seg;
+	int i;
+	int dbids_size = alloc_db_count * sizeof(Oid);
+
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, dbids_size);
+	shm_toc_estimate_keys(&e, 1);
+
+	segsize = shm_toc_estimate(&e);
+
+	seg = dsm_create(segsize, 0);
+
+	toc = shm_toc_create(PG_SLOT_SYNC_SHM_MAGIC, dsm_segment_address(seg), segsize);
+
+	worker->dbids = shm_toc_allocate(toc, dbids_size);
+
+	SpinLockInit(&worker->mutex);
+
+	worker->dbcount = 0;
+
+	for (i = 0; i < alloc_db_count; i++)
+		worker->dbids[i] = InvalidOid;
+
+	shm_toc_insert(toc, SLOT_SYNC_DBIDS_KEY_SHARED, worker->dbids);
+
+	worker->dsm_seg = seg;
+
+	ereport(DEBUG1,
+		(errmsg("allocated dsm for slot sync worker for dbcount: %d",
+			alloc_db_count)));
+	/*
+	 * Note: at this point, we have not created any ResourceOwner in this
+	 * process. This will result in our DSM mapping surviving until process
+	 * exit or until explicitly detached and thus we do not need dsm_pin_mapping.
+	 * By default, mappings are owned by the current resource owner, which
+	 * typically means they stick around for the duration of the current query
+	 * only. See comments atop dsm_create and dsm_pin_mapping.
+	 */
+}
+
+/*
+ * Stop the slot-sync worker and wait until it detaches from the
+ * slot.
+ */
+static void
+slot_sync_worker_stop(SlotSyncWorker *worker)
+{
+
+	Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
+
+	/* send SIGINT so that it exits cleanly ... */
+	kill(worker->proc->pid, SIGINT);
+
+	/* ... and wait for it to exit. */
+	for (;;)
+	{
+		int			rc;
+
+		/* is it gone? */
+		if (!worker->proc)
+			break;
+
+		LWLockRelease(SlotSyncWorkerLock);
+
+		/* Wait a bit --- we don't expect to have to wait long. */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
+
+		if (rc & WL_LATCH_SET)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+	}
+
+}
+
+/*
+ * Slot sync worker launch or reuse
+ *
+ * Start a new slot-sync background worker if permitted by the
+ * max_slot_sync_workers count. If the worker pool is exhausted, reuse the
+ * existing worker with the minimum number of dbs, the idea being to always
+ * distribute the dbs equally among the launched workers.
+ * If the initially allocated dbids array is exhausted for the selected worker,
+ * reallocate the array with an increased size and relaunch the worker.
+ *
+ * Returns true on success, false on failure.
+ */
+static bool
+slot_sync_worker_launch_or_reuse(Oid dbid, Oid userid)
+{
+	BackgroundWorker bgw;
+	BackgroundWorkerHandle *bgw_handle;
+	uint16		generation;
+	uint			i;
+	SlotSyncWorker *worker = NULL;
+	uint 			mindbcnt = 0;
+	uint			alloc_count = 0;
+	uint                     copied_dbcnt = 0;
+	Oid                     *copied_dbids = NULL;
+	int			worker_slot = -1;
+	dsm_handle handle;
+
+	Assert(OidIsValid(dbid));
+
+	/*
+	 * We need to do the modification of the shared memory under lock so that
+	 * we have consistent view.
+	 */
+	LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+
+	/* Find unused worker slot. */
+	for (i = 0; i < max_slot_sync_workers; i++)
+	{
+		SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+
+		if (!w->in_use)
+		{
+			worker = w;
+			worker_slot = i;
+			break;
+		}
+	}
+
+	/* If all the workers are currently in use, find the one with the
+	 * minimum number of dbs and use that. */
+	if (!worker)
+	{
+		for (i = 0; i < max_slot_sync_workers; i++)
+		{
+			SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+
+			if (i == 0)
+			{
+				mindbcnt = w->dbcount;
+				worker = w;
+				worker_slot = i;
+			}
+			else if (w->dbcount < mindbcnt)
+			{
+				mindbcnt = w->dbcount;
+				worker = w;
+				worker_slot = i;
+			}
+		}
+	}
+
+	/*
+	 * If the worker is being reused and there is vacancy in its dbids array,
+	 * just update the dbids array and dbcount and we are done. But if the
+	 * dbids array is exhausted, stop the worker, reallocate dbids in DSM, and
+	 * relaunch the worker with the same set of dbs as earlier plus the new
+	 * db.
+	 */
+	if (worker->in_use)
+	{
+		if (worker->dbcount < ALLOC_DB_PER_WORKER)
+		{
+			SpinLockAcquire(&worker->mutex);
+			worker->dbids[worker->dbcount++] = dbid;
+			SpinLockRelease(&worker->mutex);
+
+			LWLockRelease(SlotSyncWorkerLock);
+
+			ereport(LOG,
+					(errmsg("adding database %u to replication slot"
+							" synchronization worker %d",
+							dbid, worker_slot)));
+			return true;
+		}
+		else
+		{
+			/*
+			 * Release exclusive lock and take shared one. This is
+			 * needed before sending SIGINT, so that worker can update
+			 * the status on receiving SIGINT.
+			 */
+			LWLockRelease(SlotSyncWorkerLock);
+			LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+
+			/*
+			 * Remember the old dbids before we stop and clean up this worker,
+			 * as these will be needed in order to relaunch it.
+			 */
+			copied_dbcnt = worker->dbcount;
+			copied_dbids = (Oid *)palloc0(worker->dbcount * sizeof(Oid));
+
+			for (i = 0; i < worker->dbcount; i++)
+				copied_dbids[i] = worker->dbids[i];
+
+			/* we are ready to stop the worker now */
+			slot_sync_worker_stop(worker);
+
+			/* Release shared lock and take exclusive one */
+			LWLockRelease(SlotSyncWorkerLock);
+			LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+
+			/*
+			 * Clean up this worker after it has stopped so that
+			 * we are ready to relaunch it.
+			 */
+			slotsync_worker_cleanup(worker);
+		}
+
+	}
+
+	/* Prepare the new worker. */
+	worker->launch_time = GetCurrentTimestamp();
+	worker->in_use = true;
+
+	/* 'proc' and 'slot' will be assigned in ReplSlotSyncMain when we
+	 * attach this worker to a particular worker-pool slot */
+	worker->proc = NULL;
+	worker->slot = -1;
+
+	/* TODO: do we really need these 2? analyse more here */
+	worker->dbid = dbid;
+	worker->generation++;
+
+	/*
+	 * If this is a relaunch of the worker after dbids-array exhaustion, find
+	 * the new allocation size for dbids; otherwise go with ALLOC_DB_PER_WORKER
+	 * for a fresh launch.
+	 */
+	if (copied_dbcnt)
+		alloc_count = copied_dbcnt + ALLOC_DB_PER_WORKER;
+	else
+		alloc_count = ALLOC_DB_PER_WORKER;
+
+	/* Set up DSM for dbids array to hold 'alloc_count' dbs */
+	slot_sync_dsm_setup(worker, alloc_count);
+
+	/* if it is a reallocation and relaunch, copy the old dbs info back to worker */
+	if (copied_dbcnt)
+	{
+		worker->dbcount = copied_dbcnt;
+		for (i = 0; i < copied_dbcnt; i++)
+			worker->dbids[i] = copied_dbids[i];
+	}
+
+	worker->dbids[worker->dbcount++] = dbid;
+	worker->userid = userid;
+
+	/* Before releasing lock, remember generation for future identification. */
+	generation = worker->generation;
+
+	LWLockRelease(SlotSyncWorkerLock);
+
+	/* Register the new dynamic worker. */
+	memset(&bgw, 0, sizeof(bgw));
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+		BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
+	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
+
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain");
+
+	Assert(worker_slot >= 0);
+	snprintf(bgw.bgw_name, BGW_MAXLEN,
+			 "replication slot synchronization worker %d", worker_slot);
+
+	snprintf(bgw.bgw_type, BGW_MAXLEN, "slot synchronization worker");
+
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	bgw.bgw_notify_pid = MyProcPid;
+	bgw.bgw_main_arg = Int32GetDatum(worker_slot);
+
+	handle = dsm_segment_handle(worker->dsm_seg);
+	memcpy(bgw.bgw_extra, &handle, sizeof(dsm_handle));
+
+	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
+	{
+		/* Failed to start worker, so clean up the worker slot. */
+		LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+		Assert(generation == worker->generation);
+		slotsync_worker_cleanup(worker);
+		LWLockRelease(SlotSyncWorkerLock);
+
+		ereport(WARNING,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("out of background worker slots"),
+				 errhint("You might need to increase %s.", "max_worker_processes")));
+		return false;
+	}
+
+	/* Now wait until it attaches. */
+	return WaitForSlotSyncWorkerAttach(worker, generation, bgw_handle);
+}
+
+/*
+ * Remove DBs that are no longer needed from the slot-sync workers' db-lists.
+ *
+ * If the DBIDs fetched from the primary this time are a subset of what our
+ * workers are managing, remove the extra dbs from the workers' db-lists.
+ * This may happen if some slots were removed on the primary or
+ * 'synchronize_slot_names' has been changed by the user.
+ */
+static void
+sync_slot_workers_remove_extra_dbs(List *remote_dbs)
+{
+	int widx;
+	int dbidx;
+	ListCell   *lc;
+
+	LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+
+	/* Traverse slot-sync-workers to validate the DBs */
+	for (widx = 0; widx < max_slot_sync_workers; widx++)
+	{
+		SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
+
+		if (!worker->in_use)
+			continue;
+
+		for (dbidx = 0; dbidx < worker->dbcount;)
+		{
+			Oid  wdbid = worker->dbids[dbidx];
+			bool found = false;
+
+			/* Check if current DB is still present in remote-db-list */
+			foreach(lc, remote_dbs)
+			{
+				WalRecvReplicationSlotDbData *slot_db_data = lfirst(lc);
+				if (slot_db_data->database == wdbid)
+				{
+					found = true;
+					break;
+				}
+			}
+
+			/* If not found, then delete this db from worker's db-list */
+			if (!found)
+			{
+				int i;
+				SpinLockAcquire(&worker->mutex);
+
+				for (i = dbidx; i < worker->dbcount; i++)
+				{
+					/* Shift the DBs and get rid of wdbid */
+					if (i < (worker->dbcount - 1))
+						worker->dbids[i] = worker->dbids[i+1];
+				}
+
+				worker->dbcount--;
+				SpinLockRelease(&worker->mutex);
+
+				ereport(LOG,
+					(errmsg("removed database %u from replication slot"
+							" synchronization worker %d",
+							wdbid, worker->slot)));
+			}
+			/* Else move to next db-position */
+			else
+			{
+				dbidx++;
+			}
+		}
+	}
+
+	LWLockRelease(SlotSyncWorkerLock);
+
+
+	/* If dbcount for any worker has become 0, shut it down */
+	for (widx = 0; widx < max_slot_sync_workers; widx++)
+	{
+		SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
+
+		if (worker->in_use && !worker->dbcount)
+		{
+			int slot = worker->slot;
+
+			LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+			slot_sync_worker_stop(worker);
+			LWLockRelease(SlotSyncWorkerLock);
+
+			LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+			slotsync_worker_cleanup(worker);
+			LWLockRelease(SlotSyncWorkerLock);
+			ereport(LOG,
+				(errmsg("stopped replication slot synchronization worker %d", slot)));
+		}
+	}
+
+}
+
+/*
+ * Start slot-sync background workers.
+ *
+ * It connects to the primary and gets the list of DBIDs for the slots
+ * configured in synchronize_slot_names. It then launches the slot-sync
+ * workers, as permitted by max_slot_sync_workers, and assigns the DBs equally
+ * among the workers launched.
+ */
+static void
+ApplyLauncherStartSlotSync(long *wait_time)
+{
+	WalReceiverConn *wrconn;
+	char	   *err;
+	List	   *slots_dbs;
+	ListCell   *lc;
+	MemoryContext tmpctx;
+	MemoryContext oldctx;
+
+	if (max_slot_sync_workers == 0)
+		return;
+
+	if (strcmp(synchronize_slot_names, "") == 0)
+		return;
+
+	wrconn = walrcv_connect(PrimaryConnInfo, false, false,
+							"Logical Replication Launcher", &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	/* Use temporary context for the slot list and worker info. */
+	tmpctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher Slot Sync ctx",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(tmpctx);
+
+	slots_dbs = walrcv_list_db_for_logical_slots(wrconn, synchronize_slot_names);
+
+	sync_slot_workers_remove_extra_dbs(slots_dbs);
+
+	foreach(lc, slots_dbs)
+	{
+		WalRecvReplicationSlotDbData *slot_db_data = lfirst(lc);
+		SlotSyncWorker *w;
+		TimestampTz last_sync;
+		TimestampTz	now;
+		long		elapsed;
+
+		if (!OidIsValid(slot_db_data->database))
+			continue;
+
+		LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+		w = slotsync_worker_find(slot_db_data->database);
+		LWLockRelease(SlotSyncWorkerLock);
+
+		if (w != NULL)
+			continue;		/* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Slot synchronization for a database is attempted at most once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly relaunch the worker as fast as possible.
+		 */
+		last_sync = slot_db_data->last_sync_time;
+		now = GetCurrentTimestamp();
+		if (last_sync == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_sync, now)) >= wal_retrieve_retry_interval)
+		{
+			slot_db_data->last_sync_time = now;
+			slot_sync_worker_launch_or_reuse(slot_db_data->database,
+									 BOOTSTRAP_SUPERUSERID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(tmpctx);
+
+	walrcv_disconnect(wrconn);
+}
+
+static void
+ApplyLauncherStartSubs(long *wait_time)
+{
+	List	   *sublist;
+	ListCell   *lc;
+	MemoryContext subctx;
+	MemoryContext oldctx;
+
+	/* Use temporary context to avoid leaking memory across cycles. */
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher sublist",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+	/* Start any missing workers for enabled subscriptions. */
+	sublist = get_subscription_list();
+	foreach(lc, sublist)
+	{
+		Subscription *sub = (Subscription *) lfirst(lc);
+		LogicalRepWorker *w;
+		TimestampTz last_start;
+		TimestampTz now;
+		long		elapsed;
+
+		if (!sub->enabled)
+			continue;
+
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
+		if (w != NULL)
+			continue;		/* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Each subscription's apply worker can only be restarted once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly restart the worker as fast as possible.  In cases
+		 * where a restart is expected (e.g., subscription parameter
+		 * changes), another process should remove the last-start entry
+		 * for the subscription so that the worker can be restarted
+		 * without waiting for wal_retrieve_retry_interval to elapse.
+		 */
+		last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+		now = GetCurrentTimestamp();
+		if (last_start == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+		{
+			ApplyLauncherSetWorkerStartTime(sub->oid, now);
+			logicalrep_worker_launch(WORKERTYPE_APPLY,
+									 sub->dbid, sub->oid, sub->name,
+									 sub->owner, InvalidOid,
+									 DSM_HANDLE_INVALID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+	/* Switch back to original memory context. */
+	MemoryContextSwitchTo(oldctx);
+	/* Clean the temporary memory. */
+	MemoryContextDelete(subctx);
+}
+
 /*
  * Main loop for the apply launcher process.
  */
@@ -1133,79 +1891,20 @@ ApplyLauncherMain(Datum main_arg)
 	 */
 	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+	load_file("libpqwalreceiver", false);
+
 	/* Enter main loop */
 	for (;;)
 	{
 		int			rc;
-		List	   *sublist;
-		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Use temporary context to avoid leaking memory across cycles. */
-		subctx = AllocSetContextCreate(TopMemoryContext,
-									   "Logical Replication Launcher sublist",
-									   ALLOCSET_DEFAULT_SIZES);
-		oldctx = MemoryContextSwitchTo(subctx);
-
-		/* Start any missing workers for enabled subscriptions. */
-		sublist = get_subscription_list();
-		foreach(lc, sublist)
-		{
-			Subscription *sub = (Subscription *) lfirst(lc);
-			LogicalRepWorker *w;
-			TimestampTz last_start;
-			TimestampTz now;
-			long		elapsed;
-
-			if (!sub->enabled)
-				continue;
-
-			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-			w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-			LWLockRelease(LogicalRepWorkerLock);
-
-			if (w != NULL)
-				continue;		/* worker is running already */
-
-			/*
-			 * If the worker is eligible to start now, launch it.  Otherwise,
-			 * adjust wait_time so that we'll wake up as soon as it can be
-			 * started.
-			 *
-			 * Each subscription's apply worker can only be restarted once per
-			 * wal_retrieve_retry_interval, so that errors do not cause us to
-			 * repeatedly restart the worker as fast as possible.  In cases
-			 * where a restart is expected (e.g., subscription parameter
-			 * changes), another process should remove the last-start entry
-			 * for the subscription so that the worker can be restarted
-			 * without waiting for wal_retrieve_retry_interval to elapse.
-			 */
-			last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
-			now = GetCurrentTimestamp();
-			if (last_start == 0 ||
-				(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
-			{
-				ApplyLauncherSetWorkerStartTime(sub->oid, now);
-				logicalrep_worker_launch(WORKERTYPE_APPLY,
-										 sub->dbid, sub->oid, sub->name,
-										 sub->owner, InvalidOid,
-										 DSM_HANDLE_INVALID);
-			}
-			else
-			{
-				wait_time = Min(wait_time,
-								wal_retrieve_retry_interval - elapsed);
-			}
-		}
-
-		/* Switch back to original memory context. */
-		MemoryContextSwitchTo(oldctx);
-		/* Clean the temporary memory. */
-		MemoryContextDelete(subctx);
+		if (!RecoveryInProgress())
+			ApplyLauncherStartSubs(&wait_time);
+		else
+			ApplyLauncherStartSlotSync(&wait_time);
 
 		/* Wait for more work. */
 		rc = WaitLatch(MyLatch,
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index d48cd4c590..9e52ec421f 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -11,6 +11,7 @@ backend_sources += files(
   'proto.c',
   'relation.c',
   'reorderbuffer.c',
+  'slotsync.c',
   'snapbuild.c',
   'tablesync.c',
   'worker.c',
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..db8f164687
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,579 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *	   PostgreSQL worker for synchronizing slots to a standby from the primary
+ *
+ * Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/dbcommands.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "replication/logical.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/pg_lsn.h"
+#include "utils/varlena.h"
+
+typedef struct RemoteSlot
+{
+	char *name;
+	char *plugin;
+	char *database;
+	bool two_phase;
+	XLogRecPtr restart_lsn;
+	XLogRecPtr confirmed_lsn;
+	TransactionId catalog_xmin;
+} RemoteSlot;
+
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+							  XLogRecPtr min_lsn)
+{
+	WalRcvExecResult *res;
+	TupleTableSlot *slot;
+	Oid			slotRow[1] = {LSNOID};
+	StringInfoData cmd;
+	bool		isnull;
+	XLogRecPtr	restart_lsn;
+
+	for (;;)
+	{
+		int			rc;
+
+		CHECK_FOR_INTERRUPTS();
+
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT restart_lsn"
+						 "  FROM pg_catalog.pg_replication_slots"
+						 " WHERE slot_name = %s",
+						 quote_literal_cstr(slot_name));
+		res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
+							slot_name, res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+			ereport(ERROR,
+					(errmsg("slot \"%s\" disappeared from the primary",
+							slot_name)));
+
+		restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		ExecClearTuple(slot);
+		walrcv_clear_result(res);
+
+		if (restart_lsn >= min_lsn)
+			break;
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+}
+
+/*
+ * Update local slot metadata as per remote_slot's positions
+ */
+static void
+local_slot_update(RemoteSlot *remote_slot)
+{
+	LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
+	LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
+							   remote_slot->catalog_xmin);
+	LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
+										  remote_slot->restart_lsn);
+	ReplicationSlotMarkDirty();
+}
+
+/*
+ * Get list of local logical slot names belonging to DB ids passed in.
+ */
+static List *
+get_local_logical_slot_names(Oid *dbids)
+{
+	List *slotNames = NIL;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		char *slotName;
+
+		if (s->in_use && SlotIsLogical(s))
+		{
+			SpinLockAcquire(&MySlotSyncWorker->mutex);
+			for (int j = 0; j < MySlotSyncWorker->dbcount; j++)
+			{
+				if (s->data.database == dbids[j])
+				{
+					slotName = pstrdup(NameStr(s->data.name));
+					slotNames = lappend(slotNames, slotName);
+					break;
+				}
+			}
+			SpinLockRelease(&MySlotSyncWorker->mutex);
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+
+	return slotNames;
+}
+
+/*
+ * Helper function to check if a replication slot name exists in the list.
+ */
+static bool
+replication_slot_name_exists(List *list_slot_names, const char *slot_name)
+{
+	ListCell *cell;
+
+	foreach(cell, list_slot_names)
+	{
+		char *name = (char *) lfirst(cell);
+		if (strcmp(name, slot_name) == 0)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Synchronize single slot to given position.
+ *
+ * This optionally creates new slot if there is no existing one.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
+{
+	bool		found = false;
+
+	/* Search for the named slot and mark it active if we find it. */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (strcmp(NameStr(s->data.name), remote_slot->name) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	StartTransactionCommand();
+
+	/* Already existing slot, acquire */
+	if (found)
+	{
+		ReplicationSlotAcquire(remote_slot->name, true);
+
+		if (remote_slot->confirmed_lsn < MyReplicationSlot->data.confirmed_flush)
+		{
+			elog(DEBUG1,
+				 "not synchronizing slot %s; synchronization would move it backward",
+				 remote_slot->name);
+
+			ReplicationSlotRelease();
+			CommitTransactionCommand();
+			return;
+		}
+
+		/* update lsns of slot to remote slot's current position */
+		local_slot_update(remote_slot);
+		ReplicationSlotSave();
+	}
+	/* Otherwise create the slot first. */
+	else
+	{
+		TransactionId xmin_horizon = InvalidTransactionId;
+		ReplicationSlot *slot;
+
+		ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
+								remote_slot->two_phase);
+		slot = MyReplicationSlot;
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.database = get_database_oid(remote_slot->database, false);
+		namestrcpy(&slot->data.plugin, remote_slot->plugin);
+		SpinLockRelease(&slot->mutex);
+
+		ReplicationSlotReserveWal();
+
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+		slot->effective_catalog_xmin = xmin_horizon;
+		slot->data.catalog_xmin = xmin_horizon;
+		ReplicationSlotsComputeRequiredXmin(true);
+		LWLockRelease(ProcArrayLock);
+
+		if (remote_slot->confirmed_lsn < MyReplicationSlot->data.restart_lsn)
+		{
+			ereport(LOG,
+					errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass local slot LSN (%X/%X)",
+						   remote_slot->name,
+						   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn)));
+
+			wait_for_primary_slot_catchup(wrconn, remote_slot->name,
+										  MyReplicationSlot->data.restart_lsn);
+		}
+
+
+		/* update lsns of slot to remote slot's current position */
+		local_slot_update(remote_slot);
+		ReplicationSlotPersist();
+	}
+
+	ReplicationSlotRelease();
+	CommitTransactionCommand();
+}
+
+/*
+ * Synchronize slots belonging to all the dbs passed in dbids
+ */
+static void
+synchronize_slots(Oid *dbids)
+{
+	WalRcvExecResult  *res;
+	WalReceiverConn   *wrconn = NULL;
+	TupleTableSlot    *slot;
+	Oid	           slotRow[7] = {TEXTOID, TEXTOID, LSNOID, LSNOID,
+								XIDOID, BOOLOID, TEXTOID};
+	StringInfoData     s;
+	List			  *remote_slot_list = NIL;
+	List			  *local_slot_list = NIL;
+	ListCell		  *lc_slot;
+	char	          *database;
+	char	          *err;
+	int                i;
+	MemoryContext oldctx = CurrentMemoryContext;
+
+	if (!WalRcv)
+		return;
+
+	/* syscache access needs a transaction env. */
+	StartTransactionCommand();
+	/* make dbname live outside TX context */
+	MemoryContextSwitchTo(oldctx);
+
+	database = get_database_name(MyDatabaseId);
+	initStringInfo(&s);
+	appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+	wrconn = walrcv_connect(s.data, true, false, "slot_sync", &err);
+
+	if (wrconn == NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	resetStringInfo(&s);
+	appendStringInfo(&s,
+					 "SELECT slot_name, plugin, confirmed_flush_lsn,"
+					 " restart_lsn, catalog_xmin, two_phase, database"
+					 "  FROM pg_catalog.pg_replication_slots"
+					 " WHERE database IN ");
+
+
+	LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+
+	/*
+	 * If dbcount has become zero, return. This can happen when
+	 * synchronize_slot_names changes and the dbs assigned to this worker are
+	 * no longer valid; the launcher then sets dbcount to 0 and sends SIGINT
+	 * to this worker. There is a small window between the last
+	 * CHECK_FOR_INTERRUPTS and this point in which that SIGINT may arrive and
+	 * dbcount may be set to zero, so check dbcount before proceeding.
+	 */
+	if (!MySlotSyncWorker->dbcount)
+	{
+		/* return and let the worker process interrupts in main loop */
+		pfree(database);
+		pfree(s.data);
+		CommitTransactionCommand();
+		LWLockRelease(SlotSyncWorkerLock);
+		return;
+	}
+
+	SpinLockAcquire(&MySlotSyncWorker->mutex);
+	appendStringInfoChar(&s, '(');
+	for (i = 0; i < MySlotSyncWorker->dbcount; i++)
+	{
+		char	   *dbname;
+		if (i != 0)
+			appendStringInfoChar(&s, ',');
+
+		dbname = get_database_name(dbids[i]);
+		appendStringInfo(&s, "%s",
+						 quote_literal_cstr(dbname));
+		pfree(dbname);
+	}
+	appendStringInfoChar(&s, ')');
+	SpinLockRelease(&MySlotSyncWorker->mutex);
+
+	LWLockRelease(SlotSyncWorkerLock);
+
+
+	if (strcmp(synchronize_slot_names, "") != 0 &&
+		strcmp(synchronize_slot_names, "*") != 0)
+	{
+		char	   *rawname;
+		List	   *namelist;
+		ListCell   *lc;
+
+		rawname = pstrdup(synchronize_slot_names);
+		SplitIdentifierString(rawname, ',', &namelist);
+
+		appendStringInfoString(&s, " AND slot_name IN (");
+		foreach (lc, namelist)
+		{
+			if (lc != list_head(namelist))
+				appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_literal_cstr(lfirst(lc)));
+		}
+		appendStringInfoChar(&s, ')');
+	}
+
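+	/*
+	 * An example of the query built above (database/slot names illustrative):
+	 * SELECT slot_name, plugin, confirmed_flush_lsn, restart_lsn,
+	 *        catalog_xmin, two_phase, database
+	 *   FROM pg_catalog.pg_replication_slots
+	 *  WHERE database IN ('db1','db2') AND slot_name IN ('slot1','slot2')
+	 */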
+	res = walrcv_exec(wrconn, s.data, 7, slotRow);
+	pfree(s.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch slot info from primary: %s",
+						res->err)));
+
+	CommitTransactionCommand();
+	/* CommitTransactionCommand switches to TopMemoryContext */
+	MemoryContextSwitchTo(oldctx);
+
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		bool		isnull;
+		RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
+
+		remote_slot->name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		remote_slot->restart_lsn = DatumGetLSN(slot_getattr(slot, 4, &isnull));
+		Assert(!isnull);
+
+		remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(slot, 5, &isnull));
+		Assert(!isnull);
+
+		remote_slot->two_phase = DatumGetBool(slot_getattr(slot, 6, &isnull));
+		Assert(!isnull);
+
+		remote_slot->database = TextDatumGetCString(slot_getattr(slot, 7, &isnull));
+		Assert(!isnull);
+
+		/*
+		 * Create a list of remote slot names so that it can be compared with
+		 * the local slots, in order to drop local slots that are no longer
+		 * present on the remote.
+		 */
+		remote_slot_list = lappend(remote_slot_list, remote_slot->name);
+
+		synchronize_one_slot(wrconn, remote_slot);
+		pfree(remote_slot);
+
+		ExecClearTuple(slot);
+	}
+
+	/*
+	 * Get the list of local slots for the dbids managed by this worker, so
+	 * that those not present on the remote can be dropped.
+	 */
+	local_slot_list = get_local_logical_slot_names(dbids);
+
+	foreach (lc_slot, local_slot_list)
+	{
+		char	*slotname = (char *) lfirst(lc_slot);
+
+		/* Check if the local slot name is not in the remote slot names list */
+		if (!replication_slot_name_exists(remote_slot_list, slotname))
+		{
+			ReplicationSlotDrop(slotname, true);
+
+			elog(LOG, "dropped replication slot \"%s\"", slotname);
+		}
+	}
+
+	walrcv_clear_result(res);
+	pfree(database);
+
+	walrcv_disconnect(wrconn);
+}
+
+/*
+ * Interrupt handler for main loop of slot sync worker.
+ */
+static void
+ProcessSlotSyncInterrupts(dsm_segment *seg)
+{
+	CHECK_FOR_INTERRUPTS();
+
+	if (ShutdownRequestPending)
+	{
+		ereport(LOG,
+			(errmsg("slot sync worker %d is shutting down on receiving "
+				"interrupt from logical replication launcher",
+				MySlotSyncWorker->slot)));
+
+		proc_exit(0);
+	}
+
+	if (ConfigReloadPending)
+	{
+		ConfigReloadPending = false;
+		ProcessConfigFile(PGC_SIGHUP);
+	}
+}
+
+/*
+ * Detach the worker from DSM and reset 'proc' and 'in_use'.
+ * The logical replication launcher uses these to detect that the worker
+ * has shut down.
+ * TODO: do we need better status passing here? Perhaps a new field
+ * 'status' with values like RUNNING, SHUTDOWN, etc.?
+ */
+static void
+slotsync_worker_detach(int code, Datum arg)
+{
+	dsm_detach((dsm_segment *) DatumGetPointer(arg));
+	LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+	MySlotSyncWorker->in_use = false;
+	MySlotSyncWorker->proc = NULL;
+	LWLockRelease(SlotSyncWorkerLock);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+	dsm_handle handle;
+	dsm_segment *seg;
+	shm_toc    *toc;
+	Oid		     *dbids;
+
+	/* Setup signal handling */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Attach to the dynamic shared memory segment for the slot sync worker
+	 * and find its table of contents.
+	 */
+	memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
+	seg = dsm_attach(handle);
+	if (!seg)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not map dynamic shared memory segment for slot sync worker")));
+
+	toc = shm_toc_attach(PG_SLOT_SYNC_SHM_MAGIC, dsm_segment_address(seg));
+	if (!toc)
+		ereport(ERROR,(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("invalid magic number in dynamic shared memory segment for slot sync worker")));
+
+	/* Look up the shared information. */
+	dbids = shm_toc_lookup(toc, SLOT_SYNC_DBIDS_KEY_SHARED, false);
+
+	/* Primary initialization is complete. Now, attach to our slot. */
+	slotsync_worker_attach(worker_slot);
+
+	before_shmem_exit(slotsync_worker_detach, PointerGetDatum(seg));
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
+	/* Connect to our database. */
+	BackgroundWorkerInitializeConnectionByOid(MySlotSyncWorker->dbid,
+											  MySlotSyncWorker->userid,
+											  0);
+
+	StartTransactionCommand();
+	ereport(LOG,
+			(errmsg("replication slot synchronization worker %d "
+					"started managing database \"%s\" (dbid: %u)",
+					worker_slot, get_database_name(MySlotSyncWorker->dbid),
+					MySlotSyncWorker->dbid)));
+	CommitTransactionCommand();
+
+	/* Main wait loop. */
+	for (;;)
+	{
+		int			rc;
+
+		ProcessSlotSyncInterrupts(seg);
+
+		if (!RecoveryInProgress())
+			return;
+
+		if (strcmp(synchronize_slot_names, "") == 0)
+			return;
+
+		synchronize_slots(dbids);
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   10,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+		ResetLatch(MyLatch);
+
+		/* emergency bailout if postmaster has died */
+		if (rc & WL_POSTMASTER_DEATH)
+			proc_exit(1);
+	}
+
+	/*
+	 * The slot-sync worker must not get here because it will only stop when it
+	 * receives a SIGINT from the logical replication launcher, or when there is
+	 * an error. None of these cases will allow the code to reach here.
+	 */
+	Assert(false);
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 67bdd14095..5d77bca7be 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/subscriptioncmds.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..2b00bf845c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,11 +76,12 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_LIST_DBID_FOR_LOGICAL_SLOTS
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
 				create_replication_slot drop_replication_slot identify_system
-				read_replication_slot timeline_history show
+				read_replication_slot timeline_history show list_dbid_for_logical_slots
 %type <list>	generic_option_list
 %type <defelt>	generic_option
 %type <uintval>	opt_timeline
@@ -91,6 +92,7 @@ Node *replication_parse_result;
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <list>	slot_name_list slot_name_list_opt
 
 %%
 
@@ -114,6 +116,7 @@ command:
 			| read_replication_slot
 			| timeline_history
 			| show
+			| list_dbid_for_logical_slots
 			;
 
 /*
@@ -126,6 +129,33 @@ identify_system:
 				}
 			;
 
+slot_name_list:
+			IDENT
+				{
+					$$ = list_make1($1);
+				}
+			| slot_name_list ',' IDENT
+				{
+					$$ = lappend($1, $3);
+				}
+
+slot_name_list_opt:
+			slot_name_list			{ $$ = $1; }
+			| /* EMPTY */			{ $$ = NIL; }
+		;
+
+/*
+ * LIST_DBID_FOR_LOGICAL_SLOTS
+ */
+list_dbid_for_logical_slots:
+			K_LIST_DBID_FOR_LOGICAL_SLOTS slot_name_list_opt
+				{
+					ListDBForLogicalSlotsCmd *cmd = makeNode(ListDBForLogicalSlotsCmd);
+					cmd->slot_names = $2;
+					$$ = (Node *) cmd;
+				}
+			;
+
 /*
  * READ_REPLICATION_SLOT %s
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 1cc7fb858c..d4ecce6a47 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
+LIST_DBID_FOR_LOGICAL_SLOTS	{ return K_LIST_DBID_FOR_LOGICAL_SLOTS; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
@@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void)
 		case K_READ_REPLICATION_SLOT:
 		case K_TIMELINE_HISTORY:
 		case K_SHOW:
+		case K_LIST_DBID_FOR_LOGICAL_SLOTS:
 			/* Yes; push back the first token so we can parse later. */
 			repl_pushed_back_token = first_token;
 			return true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
old mode 100644
new mode 100755
index d27ef2985d..3d1940d185
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -473,6 +473,97 @@ IdentifySystem(void)
 	end_tup_output(tstate);
 }
 
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+	return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_DBID_FOR_LOGICAL_SLOTS command.
+ */
+static void
+ListSlotDatabaseOIDs(ListDBForLogicalSlotsCmd *cmd)
+{
+	DestReceiver *dest;
+	TupOutputState *tstate;
+	TupleDesc tupdesc;
+	NameData *slot_names = NULL;
+	int numslot_names;
+	List *database_oids_list = NIL;
+
+	numslot_names = list_length(cmd->slot_names);
+	if (numslot_names)
+	{
+		ListCell *lc;
+		int i = 0;
+
+		slot_names = palloc(numslot_names * sizeof(NameData));
+		foreach (lc, cmd->slot_names)
+		{
+			char *slot_name = lfirst(lc);
+
+			ReplicationSlotValidateName(slot_name, ERROR);
+			namestrcpy(&slot_names[i++], slot_name);
+		}
+
+		qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp);
+	}
+
+	dest = CreateDestReceiver(DestRemoteSimple);
+
+	/* need a tuple descriptor representing a single column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)1, "database_oid",
+							  INT8OID, -1, 0);
+
+	/* prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int slotno = 0; slotno < max_replication_slots; slotno++)
+	{
+		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+		Oid datoid; /* Variable to store the database OID for each slot */
+		Datum values[1];
+		bool nulls[1];
+
+		if (!slot->in_use)
+			continue;
+
+		SpinLockAcquire(&slot->mutex);
+
+		datoid = slot->data.database;
+
+		SpinLockRelease(&slot->mutex);
+
+		/* If slot names were provided and the current slot name is not in the list, skip it. */
+		if (numslot_names &&
+			!bsearch((void *)&slot->data.name, (void *)slot_names,
+				numslot_names, sizeof(NameData), pg_qsort_namecmp))
+			continue;
+
+		/* Check if the database OID is already in the list, and if so, skip this slot. */
+		if ((OidIsValid(datoid) && list_member_oid(database_oids_list, datoid)))
+			continue;
+
+		/* Add the database OID to the list */
+		database_oids_list = lappend_oid(database_oids_list, datoid);
+
+		values[0] = Int64GetDatum(datoid);
+		nulls[0] = (datoid == InvalidOid);
+
+		/* send it to dest */
+		do_tup_output(tstate, values, nulls);
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* Clean up the list */
+	list_free(database_oids_list);
+
+	end_tup_output(tstate);
+}
+
 /* Handle READ_REPLICATION_SLOT command */
 static void
 ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -1819,6 +1910,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_ListDBForLogicalSlotsCmd:
+			cmdtag = "LIST_DBID_FOR_LOGICAL_SLOTS";
+			set_ps_display(cmdtag);
+			ListSlotDatabaseOIDs((ListDBForLogicalSlotsCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_StartReplicationCmd:
 			{
 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 811ad94742..2cb8fd9ed5 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -54,3 +54,4 @@ XactTruncationLock					44
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
 WaitEventExtensionLock				48
+SlotSyncWorkerLock					49
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index f9e01e33b1..8d136d98a4 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -53,6 +53,7 @@ WAIT_EVENT_LOGICAL_APPLY_MAIN	LogicalApplyMain	"Waiting in main loop of logical
 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN	LogicalLauncherMain	"Waiting in main loop of logical replication launcher process."
 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN	LogicalParallelApplyMain	"Waiting in main loop of logical replication parallel apply process."
 WAIT_EVENT_RECOVERY_WAL_STREAM	RecoveryWalStream	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
+WAIT_EVENT_REPL_SLOT_SYNC_MAIN	ReplSlotSyncMain	"Waiting in main loop of worker for synchronizing slots to a standby from primary."
 WAIT_EVENT_SYSLOGGER_MAIN	SysLoggerMain	"Waiting in main loop of syslogger process."
 WAIT_EVENT_WAL_RECEIVER_MAIN	WalReceiverMain	"Waiting in main loop of WAL receiver process."
 WAIT_EVENT_WAL_SENDER_MAIN	WalSenderMain	"Waiting in main loop of WAL sender process."
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d72b6b95b6..0703673889 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -63,8 +63,11 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/walreceiver.h"
+#include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
@@ -3507,6 +3510,19 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_sync_workers",
+			PGC_SIGHUP,
+			REPLICATION_STANDBY,
+			gettext_noop("Maximum number of slots synchronization workers "
+						 "on a standby."),
+			NULL,
+		},
+		&max_slot_sync_workers,
+		2, 0, MAX_SLOT_SYNC_WORKER_LIMIT,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
@@ -4556,7 +4572,7 @@ struct config_string ConfigureNamesString[] =
 	 * standby, therefore, we might need a new group REPLICATION.
 	 */
 	{
-		{"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+		{"synchronize_slot_names", PGC_USERSET, REPLICATION_STANDBY,
 			gettext_noop("List of replication slot names to synchronize from "
 						 "primary to streaming replication standby server."),
 			gettext_noop("Value of \"*\" means all."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 63daf586f3..4e0ae87b54 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -359,6 +359,7 @@
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
 #synchronize_slot_names = ''	# replication slot names to synchronize from
 					# primary to streaming replication standby server
+#max_slot_sync_workers = 2		# max number of slot synchronization workers
 
 # - Subscribers -
 
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 214dc6c29e..0e77f9ee5c 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
 
 #include "catalog/objectaddress.h"
 #include "parser/parse_node.h"
+#include "replication/walreceiver.h"
 
 extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 										bool isTopLevel);
@@ -28,4 +29,6 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
 extern char defGetStreamingMode(DefElem *def);
 
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..bc9c1baea1 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd
 	NodeTag		type;
 } IdentifySystemCmd;
 
+/* -------------------------------
+ *		LIST_DBID_FOR_LOGICAL_SLOTS command
+ * -------------------------------
+ */
+typedef struct ListDBForLogicalSlotsCmd
+{
+	NodeTag		type;
+	List	   *slot_names;
+} ListDBForLogicalSlotsCmd;
 
 /* ----------------------
  *		BASE_BACKUP command
diff --git a/src/include/postmaster/bgworker_internals.h b/src/include/postmaster/bgworker_internals.h
index 4ad63fd9bd..19c5421a55 100644
--- a/src/include/postmaster/bgworker_internals.h
+++ b/src/include/postmaster/bgworker_internals.h
@@ -22,6 +22,7 @@
  * Maximum possible value of parallel workers.
  */
 #define MAX_PARALLEL_WORKER_LIMIT 1024
+#define MAX_SLOT_SYNC_WORKER_LIMIT 50
 
 /*
  * List of background workers, private to postmaster.
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index a07c9cb311..690f3deebd 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_slot_sync_workers;
+
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
@@ -31,4 +33,6 @@ extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
+extern PGDLLIMPORT char *PrimaryConnInfo;
+
 #endif							/* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index bbd71d0b42..e1af29af4a 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -19,6 +19,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
 extern void TablesyncWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
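
Illustration only, not part of the patch: ReplSlotSyncMain is the entry point of
a slot sync worker, so one plausible way to launch such a worker dynamically is
sketched below. The flags, start time, and the idea of passing the worker-pool
slot through bgw_main_arg are assumptions; the actual registration performed by
launcher.c may well differ.

#include "postgres.h"

#include "miscadmin.h"
#include "postmaster/bgworker.h"

/*
 * Sketch: register one slot sync worker for the given pool slot.  How the
 * real launcher wires this up (flags, bgw_main_arg contents, notify pid) is
 * assumed here.
 */
static bool
launch_slot_sync_worker(int worker_slot)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *handle;

	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
	/* Slot sync runs on a standby, so start once consistency is reached. */
	bgw.bgw_start_time = BgWorkerStart_ConsistentState;
	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	snprintf(bgw.bgw_library_name, sizeof(bgw.bgw_library_name), "postgres");
	snprintf(bgw.bgw_function_name, sizeof(bgw.bgw_function_name),
			 "ReplSlotSyncMain");
	snprintf(bgw.bgw_name, sizeof(bgw.bgw_name), "slot sync worker %d",
			 worker_slot);
	snprintf(bgw.bgw_type, sizeof(bgw.bgw_type), "slot sync worker");
	bgw.bgw_main_arg = Int32GetDatum(worker_slot);
	bgw.bgw_notify_pid = MyProcPid;

	return RegisterDynamicBackgroundWorker(&bgw, &handle);
}
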
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 2765f99ccf..0c494596cc 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,7 +15,6 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
-#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -240,7 +239,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
-extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..3774cbc450 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -20,6 +20,7 @@
 #include "pgtime.h"
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/latch.h"
@@ -191,6 +192,17 @@ typedef struct
 	}			proto;
 } WalRcvStreamOptions;
 
+/*
+ * Slot information received from the remote server: the database a logical
+ * slot belongs to, along with the time at which the slots of that database
+ * were last synchronized.
+ */
+typedef struct WalRecvReplicationSlotDbData
+{
+	Oid			database;
+	TimestampTz last_sync_time;
+} WalRecvReplicationSlotDbData;
+
 struct WalReceiverConn;
 typedef struct WalReceiverConn WalReceiverConn;
 
@@ -280,6 +292,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli);
 
+/*
+ * Retrieve the OIDs of the databases that the given logical slots belong to.
+ */
+typedef List *(*walrcv_list_db_for_logical_slots_fn) (WalReceiverConn *conn, const char *slots);
+
 /*
  * walrcv_server_version_fn
  *
@@ -393,6 +410,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
+	walrcv_list_db_for_logical_slots_fn walrcv_list_db_for_logical_slots;
 	walrcv_server_version_fn walrcv_server_version;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;
@@ -417,6 +435,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_list_db_for_logical_slots(conn, slots) \
+	WalReceiverFunctions->walrcv_list_db_for_logical_slots(conn, slots)
 #define walrcv_server_version(conn) \
 	WalReceiverFunctions->walrcv_server_version(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
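
Illustration only, not part of the patch: a minimal sketch of how a caller on
the standby might consume the new callback. That the callback returns a List of
palloc'd WalRecvReplicationSlotDbData entries, and that invalid database OIDs
can be skipped, are assumptions based on the typedef above; the libpqwalreceiver
implementation defines the real contract.

#include "postgres.h"

#include "nodes/pg_list.h"
#include "replication/walreceiver.h"

/*
 * Sketch: fetch and walk the per-database information for the slots named in
 * synchronize_slot_names.  Assumes "wrconn" is an established walreceiver
 * connection to the primary and that the callback returns a List of
 * WalRecvReplicationSlotDbData pointers.
 */
static void
process_slot_databases(WalReceiverConn *wrconn, const char *slot_names)
{
	List	   *slot_dbs;
	ListCell   *lc;

	slot_dbs = walrcv_list_db_for_logical_slots(wrconn, slot_names);

	foreach(lc, slot_dbs)
	{
		WalRecvReplicationSlotDbData *slot_db_data = lfirst(lc);

		/* Skip entries without a valid database OID (e.g., physical slots). */
		if (!OidIsValid(slot_db_data->database))
			continue;

		elog(DEBUG1, "database %u has logical slots to synchronize",
			 slot_db_data->database);
	}
}
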
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index a428663859..3fdee5b45a 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -77,7 +77,7 @@ typedef struct LogicalRepWorker
 	 * would be created for each transaction which will be deleted after the
 	 * transaction is finished.
 	 */
-	FileSet    *stream_fileset;
+	struct FileSet    *stream_fileset;
 
 	/*
 	 * PID of leader apply worker if this slot is used for a parallel apply
@@ -96,6 +96,43 @@ typedef struct LogicalRepWorker
 	TimestampTz reply_time;
 } LogicalRepWorker;
 
+#define PG_SLOT_SYNC_SHM_MAGIC		0x797ca067
+#define SLOT_SYNC_DBIDS_KEY_SHARED	1
+
+typedef struct SlotSyncWorker
+{
+	/* Time at which this worker was launched. */
+	TimestampTz launch_time;
+
+	/* Indicates if this worker slot is used or free. */
+	bool		in_use;
+
+	/* Slot in the worker pool to which it is attached. */
+	int			slot;
+
+	/* Increased every time the slot is taken by a new worker. */
+	uint16		generation;
+
+	/* Pointer to proc array. NULL if not running. */
+	PGPROC	   *proc;
+
+	/* User to use for the connection. */
+	Oid			userid;
+
+	/* Database id to connect to. */
+	Oid			dbid;
+
+	dsm_segment *dsm_seg;		/* DSM segment holding the dbids array */
+
+	slock_t		mutex;
+
+	/* Count of database ids it manages. */
+	uint32		dbcount;
+
+	/* Database ids it manages. */
+	Oid		   *dbids;
+} SlotSyncWorker;
+
 /*
  * State of the transaction in parallel apply worker.
  *
@@ -234,12 +271,14 @@ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
 /* Worker and subscription objects. */
 extern PGDLLIMPORT Subscription *MySubscription;
 extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
+extern PGDLLIMPORT SlotSyncWorker *MySlotSyncWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern void logicalrep_worker_attach(int slot);
+extern void slotsync_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
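
Illustration only, not part of the patch: given the dbcount bookkeeping above,
one way the launcher could pick a worker for a newly discovered database is to
scan the worker pool for the in-use worker managing the fewest databases. That
the pool is a plain array of max_slot_sync_workers entries, protected by
SlotSyncWorkerLock, is an assumption here; the real policy lives in launcher.c.

#include "postgres.h"

#include "replication/logicallauncher.h"
#include "replication/worker_internal.h"

/*
 * Sketch: return the in-use slot sync worker that currently manages the
 * fewest databases, or NULL if none is running.  Assumes the caller holds
 * SlotSyncWorkerLock and that "workers" is the shared array of
 * max_slot_sync_workers entries.
 */
static SlotSyncWorker *
slotsync_worker_with_fewest_dbs(SlotSyncWorker *workers)
{
	SlotSyncWorker *best = NULL;

	for (int i = 0; i < max_slot_sync_workers; i++)
	{
		SlotSyncWorker *w = &workers[i];

		if (!w->in_use)
			continue;

		if (best == NULL || w->dbcount < best->dbcount)
			best = w;
	}

	return best;
}
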
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index ee590eeac7..ca043d2009 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -44,6 +44,7 @@ tests += {
       't/036_truncated_dropped.pl',
       't/037_invalid_database.pl',
       't/050_verify_slot_order.pl',
+      't/051_slot_sync.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/051_slot_sync.pl b/src/test/recovery/t/051_slot_sync.pl
new file mode 100644
index 0000000000..febe4e3db8
--- /dev/null
+++ b/src/test/recovery/t/051_slot_sync.pl
@@ -0,0 +1,132 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
+
+# Check invalidation in the logfile
+sub check_for_invalidation
+{
+	my ($log_start, $test_name) = @_;
+
+	# message should be issued
+	ok( find_in_log(
+			$node_phys_standby,
+			"invalidating obsolete replication slot \"sub1\"", $log_start),
+		"sub1 slot invalidation is logged $test_name");
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my $res = $node_phys_standby->safe_psql(
+				'postgres', qq(
+				select bool_and(conflicting) from pg_replication_slots;));
+
+	is($res, 't',
+		"Logical slot is reported as conflicting");
+}
+
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+standby_slot_names = 'pslot1'
+});
+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+primary_slot_name = 'pslot1'
+hot_standby_feedback = off
+});
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");
+
+# Some tests need to wait for VACUUM to be replayed, but VACUUM does not flush
+# WAL.  An insert into flush_wal outside a transaction guarantees a flush.
+$node_primary->psql('postgres', q[CREATE TABLE flush_wal();]);
+
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE t1");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result = $node_primary->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary');
+
+# FIXME: standby needs restart to pick up new slots
+$node_phys_standby->restart;
+sleep 3;
+
+$result = $node_phys_standby->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby');
+
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)");
+$node_primary->wait_for_catchup('sub1');
+
+$node_primary->wait_for_catchup($node_phys_standby->name);
+
+# Logical subscriber and physical replica are caught up at this point.
+
+# Drop the subscription so that catalog_xmin is unknown on the primary
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
+
+# This should trigger a conflict as hot_standby_feedback is off on the standby
+$node_primary->safe_psql('postgres', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM full pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+# Ensure physical replay catches up
+$node_primary->wait_for_catchup($node_phys_standby);
+
+# Check invalidation in the logfile
+check_for_invalidation(1, 'with vacuum FULL on pg_class');
+
+# Check conflicting status in pg_replication_slots.
+check_slots_conflicting_status();
+
+done_testing();
-- 
2.34.1

