Hi,

On 4/14/23 3:22 PM, Drouvot, Bertrand wrote:
Now that the "Minimal logical decoding on standby" patch series (mentioned 
up-thread) has been
committed, I think we can resume working on this one ("Synchronizing slots from 
primary to standby").

I'll work on a rebase and share it once done (unless someone already started 
working on a rebase).


Please find attached V5 (a rebase of V4 posted up-thread).

In addition to the rebase itself, the TAP test now also covers conflict
handling (logical slot invalidation), relying on the work done in the
"Minimal logical decoding on standby" patch series.

I have not looked at the patch beyond what was needed for the rebase, but
I plan to do so.
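
For anyone trying the patch, here is a minimal sketch of how the two GUCs
introduced below fit together (host and slot names are illustrative, not
part of the patch):

    # On the primary: keep logical decoding from getting ahead of the
    # physical standby that streams through the slot 'standby1_slot'.
    standby_slot_names = 'standby1_slot'

    # On the standby: synchronize all logical slots from the primary.
    primary_conninfo = 'host=primary user=replicator'
    synchronize_slot_names = '*'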

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 655359eedf37d8f2e522aeb1ec8c48adfc1759b1 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot...@gmail.com>
Date: Thu, 13 Apr 2023 11:32:28 +0000
Subject: [PATCH v5] Synchronize logical replication slots from primary to
 standby

---
 doc/src/sgml/config.sgml                      |  34 ++
 src/backend/commands/subscriptioncmds.c       |   4 +-
 src/backend/postmaster/bgworker.c             |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       |  95 ++++
 src/backend/replication/logical/Makefile      |   1 +
 src/backend/replication/logical/launcher.c    | 263 +++++++----
 src/backend/replication/logical/meson.build   |   1 +
 .../replication/logical/reorderbuffer.c       |  86 ++++
 src/backend/replication/logical/slotsync.c    | 413 ++++++++++++++++++
 src/backend/replication/logical/tablesync.c   |  13 +-
 src/backend/replication/logical/worker.c      |   3 +-
 src/backend/replication/repl_gram.y           |  32 +-
 src/backend/replication/repl_scanner.l        |   2 +
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           | 195 +++++++++
 src/backend/utils/activity/wait_event.c       |   3 +
 src/backend/utils/misc/guc_tables.c           |  26 ++
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/commands/subscriptioncmds.h       |   3 +
 src/include/nodes/replnodes.h                 |   9 +
 src/include/replication/logicallauncher.h     |   2 +
 src/include/replication/logicalworker.h       |   9 +
 src/include/replication/slot.h                |   5 +-
 src/include/replication/walreceiver.h         |  20 +
 src/include/replication/worker_internal.h     |   8 +-
 src/include/utils/wait_event.h                |   1 +
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/037_slot_sync.pl          | 130 ++++++
 28 files changed, 1272 insertions(+), 94 deletions(-)
   3.8% doc/src/sgml/
   7.1% src/backend/replication/libpqwalreceiver/
  54.7% src/backend/replication/logical/
  14.9% src/backend/replication/
   3.3% src/backend/
   4.0% src/include/replication/
  10.9% src/test/recovery/t/

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 091a79d4f3..1360885208 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4466,6 +4466,23 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+      <term><varname>standby_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>standby_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        List of physical replication slots that logical replication waits for.
+        If a logical replication connection is meant to switch to a physical
+        standby after the standby is promoted, the physical replication slot
+        for the standby should be listed here.  This ensures that logical
+        replication is not ahead of the physical standby.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
@@ -4649,6 +4666,23 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-synchronize_slot_names" xreflabel="synchronize_slot_names">
+      <term><varname>synchronize_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>synchronize_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies a list of logical replication slots that a physical standby
+        should synchronize from the primary server.  This is necessary to be
+        able to retarget those logical replication connections to this standby
+        if it gets promoted.  Specify <literal>*</literal> to synchronize all
+        logical replication slots.  The default is empty.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3251d89ba8..8721706b79 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -991,7 +991,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
                                RemoveSubscriptionRel(sub->oid, relid);
 
-                               logicalrep_worker_stop(sub->oid, relid);
+                               logicalrep_worker_stop(MyDatabaseId, sub->oid, relid);
 
                                /*
                                 * For READY state, we would have already dropped the
@@ -1589,7 +1589,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
        {
                LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-               logicalrep_worker_stop(w->subid, w->relid);
+               logicalrep_worker_stop(w->dbid, w->subid, w->relid);
        }
        list_free(subworkers);
 
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 0dd22b2351..a89d1f10a1 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
        {
                "ApplyWorkerMain", ApplyWorkerMain
        },
+       {
+               "ReplSlotSyncMain", ReplSlotSyncMain
+       },
        {
                "ParallelApplyWorkerMain", ParallelApplyWorkerMain
        }
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 052505e46f..4f7417c49a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -34,6 +34,7 @@
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/tuplestore.h"
+#include "utils/varlena.h"
 
 PG_MODULE_MAGIC;
 
@@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
                                                                        char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
                                                                          TimeLineID *primary_tli);
+static List *libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names);
 static int     libpqrcv_server_version(WalReceiverConn *conn);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
                                                                                         TimeLineID tli, char **filename,
@@ -96,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
        .walrcv_receive = libpqrcv_receive,
        .walrcv_send = libpqrcv_send,
        .walrcv_create_slot = libpqrcv_create_slot,
+       .walrcv_list_slots = libpqrcv_list_slots,
        .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
        .walrcv_exec = libpqrcv_exec,
        .walrcv_disconnect = libpqrcv_disconnect
@@ -409,6 +412,98 @@ libpqrcv_server_version(WalReceiverConn *conn)
        return PQserverVersion(conn->streamConn);
 }
 
+/*
+ * Get list of slots from primary.
+ */
+static List *
+libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names)
+{
+       PGresult   *res;
+       List       *slotlist = NIL;
+       int                     ntuples;
+       StringInfoData s;
+       WalRecvReplicationSlotData *slot_data;
+
+       initStringInfo(&s);
+       appendStringInfoString(&s, "LIST_SLOTS");
+
+       if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+       {
+               char       *rawname;
+               List       *namelist;
+               ListCell   *lc;
+
+               appendStringInfoChar(&s, ' ');
+               rawname = pstrdup(slot_names);
+               SplitIdentifierString(rawname, ',', &namelist);
+               foreach (lc, namelist)
+               {
+                       if (lc != list_head(namelist))
+                               appendStringInfoChar(&s, ',');
+                       appendStringInfo(&s, "%s",
+                                                        quote_identifier(lfirst(lc)));
+               }
+       }
+
+       res = libpqrcv_PQexec(conn->streamConn, s.data);
+       pfree(s.data);
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("could not receive list of slots from the primary server: %s",
+                                               pchomp(PQerrorMessage(conn->streamConn)))));
+       }
+       if (PQnfields(res) < 10)
+       {
+               int                     nfields = PQnfields(res);
+
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("invalid response from primary server"),
+                                errdetail("Could not get list of slots: got %d fields, expected %d or more fields.",
+                                                  nfields, 10)));
+       }
+
+       ntuples = PQntuples(res);
+       for (int i = 0; i < ntuples; i++)
+       {
+               char       *slot_type;
+
+               slot_data = palloc0(sizeof(WalRecvReplicationSlotData));
+               namestrcpy(&slot_data->persistent_data.name, PQgetvalue(res, i, 0));
+               if (!PQgetisnull(res, i, 1))
+                       namestrcpy(&slot_data->persistent_data.plugin, PQgetvalue(res, i, 1));
+               slot_type = PQgetvalue(res, i, 2);
+               if (!PQgetisnull(res, i, 3))
+                       slot_data->persistent_data.database = atooid(PQgetvalue(res, i, 3));
+               if (strcmp(slot_type, "physical") == 0)
+               {
+                       if (OidIsValid(slot_data->persistent_data.database))
+                               elog(ERROR, "unexpected physical replication slot with database set");
+               }
+               if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1)
+                       slot_data->persistent_data.persistency = RS_TEMPORARY;
+               else
+                       slot_data->persistent_data.persistency = RS_PERSISTENT;
+               if (!PQgetisnull(res, i, 6))
+                       slot_data->persistent_data.xmin = atooid(PQgetvalue(res, i, 6));
+               if (!PQgetisnull(res, i, 7))
+                       slot_data->persistent_data.catalog_xmin = atooid(PQgetvalue(res, i, 7));
+               if (!PQgetisnull(res, i, 8))
+                       slot_data->persistent_data.restart_lsn = strtou64(PQgetvalue(res, i, 8), NULL, 10);
+               if (!PQgetisnull(res, i, 9))
+                       slot_data->persistent_data.confirmed_flush = strtou64(PQgetvalue(res, i, 9), NULL, 10);
+
+               slot_data->last_sync_time = 0;
+               slotlist = lappend(slotlist, slot_data);
+       }
+
+       PQclear(res);
+
+       return slotlist;
+}
+
 /*
  * Start streaming WAL data from given streaming options.
  *
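To illustrate what the function above sends on the wire: with
synchronize_slot_names = 'slot_a,slot_b', the command text built by
libpqrcv_list_slots() would be (a sketch, slot names illustrative):

    LIST_SLOTS slot_a,slot_b

while an empty string or '*' degenerates to a bare LIST_SLOTS, i.e. all
slots on the primary.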
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2dc25e37bb..ba03eeff1c 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -25,6 +25,7 @@ OBJS = \
        proto.o \
        relation.o \
        reorderbuffer.o \
+       slotsync.o \
        snapbuild.o \
        tablesync.o \
        worker.o
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 970d170e73..14af724639 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -22,6 +22,7 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/pg_authid.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "funcapi.h"
@@ -246,7 +247,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
  * We are only interested in the leader apply worker or table sync worker.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running)
 {
        int                     i;
        LogicalRepWorker *res = NULL;
@@ -262,8 +263,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
                if (isParallelApplyWorker(w))
                        continue;
 
-               if (w->in_use && w->subid == subid && w->relid == relid &&
-                       (!only_running || w->proc))
+               if (w->in_use && w->dbid == dbid && w->subid == subid &&
+                       w->relid == relid && (!only_running || w->proc))
                {
                        res = w;
                        break;
@@ -320,9 +321,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
        /* Sanity check - tablesync worker cannot be a subworker */
        Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
 
-       ereport(DEBUG1,
-                       (errmsg_internal("starting logical replication worker for subscription \"%s\"",
-                                                        subname)));
+       if (OidIsValid(subid))
+               ereport(DEBUG1,
+                               (errmsg_internal("starting logical replication worker for subscription \"%s\"",
+                                                                subname)));
+       else
+               ereport(DEBUG1,
+                               (errmsg_internal("starting replication slot synchronization worker")));
 
        /* Report this after the initial starting message for consistency. */
        if (max_replication_slots == 0)
@@ -359,7 +364,9 @@ retry:
         * reason we do this is because if some worker failed to start up and its
         * parent has crashed while waiting, the in_use state was never cleared.
         */
-       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+       if (worker == NULL ||
+               (OidIsValid(relid) &&
+                nsyncworkers >= max_sync_workers_per_subscription))
        {
                bool            did_cleanup = false;
 
@@ -455,15 +462,20 @@ retry:
        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                BGWORKER_BACKEND_DATABASE_CONNECTION;
-       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       bgw.bgw_start_time = BgWorkerStart_ConsistentState;
        snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
 
-       if (is_parallel_apply_worker)
+       if (!OidIsValid(subid))
+               snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain");
+       else if (is_parallel_apply_worker)
                snprintf(bgw.bgw_function_name, BGW_MAXLEN, 
"ParallelApplyWorkerMain");
        else
                snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
 
-       if (OidIsValid(relid))
+       if (!OidIsValid(subid))
+               snprintf(bgw.bgw_name, BGW_MAXLEN,
+                                "replication slot synchronization worker");
+       else if (OidIsValid(relid))
                snprintf(bgw.bgw_name, BGW_MAXLEN,
                                 "logical replication worker for subscription 
%u sync %u", subid, relid);
        else if (is_parallel_apply_worker)
@@ -591,13 +603,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
  * Stop the logical replication worker for subid/relid, if any.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid)
 {
        LogicalRepWorker *worker;
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-       worker = logicalrep_worker_find(subid, relid, false);
+       worker = logicalrep_worker_find(dbid, subid, relid, false);
 
        if (worker)
        {
@@ -640,13 +652,13 @@ logicalrep_pa_worker_stop(int slot_no, uint16 generation)
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid)
 {
        LogicalRepWorker *worker;
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-       worker = logicalrep_worker_find(subid, relid, true);
+       worker = logicalrep_worker_find(dbid, subid, relid, true);
 
        if (worker)
                logicalrep_worker_wakeup_ptr(worker);
@@ -888,7 +900,7 @@ ApplyLauncherRegister(void)
        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                BGWORKER_BACKEND_DATABASE_CONNECTION;
-       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       bgw.bgw_start_time = BgWorkerStart_ConsistentState;
        snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
        snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1071,6 +1083,157 @@ ApplyLauncherWakeup(void)
                kill(LogicalRepCtx->launcher_pid, SIGUSR1);
 }
 
+static void
+ApplyLauncherStartSlotSync(long *wait_time)
+{
+       WalReceiverConn *wrconn;
+       char       *err;
+       List       *slots;
+       ListCell   *lc;
+       MemoryContext tmpctx;
+       MemoryContext oldctx;
+
+       if (strcmp(synchronize_slot_names, "") == 0)
+               return;
+
+       wrconn = walrcv_connect(PrimaryConnInfo, false, false,
+                                                       "Logical Replication Launcher", &err);
+       if (!wrconn)
+               ereport(ERROR,
+                               (errmsg("could not connect to the primary server: %s", err)));
+
+       /* Use temporary context for the slot list and worker info. */
+       tmpctx = AllocSetContextCreate(TopMemoryContext,
+                                                                  "Logical Replication Launcher slot sync ctx",
+                                                                  ALLOCSET_DEFAULT_SIZES);
+       oldctx = MemoryContextSwitchTo(tmpctx);
+
+       slots = walrcv_list_slots(wrconn, synchronize_slot_names);
+
+       foreach(lc, slots)
+       {
+               WalRecvReplicationSlotData *slot_data = lfirst(lc);
+               LogicalRepWorker *w;
+               TimestampTz last_sync;
+               TimestampTz     now;
+               long            elapsed;
+
+               if (!OidIsValid(slot_data->persistent_data.database))
+                       continue;
+
+               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+               w = logicalrep_worker_find(slot_data->persistent_data.database, InvalidOid,
+                                                                  InvalidOid, false);
+               LWLockRelease(LogicalRepWorkerLock);
+
+               if (w != NULL)
+                       continue;               /* worker is running already */
+
+               /*
+                * If the worker is eligible to start now, launch it.  Otherwise,
+                * adjust wait_time so that we'll wake up as soon as it can be
+                * started.
+                *
+                * Each apply worker can only be restarted once per
+                * wal_retrieve_retry_interval, so that errors do not cause us to
+                * repeatedly restart the worker as fast as possible.
+                */
+               last_sync = slot_data->last_sync_time;
+               now = GetCurrentTimestamp();
+               if (last_sync == 0 ||
+                       (elapsed = TimestampDifferenceMilliseconds(last_sync, now)) >= wal_retrieve_retry_interval)
+               {
+                       slot_data->last_sync_time = now;
+                       logicalrep_worker_launch(slot_data->persistent_data.database,
+                                                                        InvalidOid, NULL,
+                                                                        BOOTSTRAP_SUPERUSERID, InvalidOid,
+                                                                        DSM_HANDLE_INVALID);
+               }
+               else
+               {
+                       *wait_time = Min(*wait_time,
+                                                       wal_retrieve_retry_interval - elapsed);
+               }
+       }
+
+       /* Switch back to original memory context. */
+       MemoryContextSwitchTo(oldctx);
+       /* Clean the temporary memory. */
+       MemoryContextDelete(tmpctx);
+
+       walrcv_disconnect(wrconn);
+}
+
+static void
+ApplyLauncherStartSubs(long *wait_time)
+{
+       List       *sublist;
+       ListCell   *lc;
+       MemoryContext subctx;
+       MemoryContext oldctx;
+
+       /* Use temporary context to avoid leaking memory across cycles. */
+       subctx = AllocSetContextCreate(TopMemoryContext,
+                                                                  "Logical Replication Launcher sublist",
+                                                                  ALLOCSET_DEFAULT_SIZES);
+       oldctx = MemoryContextSwitchTo(subctx);
+
+       /* Start the missing workers for enabled subscriptions. */
+       sublist = get_subscription_list();
+       foreach(lc, sublist)
+       {
+               Subscription *sub = (Subscription *) lfirst(lc);
+               LogicalRepWorker *w;
+               TimestampTz last_start;
+               TimestampTz now;
+               long            elapsed;
+
+               if (!sub->enabled)
+                       continue;
+
+               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+               w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, false);
+               LWLockRelease(LogicalRepWorkerLock);
+
+               if (w != NULL)
+                       continue;               /* worker is running already */
+
+               /*
+                * If the worker is eligible to start now, launch it.  Otherwise,
+                * adjust wait_time so that we'll wake up as soon as it can be
+                * started.
+                *
+                * Each subscription's apply worker can only be restarted once per
+                * wal_retrieve_retry_interval, so that errors do not cause us to
+                * repeatedly restart the worker as fast as possible.  In cases
+                * where a restart is expected (e.g., subscription parameter
+                * changes), another process should remove the last-start entry
+                * for the subscription so that the worker can be restarted
+                * without waiting for wal_retrieve_retry_interval to elapse.
+                */
+               last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+               now = GetCurrentTimestamp();
+               if (last_start == 0 ||
+                       (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+               {
+                       ApplyLauncherSetWorkerStartTime(sub->oid, now);
+                       logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+                                                                        sub->owner, InvalidOid,
+                                                                        DSM_HANDLE_INVALID);
+               }
+               else
+               {
+                       *wait_time = Min(*wait_time,
+                                                       wal_retrieve_retry_interval - elapsed);
+               }
+       }
+
+       /* Switch back to original memory context. */
+       MemoryContextSwitchTo(oldctx);
+       /* Clean the temporary memory. */
+       MemoryContextDelete(subctx);
+}
+
 /*
  * Main loop for the apply launcher process.
  */
@@ -1096,78 +1259,20 @@ ApplyLauncherMain(Datum main_arg)
         */
        BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+       load_file("libpqwalreceiver", false);
+
        /* Enter main loop */
        for (;;)
        {
                int                     rc;
-               List       *sublist;
-               ListCell   *lc;
-               MemoryContext subctx;
-               MemoryContext oldctx;
                long            wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
                CHECK_FOR_INTERRUPTS();
 
-               /* Use temporary context to avoid leaking memory across cycles. */
-               subctx = AllocSetContextCreate(TopMemoryContext,
-                                                                          "Logical Replication Launcher sublist",
-                                                                          ALLOCSET_DEFAULT_SIZES);
-               oldctx = MemoryContextSwitchTo(subctx);
-
-               /* Start any missing workers for enabled subscriptions. */
-               sublist = get_subscription_list();
-               foreach(lc, sublist)
-               {
-                       Subscription *sub = (Subscription *) lfirst(lc);
-                       LogicalRepWorker *w;
-                       TimestampTz last_start;
-                       TimestampTz now;
-                       long            elapsed;
-
-                       if (!sub->enabled)
-                               continue;
-
-                       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-                       w = logicalrep_worker_find(sub->oid, InvalidOid, false);
-                       LWLockRelease(LogicalRepWorkerLock);
-
-                       if (w != NULL)
-                               continue;               /* worker is running already */
-
-                       /*
-                        * If the worker is eligible to start now, launch it.  Otherwise,
-                        * adjust wait_time so that we'll wake up as soon as it can be
-                        * started.
-                        *
-                        * Each subscription's apply worker can only be restarted once per
-                        * wal_retrieve_retry_interval, so that errors do not cause us to
-                        * repeatedly restart the worker as fast as possible.  In cases
-                        * where a restart is expected (e.g., subscription parameter
-                        * changes), another process should remove the last-start entry
-                        * for the subscription so that the worker can be restarted
-                        * without waiting for wal_retrieve_retry_interval to elapse.
-                        */
-                       last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
-                       now = GetCurrentTimestamp();
-                       if (last_start == 0 ||
-                               (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
-                       {
-                               ApplyLauncherSetWorkerStartTime(sub->oid, now);
-                               logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-                                                                                sub->owner, InvalidOid,
-                                                                                DSM_HANDLE_INVALID);
-                       }
-                       else
-                       {
-                               wait_time = Min(wait_time,
-                                                               wal_retrieve_retry_interval - elapsed);
-                       }
-               }
-
-               /* Switch back to original memory context. */
-               MemoryContextSwitchTo(oldctx);
-               /* Clean the temporary memory. */
-               MemoryContextDelete(subctx);
+               if (!RecoveryInProgress())
+                       ApplyLauncherStartSubs(&wait_time);
+               else
+                       ApplyLauncherStartSlotSync(&wait_time);
 
                /* Wait for more work. */
                rc = WaitLatch(MyLatch,
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index d48cd4c590..9e52ec421f 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -11,6 +11,7 @@ backend_sources += files(
   'proto.c',
   'relation.c',
   'reorderbuffer.c',
+  'slotsync.c',
   'snapbuild.c',
   'tablesync.c',
   'worker.c',
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9f44974473..1519b0ec64 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -95,11 +95,14 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "replication/logical.h"
+#include "replication/logicalworker.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"     /* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -107,6 +110,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
+#include "utils/varlena.h"
 
 
 /* entry for a hash table we use to map from xid to our transaction state */
@@ -2053,6 +2057,85 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
        }
 }
 
+static void
+wait_for_standby_confirmation(XLogRecPtr commit_lsn)
+{
+       char       *rawname;
+       List       *namelist;
+       ListCell   *lc;
+       XLogRecPtr      flush_pos = InvalidXLogRecPtr;
+
+       if (strcmp(standby_slot_names, "") == 0)
+               return;
+
+       rawname = pstrdup(standby_slot_names);
+       SplitIdentifierString(rawname, ',', &namelist);
+
+       while (true)
+       {
+               int                     wait_slots_remaining;
+               XLogRecPtr      oldest_flush_pos = InvalidXLogRecPtr;
+               int                     rc;
+
+               wait_slots_remaining = list_length(namelist);
+
+               LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+               for (int i = 0; i < max_replication_slots; i++)
+               {
+                       ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+                       bool            inlist;
+
+                       if (!s->in_use)
+                               continue;
+
+                       inlist = false;
+                       foreach (lc, namelist)
+                       {
+                               char *name = lfirst(lc);
+                               if (strcmp(name, NameStr(s->data.name)) == 0)
+                               {
+                                       inlist = true;
+                                       break;
+                               }
+                       }
+                       if (!inlist)
+                               continue;
+
+                       SpinLockAcquire(&s->mutex);
+
+                       if (s->data.database == InvalidOid)
+                               /* Physical slots advance restart_lsn on flush and ignore confirmed_flush_lsn */
+                               flush_pos = s->data.restart_lsn;
+                       else
+                               /* For logical slots we must wait for commit and flush */
+                               flush_pos = s->data.confirmed_flush;
+
+                       SpinLockRelease(&s->mutex);
+
+                       /* We want to find out the min(flush pos) over all named slots */
+                       if (oldest_flush_pos == InvalidXLogRecPtr
+                               || oldest_flush_pos > flush_pos)
+                               oldest_flush_pos = flush_pos;
+
+                       if (flush_pos >= commit_lsn && wait_slots_remaining > 0)
+                               wait_slots_remaining--;
+               }
+               LWLockRelease(ReplicationSlotControlLock);
+
+               if (wait_slots_remaining == 0)
+                       return;
+
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                          1000L, PG_WAIT_EXTENSION);
+
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+
+               CHECK_FOR_INTERRUPTS();
+       }
+}
+
 /*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
@@ -2502,6 +2585,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                         * Call either PREPARE (for two-phase transactions) or COMMIT (for
                         * regular ones).
                         */
+
+                       wait_for_standby_confirmation(commit_lsn);
+
                        if (rbtxn_prepared(txn))
                                rb->prepare(rb, txn, commit_lsn);
                        else
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..529ddb21ae
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,413 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *        PostgreSQL worker for synchronizing replication slots from the primary to a standby
+ *
+ * Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/dbcommands.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/varlena.h"
+
+char      *synchronize_slot_names;
+char      *standby_slot_names;
+
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+                                                         XLogRecPtr min_lsn)
+{
+       WalRcvExecResult *res;
+       TupleTableSlot *slot;
+       Oid                     slotRow[1] = {LSNOID};
+       StringInfoData cmd;
+       bool            isnull;
+       XLogRecPtr      restart_lsn;
+
+       for (;;)
+       {
+               int                     rc;
+
+               CHECK_FOR_INTERRUPTS();
+
+               initStringInfo(&cmd);
+               appendStringInfo(&cmd,
+                                                "SELECT restart_lsn"
+                                                "  FROM pg_catalog.pg_replication_slots"
+                                                " WHERE slot_name = %s",
+                                                quote_literal_cstr(slot_name));
+               res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+               if (res->status != WALRCV_OK_TUPLES)
+                       ereport(ERROR,
+                                       (errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
+                                                       slot_name, res->err)));
+
+               slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+               if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+                       ereport(ERROR,
+                                       (errmsg("slot \"%s\" disappeared from the primary",
+                                                       slot_name)));
+
+               restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+               Assert(!isnull);
+
+               ExecClearTuple(slot);
+               walrcv_clear_result(res);
+
+               if (restart_lsn >= min_lsn)
+                       break;
+
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                          wal_retrieve_retry_interval,
+                                          WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+               ResetLatch(MyLatch);
+
+               /* emergency bailout if postmaster has died */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+       }
+}
+
+/*
+ * Synchronize a single slot to the given position.
+ *
+ * This creates a new slot if there is no existing one.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database,
+                                        char *plugin_name, XLogRecPtr target_lsn)
+{
+       bool            found = false;
+       XLogRecPtr      endlsn;
+
+       /* Search for the named slot and mark it active if we find it. */
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (int i = 0; i < max_replication_slots; i++)
+       {
+               ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+               if (!s->in_use)
+                       continue;
+
+               if (strcmp(NameStr(s->data.name), slot_name) == 0)
+               {
+                       found = true;
+                       break;
+               }
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       StartTransactionCommand();
+
+       /* Already existing slot, acquire */
+       if (found)
+       {
+               ReplicationSlotAcquire(slot_name, true);
+
+               if (target_lsn < MyReplicationSlot->data.confirmed_flush)
+               {
+                       elog(DEBUG1,
+                                "not synchronizing slot %s; synchronization would move it backward",
+                                slot_name);
+
+                       ReplicationSlotRelease();
+                       CommitTransactionCommand();
+                       return;
+               }
+       }
+       /* Otherwise create the slot first. */
+       else
+       {
+               TransactionId xmin_horizon = InvalidTransactionId;
+               ReplicationSlot *slot;
+
+               ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL, false);
+               slot = MyReplicationSlot;
+
+               SpinLockAcquire(&slot->mutex);
+               slot->data.database = get_database_oid(database, false);
+               namestrcpy(&slot->data.plugin, plugin_name);
+               SpinLockRelease(&slot->mutex);
+
+               ReplicationSlotReserveWal();
+
+               LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+               xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+               slot->effective_catalog_xmin = xmin_horizon;
+               slot->data.catalog_xmin = xmin_horizon;
+               ReplicationSlotsComputeRequiredXmin(true);
+               LWLockRelease(ProcArrayLock);
+
+               if (target_lsn < MyReplicationSlot->data.restart_lsn)
+               {
+                       ereport(LOG,
+                                       errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass local slot LSN (%X/%X)",
+                                                  slot_name,
+                                                  LSN_FORMAT_ARGS(target_lsn), LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn)));
+
+                       wait_for_primary_slot_catchup(wrconn, slot_name,
+                                                                                 MyReplicationSlot->data.restart_lsn);
+               }
+
+               ReplicationSlotPersist();
+       }
+
+       endlsn = pg_logical_replication_slot_advance(target_lsn);
+
+       elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)",
+                slot_name, LSN_FORMAT_ARGS(endlsn));
+
+       ReplicationSlotRelease();
+       CommitTransactionCommand();
+}
+
+static void
+synchronize_slots(void)
+{
+       WalRcvExecResult *res;
+       WalReceiverConn *wrconn = NULL;
+       TupleTableSlot *slot;
+       Oid                     slotRow[3] = {TEXTOID, TEXTOID, LSNOID};
+       StringInfoData s;
+       char       *database;
+       char       *err;
+       MemoryContext oldctx = CurrentMemoryContext;
+
+       if (!WalRcv)
+               return;
+
+       /* syscache access needs a transaction env. */
+       StartTransactionCommand();
+       /* make dbname live outside TX context */
+       MemoryContextSwitchTo(oldctx);
+
+       database = get_database_name(MyDatabaseId);
+       initStringInfo(&s);
+       appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+       wrconn = walrcv_connect(s.data, true, false, "slot_sync", &err);
+
+       if (wrconn == NULL)
+               ereport(ERROR,
+                               (errmsg("could not connect to the primary server: %s", err)));
+
+       resetStringInfo(&s);
+       appendStringInfo(&s,
+                                        "SELECT slot_name, plugin, confirmed_flush_lsn"
+                                        "  FROM pg_catalog.pg_replication_slots"
+                                        " WHERE database = %s",
+                                        quote_literal_cstr(database));
+       if (strcmp(synchronize_slot_names, "") != 0 && strcmp(synchronize_slot_names, "*") != 0)
+       {
+               char       *rawname;
+               List       *namelist;
+               ListCell   *lc;
+
+               rawname = pstrdup(synchronize_slot_names);
+               SplitIdentifierString(rawname, ',', &namelist);
+
+               appendStringInfoString(&s, " AND slot_name IN (");
+               foreach (lc, namelist)
+               {
+                       if (lc != list_head(namelist))
+                               appendStringInfoChar(&s, ',');
+                       appendStringInfo(&s, "%s",
+                                                        quote_literal_cstr(lfirst(lc)));
+               }
+               appendStringInfoChar(&s, ')');
+       }
+
+       res = walrcv_exec(wrconn, s.data, 3, slotRow);
+       pfree(s.data);
+
+       if (res->status != WALRCV_OK_TUPLES)
+               ereport(ERROR,
+                               (errmsg("could not fetch slot info from primary: %s",
+                                               res->err)));
+
+       CommitTransactionCommand();
+       /* CommitTransactionCommand switches to TopMemoryContext */
+       MemoryContextSwitchTo(oldctx);
+
+       slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+       while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+       {
+               char       *slot_name;
+               char       *plugin_name;
+               XLogRecPtr      confirmed_flush_lsn;
+               bool            isnull;
+
+               slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+               Assert(!isnull);
+
+               plugin_name = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+               Assert(!isnull);
+
+               confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull));
+               Assert(!isnull);
+
+               synchronize_one_slot(wrconn, slot_name, database, plugin_name,
+                                                        confirmed_flush_lsn);
+
+               ExecClearTuple(slot);
+       }
+
+       walrcv_clear_result(res);
+       pfree(database);
+
+       walrcv_disconnect(wrconn);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncMain(Datum main_arg)
+{
+       int                     worker_slot = DatumGetInt32(main_arg);
+
+       /* Attach to slot */
+       logicalrep_worker_attach(worker_slot);
+
+       /* Establish signal handlers. */
+       BackgroundWorkerUnblockSignals();
+
+       /* Load the libpq-specific functions */
+       load_file("libpqwalreceiver", false);
+
+       /* Connect to our database. */
+       BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+                                                                                 MyLogicalRepWorker->userid,
+                                                                                 0);
+
+       StartTransactionCommand();
+       ereport(LOG,
+                       (errmsg("replication slot synchronization worker for database \"%s\" has started",
+                                       get_database_name(MyLogicalRepWorker->dbid))));
+       CommitTransactionCommand();
+
+       /* Main wait loop. */
+       for (;;)
+       {
+               int                     rc;
+
+               CHECK_FOR_INTERRUPTS();
+
+               if (!RecoveryInProgress())
+                       return;
+
+               if (strcmp(synchronize_slot_names, "") == 0)
+                       return;
+
+               synchronize_slots();
+
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                          wal_retrieve_retry_interval,
+                                          WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+               ResetLatch(MyLatch);
+
+               /* emergency bailout if postmaster has died */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+       }
+}
+
+/*
+ * Routines for handling the GUC variable(s)
+ */
+
+bool
+check_synchronize_slot_names(char **newval, void **extra, GucSource source)
+{
+       /* Special handling for "*" which means all. */
+       if (strcmp(*newval, "*") == 0)
+       {
+               return true;
+       }
+       else
+       {
+               char       *rawname;
+               List       *namelist;
+               ListCell   *lc;
+
+               /* Need a modifiable copy of string */
+               rawname = pstrdup(*newval);
+
+               /* Parse string into list of identifiers */
+               if (!SplitIdentifierString(rawname, ',', &namelist))
+               {
+                       /* syntax error in name list */
+                       GUC_check_errdetail("List syntax is invalid.");
+                       pfree(rawname);
+                       list_free(namelist);
+                       return false;
+               }
+
+               foreach(lc, namelist)
+               {
+                       char       *curname = (char *) lfirst(lc);
+
+                       ReplicationSlotValidateName(curname, ERROR);
+               }
+
+               pfree(rawname);
+               list_free(namelist);
+       }
+
+       return true;
+}
+
+
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+       char       *rawname;
+       List       *namelist;
+       ListCell   *lc;
+
+       /* Need a modifiable copy of string */
+       rawname = pstrdup(*newval);
+
+       /* Parse string into list of identifiers */
+       if (!SplitIdentifierString(rawname, ',', &namelist))
+       {
+               /* syntax error in name list */
+               GUC_check_errdetail("List syntax is invalid.");
+               pfree(rawname);
+               list_free(namelist);
+               return false;
+       }
+
+       foreach(lc, namelist)
+       {
+               char       *curname = (char *) lfirst(lc);
+
+               ReplicationSlotValidateName(curname, ERROR);
+       }
+
+       pfree(rawname);
+       list_free(namelist);
+
+       return true;
+}
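Once the sync worker has run, a quick way to check the result on the
standby is an ordinary query against pg_replication_slots (illustrative,
not part of the patch):

    SELECT slot_name, plugin, database, confirmed_flush_lsn
      FROM pg_replication_slots
     WHERE slot_type = 'logical';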
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6dce355633..2307d187e4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "commands/subscriptioncmds.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
@@ -155,7 +156,8 @@ finish_sync_worker(void)
        CommitTransactionCommand();
 
        /* Find the leader apply worker and signal it. */
-       logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+       logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+                                                        MyLogicalRepWorker->subid, InvalidOid);
 
        /* Stop gracefully */
        proc_exit(0);
@@ -195,7 +197,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 
                /* Check if the sync worker is still running and bail if not. */
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-               worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
+               worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+                                                                               MyLogicalRepWorker->subid, relid,
+                                                                               false);
                LWLockRelease(LogicalRepWorkerLock);
                if (!worker)
@@ -242,7 +245,8 @@ wait_for_worker_state_change(char expected_state)
                 * waiting.
                 */
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-               worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+               worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+                                                                               MyLogicalRepWorker->subid,
+                                                                               InvalidOid, false);
                if (worker && worker->proc)
                        logicalrep_worker_wakeup_ptr(worker);
@@ -508,7 +512,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                         */
                        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-                       syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+                       syncworker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+                                                                                               MyLogicalRepWorker->subid,
                                                                                                rstate->relid, false);
 
                        if (syncworker)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3d58910c14..b9354bd023 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1600,7 +1600,8 @@ apply_handle_stream_start(StringInfo s)
                                 * Signal the leader apply worker, as it may be waiting for
                                 * us.
                                 */
-                               logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+                               logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+                                                                                MyLogicalRepWorker->subid, InvalidOid);
                        }
 
                        parallel_stream_nchanges = 0;
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..12a4b74368 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,11 +76,12 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_LIST_SLOTS
 
 %type <node>   command
 %type <node>   base_backup start_replication start_logical_replication
                                create_replication_slot drop_replication_slot 
identify_system
-                               read_replication_slot timeline_history show
+                               read_replication_slot timeline_history show 
list_slots
 %type <list>   generic_option_list
 %type <defelt> generic_option
 %type <uintval>        opt_timeline
@@ -91,6 +92,7 @@ Node *replication_parse_result;
 %type <boolval>        opt_temporary
 %type <list>   create_slot_options create_slot_legacy_opt_list
 %type <defelt> create_slot_legacy_opt
+%type <list>   slot_name_list slot_name_list_opt
 
 %%
 
@@ -114,6 +116,7 @@ command:
                        | read_replication_slot
                        | timeline_history
                        | show
+                       | list_slots
                        ;
 
 /*
@@ -126,6 +129,33 @@ identify_system:
                                }
                        ;
 
+slot_name_list:
+                       IDENT
+                               {
+                                       $$ = list_make1($1);
+                               }
+                       | slot_name_list ',' IDENT
+                               {
+                                       $$ = lappend($1, $3);
+                               }
+               ;
+
+slot_name_list_opt:
+                       slot_name_list                  { $$ = $1; }
+                       | /* EMPTY */                   { $$ = NIL; }
+               ;
+
+/*
+ * LIST_SLOTS
+ */
+list_slots:
+                       K_LIST_SLOTS slot_name_list_opt
+                               {
+                                       ListSlotsCmd *cmd = makeNode(ListSlotsCmd);
+                                       cmd->slot_names = $2;
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
 /*
  * READ_REPLICATION_SLOT %s
  */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index cb467ca46f..9501df38eb 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT               { return K_DROP_REPLICATION_SLOT; }
 TIMELINE_HISTORY       { return K_TIMELINE_HISTORY; }
 PHYSICAL                       { return K_PHYSICAL; }
 RESERVE_WAL                    { return K_RESERVE_WAL; }
+LIST_SLOTS                     { return K_LIST_SLOTS; }
 LOGICAL                                { return K_LOGICAL; }
 SLOT                           { return K_SLOT; }
 TEMPORARY                      { return K_TEMPORARY; }
@@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void)
                case K_READ_REPLICATION_SLOT:
                case K_TIMELINE_HISTORY:
                case K_SHOW:
+               case K_LIST_SLOTS:
                        /* Yes; push back the first token so we can parse later. */
                        repl_pushed_back_token = first_token;
                        return true;
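With the grammar and scanner changes above, the new command can also be
exercised by hand over a replication connection, e.g. (a sketch, slot
names illustrative):

    psql "dbname=postgres replication=database" -c 'LIST_SLOTS slot_a,slot_b'

It returns one row per slot with the ten columns built by ListSlots()
below (slot_name, plugin, slot_type, datoid, database, temporary, xmin,
catalog_xmin, restart_lsn, confirmed_flush).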
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6035cf4816..83ada6db6a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -467,7 +467,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
  */
-static XLogRecPtr
+XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
        LogicalDecodingContext *ctx;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45b8b3684f..0d01b8967a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -473,6 +473,194 @@ IdentifySystem(void)
        end_tup_output(tstate);
 }
 
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+       return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_SLOTS command.
+ */
+static void
+ListSlots(ListSlotsCmd *cmd)
+{
+       DestReceiver *dest;
+       TupOutputState *tstate;
+       TupleDesc       tupdesc;
+       NameData   *slot_names;
+       int                     numslot_names;
+
+       numslot_names = list_length(cmd->slot_names);
+       if (numslot_names)
+       {
+               ListCell   *lc;
+               int                     i = 0;
+
+               slot_names = palloc(numslot_names * sizeof(NameData));
+               foreach(lc, cmd->slot_names)
+               {
+                       char       *slot_name = lfirst(lc);
+
+                       ReplicationSlotValidateName(slot_name, ERROR);
+                       namestrcpy(&slot_names[i++], slot_name);
+               }
+
+               qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp);
+       }
+
+       dest = CreateDestReceiver(DestRemoteSimple);
+
+       /* need a tuple descriptor representing ten columns */
+       tupdesc = CreateTemplateTupleDesc(10);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid",
+                                                         INT8OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary",
+                                                         INT4OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin",
+                                                         INT8OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin",
+                                                         INT8OID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush",
+                                                         TEXTOID, -1, 0);
+
+       /* prepare for projection of tuples */
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       for (int slotno = 0; slotno < max_replication_slots; slotno++)
+       {
+               ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+               char            restart_lsn_str[MAXFNAMELEN];
+               char            confirmed_flush_lsn_str[MAXFNAMELEN];
+               Datum           values[10];
+               bool            nulls[10];
+
+               ReplicationSlotPersistency persistency;
+               TransactionId xmin;
+               TransactionId catalog_xmin;
+               XLogRecPtr      restart_lsn;
+               XLogRecPtr      confirmed_flush_lsn;
+               Oid                     datoid;
+               NameData        slot_name;
+               NameData        plugin;
+               int                     i;
+               int64           tmpbigint;
+
+               if (!slot->in_use)
+                       continue;
+
+               SpinLockAcquire(&slot->mutex);
+
+               xmin = slot->data.xmin;
+               catalog_xmin = slot->data.catalog_xmin;
+               datoid = slot->data.database;
+               restart_lsn = slot->data.restart_lsn;
+               confirmed_flush_lsn = slot->data.confirmed_flush;
+               namestrcpy(&slot_name, NameStr(slot->data.name));
+               namestrcpy(&plugin, NameStr(slot->data.plugin));
+               persistency = slot->data.persistency;
+
+               SpinLockRelease(&slot->mutex);
+
+               if (numslot_names &&
+                       !bsearch((void *) &slot_name, (void *) slot_names,
+                                        numslot_names, sizeof(NameData), pg_qsort_namecmp))
+                       continue;
+
+               memset(nulls, 0, sizeof(nulls));
+
+               i = 0;
+               values[i++] = CStringGetTextDatum(NameStr(slot_name));
+
+               if (datoid == InvalidOid)
+                       nulls[i++] = true;
+               else
+                       values[i++] = CStringGetTextDatum(NameStr(plugin));
+
+               if (datoid == InvalidOid)
+                       values[i++] = CStringGetTextDatum("physical");
+               else
+                       values[i++] = CStringGetTextDatum("logical");
+
+               if (datoid == InvalidOid)
+                       nulls[i++] = true;
+               else
+               {
+                       tmpbigint = datoid;
+                       values[i++] = Int64GetDatum(tmpbigint);
+               }
+
+               if (datoid == InvalidOid)
+                       nulls[i++] = true;
+               else
+               {
+                       MemoryContext cur = CurrentMemoryContext;
+
+                       /* syscache access needs a transaction env. */
+                       StartTransactionCommand();
+                       /* make dbname live outside TX context */
+                       MemoryContextSwitchTo(cur);
+                       values[i++] = CStringGetTextDatum(get_database_name(datoid));
+                       CommitTransactionCommand();
+                       /* CommitTransactionCommand switches to TopMemoryContext */
+                       MemoryContextSwitchTo(cur);
+               }
+
+               values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0);
+
+               if (xmin != InvalidTransactionId)
+               {
+                       tmpbigint = xmin;
+                       values[i++] = Int64GetDatum(tmpbigint);
+               }
+               else
+                       nulls[i++] = true;
+
+               if (catalog_xmin != InvalidTransactionId)
+               {
+                       tmpbigint = catalog_xmin;
+                       values[i++] = Int64GetDatum(tmpbigint);
+               }
+               else
+                       nulls[i++] = true;
+
+               if (restart_lsn != InvalidXLogRecPtr)
+               {
+                       snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X",
+                                        LSN_FORMAT_ARGS(restart_lsn));
+                       values[i++] = CStringGetTextDatum(restart_lsn_str);
+               }
+               else
+                       nulls[i++] = true;
+
+               if (confirmed_flush_lsn != InvalidXLogRecPtr)
+               {
+                       snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str),
+                                        "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn));
+                       values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str);
+               }
+               else
+                       nulls[i++] = true;
+
+               /* send it to dest */
+               do_tup_output(tstate, values, nulls);
+       }
+       LWLockRelease(ReplicationSlotControlLock);
+
+       end_tup_output(tstate);
+}
+
 /* Handle READ_REPLICATION_SLOT command */
 static void
 ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -1820,6 +2008,13 @@ exec_replication_command(const char *cmd_string)
                        EndReplicationCommand(cmdtag);
                        break;
 
+               case T_ListSlotsCmd:
+                       cmdtag = "LIST_SLOTS";
+                       set_ps_display(cmdtag);
+                       ListSlots((ListSlotsCmd *) cmd_node);
+                       EndReplicationCommand(cmdtag);
+                       break;
+
                case T_StartReplicationCmd:
                        {
                                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
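
For reference, once the patch is applied the new command can be exercised
over a replication connection; a hypothetical session could look like:

    $ psql "dbname=postgres replication=database" -c 'LIST_SLOTS;'

Each returned row carries the columns built above: slot_name, plugin,
slot_type, datoid, database, temporary, xmin, catalog_xmin, restart_lsn
and confirmed_flush. Per the grammar, an optional comma-separated name
list (e.g. "LIST_SLOTS sub1, sub2;") restricts the output to those slots.
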
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 7940d64639..f2a9517091 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -230,6 +230,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
                case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
                        event_name = "LogicalLauncherMain";
                        break;
+               case WAIT_EVENT_REPL_SLOT_SYNC_MAIN:
+                       event_name = "ReplSlotSyncMain";
+                       break;
                case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN:
                        event_name = "LogicalParallelApplyMain";
                        break;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index cab3ddbe11..0ee7ad1348 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -63,8 +63,12 @@
 #include "postmaster/syslogger.h"
 #include "postmaster/walwriter.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/walreceiver.h"
+#include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
@@ -4587,6 +4591,28 @@ struct config_string ConfigureNamesString[] =
                check_io_direct, assign_io_direct, NULL
        },
 
+       {
+               {"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+                       gettext_noop("Sets the names of replication slots to synchronize from the primary to the standby."),
+                       gettext_noop("Value of \"*\" means all."),
+                       GUC_LIST_INPUT | GUC_LIST_QUOTE
+               },
+               &synchronize_slot_names,
+               "",
+               check_synchronize_slot_names, NULL, NULL
+       },
+
+       {
+               {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+                       gettext_noop("List of physical replication slots that must confirm changes before they are sent to logical replication consumers."),
+                       NULL,
+                       GUC_LIST_INPUT | GUC_LIST_QUOTE
+               },
+               &standby_slot_names,
+               "",
+               check_standby_slot_names, NULL, NULL
+       },
+
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index dce5049bc2..2ff2188c02 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -330,6 +330,7 @@
                                # and comma-separated list of application_name
                                # from standby(s); '*' = all
 #vacuum_defer_cleanup_age = 0  # number of xacts by which cleanup is delayed
+#standby_slot_names = ''       # physical standby slot names that logical replication waits for
 
 # - Standby Servers -
 
@@ -357,6 +358,7 @@
 #wal_retrieve_retry_interval = 5s      # time to wait before retrying to
                                        # retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0          # minimum delay for applying changes during recovery
+#synchronize_slot_names = ''           # logical replication slots to sync to standby
 
 # - Subscribers -
 
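
To illustrate how the two GUCs cooperate, a minimal primary/standby pair
under this patch could be configured as below (slot and host names are
placeholders):

    # On the primary: hold back logical consumers until the physical
    # slot 'pslot1' has confirmed the changes.
    standby_slot_names = 'pslot1'

    # On the standby: synchronize all logical slots from the primary.
    synchronize_slot_names = '*'
    primary_slot_name = 'pslot1'
    primary_conninfo = 'host=primary ...'
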
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index 214dc6c29e..0e77f9ee5c 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
 
 #include "catalog/objectaddress.h"
 #include "parser/parse_node.h"
+#include "replication/walreceiver.h"
 
 extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                                                                                bool isTopLevel);
@@ -28,4 +29,6 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
 
 extern char defGetStreamingMode(DefElem *def);
 
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+
 #endif                                                 /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..980e0b2ee2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd
        NodeTag         type;
 } IdentifySystemCmd;
 
+/* ----------------------
+ *             LIST_SLOTS command
+ * ----------------------
+ */
+typedef struct ListSlotsCmd
+{
+       NodeTag         type;
+       List       *slot_names;
+} ListSlotsCmd;
 
 /* ----------------------
  *             BASE_BACKUP command
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index a07c9cb311..80fdbf9657 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -31,4 +31,6 @@ extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
+extern PGDLLIMPORT char *PrimaryConnInfo;
+
 #endif                                                 /* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 39588da79f..6408753557 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,10 +14,16 @@
 
 #include <signal.h>
 
+#include "utils/guc.h"
+
+extern char *synchronize_slot_names;
+extern char *standby_slot_names;
+
 extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
@@ -29,4 +35,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
 
 extern void AtEOXact_LogicalRepWorkers(bool isCommit);
 
+extern bool check_synchronize_slot_names(char **newval, void **extra, GucSource source);
+extern bool check_standby_slot_names(char **newval, void **extra, GucSource source);
+
 #endif                                                 /* LOGICALWORKER_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..5dc2e0d30d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,7 +15,6 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
-#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -238,7 +237,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l
 extern int     ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
 extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
-extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
@@ -246,4 +244,7 @@ extern void CheckPointReplicationSlots(void);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 
+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto);
+
+
 #endif                                                 /* SLOT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..9e9d64faf2 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -20,6 +20,7 @@
 #include "pgtime.h"
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/condition_variable.h"
 #include "storage/latch.h"
@@ -191,6 +192,17 @@ typedef struct
        }                       proto;
 } WalRcvStreamOptions;
 
+/*
+ * Slot information received from the remote server.
+ *
+ * Currently the same as ReplicationSlotPersistentData, plus last_sync_time.
+ */
+typedef struct WalRecvReplicationSlotData
+{
+       ReplicationSlotPersistentData persistent_data;
+       TimestampTz last_sync_time;
+} WalRecvReplicationSlotData;
+
 struct WalReceiverConn;
 typedef struct WalReceiverConn WalReceiverConn;
 
@@ -280,6 +292,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
                                                                                        TimeLineID *primary_tli);
 
+/*
+ * walrcv_list_slots_fn
+ *
+ * Run LIST_SLOTS on the remote node and return information about the
+ * requested replication slots (all slots if none are named).
+ */
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, const char *slots);
+
 /*
  * walrcv_server_version_fn
  *
@@ -393,6 +410,7 @@ typedef struct WalReceiverFunctionsType
        walrcv_get_conninfo_fn walrcv_get_conninfo;
        walrcv_get_senderinfo_fn walrcv_get_senderinfo;
        walrcv_identify_system_fn walrcv_identify_system;
+       walrcv_list_slots_fn walrcv_list_slots;
        walrcv_server_version_fn walrcv_server_version;
        walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
        walrcv_startstreaming_fn walrcv_startstreaming;
@@ -417,6 +435,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
        WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
        WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_list_slots(conn, slots) \
+       WalReceiverFunctions->walrcv_list_slots(conn, slots)
 #define walrcv_server_version(conn) \
        WalReceiverFunctions->walrcv_server_version(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
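
On the standby side, the slot-sync worker would presumably drive this API
roughly as follows (a sketch only, not code from the patch; wrconn is
assumed to be a connection opened earlier with walrcv_connect):

    /* Fetch info about the slots named in synchronize_slot_names. */
    List       *remote_slots = walrcv_list_slots(wrconn, synchronize_slot_names);
    ListCell   *lc;

    foreach(lc, remote_slots)
    {
        WalRecvReplicationSlotData *slot_data =
            (WalRecvReplicationSlotData *) lfirst(lc);

        /* create the slot locally if it is missing, then advance it */
    }
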
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dce71d2c50..5b4fda2fd9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -65,7 +65,7 @@ typedef struct LogicalRepWorker
         * would be created for each transaction which will be deleted after the
         * transaction is finished.
         */
-       FileSet    *stream_fileset;
+       struct FileSet    *stream_fileset;
 
        /*
         * PID of leader apply worker if this slot is used for a parallel apply
@@ -226,15 +226,15 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 extern PGDLLIMPORT bool in_remote_transaction;
 
 extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid,
                                                                                                bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                                                                         Oid userid, Oid relid,
                                                                         dsm_handle subworker_dsm);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int     logicalrep_sync_worker_count(Oid subid);
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -43,6 +43,7 @@ typedef enum
        WAIT_EVENT_LOGICAL_APPLY_MAIN,
        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
        WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN,
+       WAIT_EVENT_REPL_SLOT_SYNC_MAIN,
        WAIT_EVENT_RECOVERY_WAL_STREAM,
        WAIT_EVENT_SYSLOGGER_MAIN,
        WAIT_EVENT_WAL_RECEIVER_MAIN,
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 2008958010..b6fcc8704e 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -42,6 +42,7 @@ tests += {
       't/034_create_database.pl',
       't/035_standby_logical_decoding.pl',
       't/036_truncated_dropped.pl',
+      't/037_slot_sync.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/037_slot_sync.pl b/src/test/recovery/t/037_slot_sync.pl
new file mode 100644
index 0000000000..0520042d96
--- /dev/null
+++ b/src/test/recovery/t/037_slot_sync.pl
@@ -0,0 +1,130 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
+
+# Check invalidation in the logfile
+sub check_for_invalidation
+{
+       my ($log_start, $test_name) = @_;
+
+       # message should be issued
+       ok( find_in_log(
+               $node_phys_standby,
+        "invalidating obsolete replication slot \"sub1\"", $log_start),
+        "sub1 slot invalidation is logged $test_name");
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+       my $res = $node_phys_standby->safe_psql(
+                               'postgres', qq(
+                               select bool_and(conflicting) from pg_replication_slots;));
+
+       is($res, 't',
+               "Logical slot is reported as conflicting");
+}
+
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->append_conf('postgresql.conf', "standby_slot_names = 'pslot1'");
+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+primary_slot_name = 'pslot1'
+hot_standby_feedback = off
+});
+
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");
+
+# Some tests need to wait for VACUUM to be replayed. But VACUUM does not flush
+# WAL; an insert into flush_wal outside a transaction guarantees a flush.
+$node_primary->psql('postgres', q[CREATE TABLE flush_wal();]);
+
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE t1");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result = $node_primary->safe_psql('postgres',
+       "SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary');
+
+# FIXME: standby needs restart to pick up new slots
+$node_phys_standby->restart;
+sleep 3;
+
+$result = $node_phys_standby->safe_psql('postgres',
+       "SELECT slot_name, plugin, database FROM pg_replication_slots");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby');
+
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)");
+$node_primary->wait_for_catchup('sub1');
+
+$node_primary->wait_for_catchup($node_phys_standby->name);
+
+# Logical subscriber and physical replica are caught up at this point.
+
+# Drop the subscription so that catalog_xmin is unknown on the primary
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
+
+# This should trigger a conflict as hot_standby_feedback is off on the standby
+$node_primary->safe_psql('postgres', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM full pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+# Ensure physical replay catches up
+$node_primary->wait_for_catchup($node_phys_standby);
+
+# Check invalidation in the logfile
+check_for_invalidation(1, 'with vacuum FULL on pg_class');
+
+# Check conflicting status in pg_replication_slots.
+check_slots_conflicting_status();
+
+done_testing();
-- 
2.34.1
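
For reference, the new TAP test can be run on its own from a configured
source tree with:

    $ make -C src/test/recovery check PROVE_TESTS='t/037_slot_sync.pl'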
