Hi,
On 4/14/23 3:22 PM, Drouvot, Bertrand wrote:
> Now that the "Minimal logical decoding on standby" patch series (mentioned
> up-thread) has been committed, I think we can resume working on this one
> ("Synchronizing slots from primary to standby").
>
> I'll work on a rebase and share it once done (unless someone already started
> working on a rebase).
Please find attached V5 (a rebase of V4 posted up-thread).
In addition to the rebase itself, the TAP test now also covers conflict
handling (logical slot invalidation), relying on the work done in the
"Minimal logical decoding on standby" patch series.
I did not look at the patch beyond what was needed for the rebase, but I
plan to do so.
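
For anyone who wants to try it out, here is a minimal setup sketch matching
the attached TAP test (host and slot names are only examples):

    # primary: postgresql.conf
    standby_slot_names = 'pslot1'   # logical decoding waits for this physical slot

    # standby: postgresql.conf
    primary_conninfo = 'host=primary port=5432 user=repl'  # example connstring
    primary_slot_name = 'pslot1'
    synchronize_slot_names = '*'    # or a comma-separated list of slot names

Under the hood, the standby's slot-sync workers use the new LIST_SLOTS
replication command to fetch the slots' state from the primary, e.g.:

    LIST_SLOTS sub1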
Regards,
--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 655359eedf37d8f2e522aeb1ec8c48adfc1759b1 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <[email protected]>
Date: Thu, 13 Apr 2023 11:32:28 +0000
Subject: [PATCH v5] Synchronize logical replication slots from primary to
standby
---
doc/src/sgml/config.sgml | 34 ++
src/backend/commands/subscriptioncmds.c | 4 +-
src/backend/postmaster/bgworker.c | 3 +
.../libpqwalreceiver/libpqwalreceiver.c | 95 ++++
src/backend/replication/logical/Makefile | 1 +
src/backend/replication/logical/launcher.c | 263 +++++++----
src/backend/replication/logical/meson.build | 1 +
.../replication/logical/reorderbuffer.c | 86 ++++
src/backend/replication/logical/slotsync.c | 413 ++++++++++++++++++
src/backend/replication/logical/tablesync.c | 13 +-
src/backend/replication/logical/worker.c | 3 +-
src/backend/replication/repl_gram.y | 32 +-
src/backend/replication/repl_scanner.l | 2 +
src/backend/replication/slotfuncs.c | 2 +-
src/backend/replication/walsender.c | 195 +++++++++
src/backend/utils/activity/wait_event.c | 3 +
src/backend/utils/misc/guc_tables.c | 26 ++
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/include/commands/subscriptioncmds.h | 3 +
src/include/nodes/replnodes.h | 9 +
src/include/replication/logicallauncher.h | 2 +
src/include/replication/logicalworker.h | 9 +
src/include/replication/slot.h | 5 +-
src/include/replication/walreceiver.h | 20 +
src/include/replication/worker_internal.h | 8 +-
src/include/utils/wait_event.h | 1 +
src/test/recovery/meson.build | 1 +
src/test/recovery/t/037_slot_sync.pl | 130 ++++++
28 files changed, 1272 insertions(+), 94 deletions(-)
3.8% doc/src/sgml/
7.1% src/backend/replication/libpqwalreceiver/
54.7% src/backend/replication/logical/
14.9% src/backend/replication/
3.3% src/backend/
4.0% src/include/replication/
10.9% src/test/recovery/t/
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 091a79d4f3..1360885208 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4466,6 +4466,23 @@ restore_command = 'copy "C:\\server\\archivedir\\%f"
"%p"' # Windows
</listitem>
</varlistentry>
+ <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+ <term><varname>standby_slot_names</varname> (<type>string</type>)
+ <indexterm>
+        <primary><varname>standby_slot_names</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ List of physical replication slots that logical replication waits for.
+ If a logical replication connection is meant to switch to a physical
+ standby after the standby is promoted, the physical replication slot
+ for the standby should be listed here. This ensures that logical
+ replication is not ahead of the physical standby.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</sect2>
@@ -4649,6 +4666,23 @@ ANY <replaceable
class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
+     <varlistentry id="guc-synchronize_slot_names" xreflabel="synchronize_slot_names">
+ <term><varname>synchronize_slot_names</varname> (<type>string</type>)
+ <indexterm>
+        <primary><varname>synchronize_slot_names</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies a list of logical replication slots that a physical standby
+ should synchronize from the primary server. This is necessary to be
+ able to retarget those logical replication connections to this standby
+ if it gets promoted. Specify <literal>*</literal> to synchronize all
+ logical replication slots. The default is empty.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</sect2>
diff --git a/src/backend/commands/subscriptioncmds.c
b/src/backend/commands/subscriptioncmds.c
index 3251d89ba8..8721706b79 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -991,7 +991,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
RemoveSubscriptionRel(sub->oid, relid);
- logicalrep_worker_stop(sub->oid, relid);
+ logicalrep_worker_stop(MyDatabaseId, sub->oid,
relid);
/*
* For READY state, we would have already
dropped the
@@ -1589,7 +1589,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool
isTopLevel)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
- logicalrep_worker_stop(w->subid, w->relid);
+ logicalrep_worker_stop(w->dbid, w->subid, w->relid);
}
list_free(subworkers);
diff --git a/src/backend/postmaster/bgworker.c
b/src/backend/postmaster/bgworker.c
index 0dd22b2351..a89d1f10a1 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
{
"ApplyWorkerMain", ApplyWorkerMain
},
+ {
+ "ReplSlotSyncMain", ReplSlotSyncMain
+ },
{
"ParallelApplyWorkerMain", ParallelApplyWorkerMain
}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 052505e46f..4f7417c49a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -34,6 +34,7 @@
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/tuplestore.h"
+#include "utils/varlena.h"
PG_MODULE_MAGIC;
@@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
char
**sender_host, int *sender_port);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
TimeLineID *primary_tli);
+static List *libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names);
static int libpqrcv_server_version(WalReceiverConn *conn);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
TimeLineID tli, char **filename,
@@ -96,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
+ .walrcv_list_slots = libpqrcv_list_slots,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
@@ -409,6 +412,98 @@ libpqrcv_server_version(WalReceiverConn *conn)
return PQserverVersion(conn->streamConn);
}
+/*
+ * Get list of slots from primary.
+ */
+static List *
+libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names)
+{
+ PGresult *res;
+ List *slotlist = NIL;
+ int ntuples;
+ StringInfoData s;
+ WalRecvReplicationSlotData *slot_data;
+
+ initStringInfo(&s);
+ appendStringInfoString(&s, "LIST_SLOTS");
+
+ if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0)
+ {
+ char *rawname;
+ List *namelist;
+ ListCell *lc;
+
+ appendStringInfoChar(&s, ' ');
+ rawname = pstrdup(slot_names);
+ SplitIdentifierString(rawname, ',', &namelist);
+ foreach (lc, namelist)
+ {
+ if (lc != list_head(namelist))
+ appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_identifier(lfirst(lc)));
+ }
+ }
+
+ res = libpqrcv_PQexec(conn->streamConn, s.data);
+ pfree(s.data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+				(errmsg("could not receive list of slots from the primary server: %s",
+						pchomp(PQerrorMessage(conn->streamConn)))));
+ }
+ if (PQnfields(res) < 10)
+ {
+ int nfields = PQnfields(res);
+
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("invalid response from primary server"),
+				 errdetail("Could not get list of slots: got %d fields, expected %d or more fields.",
+						   nfields, 10)));
+ }
+
+ ntuples = PQntuples(res);
+ for (int i = 0; i < ntuples; i++)
+ {
+ char *slot_type;
+
+		slot_data = palloc0(sizeof(WalRecvReplicationSlotData));
+		namestrcpy(&slot_data->persistent_data.name, PQgetvalue(res, i, 0));
+		if (!PQgetisnull(res, i, 1))
+			namestrcpy(&slot_data->persistent_data.plugin, PQgetvalue(res, i, 1));
+		slot_type = PQgetvalue(res, i, 2);
+		if (!PQgetisnull(res, i, 3))
+			slot_data->persistent_data.database = atooid(PQgetvalue(res, i, 3));
+		if (strcmp(slot_type, "physical") == 0)
+		{
+			if (OidIsValid(slot_data->persistent_data.database))
+				elog(ERROR, "unexpected physical replication slot with database set");
+		}
+		if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1)
+			slot_data->persistent_data.persistency = RS_TEMPORARY;
+		else
+			slot_data->persistent_data.persistency = RS_PERSISTENT;
+		if (!PQgetisnull(res, i, 6))
+			slot_data->persistent_data.xmin = atooid(PQgetvalue(res, i, 6));
+		if (!PQgetisnull(res, i, 7))
+			slot_data->persistent_data.catalog_xmin = atooid(PQgetvalue(res, i, 7));
+		if (!PQgetisnull(res, i, 8))
+			slot_data->persistent_data.restart_lsn = strtou64(PQgetvalue(res, i, 8), NULL, 10);
+		if (!PQgetisnull(res, i, 9))
+			slot_data->persistent_data.confirmed_flush = strtou64(PQgetvalue(res, i, 9), NULL, 10);
+
+ slot_data->last_sync_time = 0;
+ slotlist = lappend(slotlist, slot_data);
+ }
+
+ PQclear(res);
+
+ return slotlist;
+}
+
/*
* Start streaming WAL data from given streaming options.
*
diff --git a/src/backend/replication/logical/Makefile
b/src/backend/replication/logical/Makefile
index 2dc25e37bb..ba03eeff1c 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -25,6 +25,7 @@ OBJS = \
proto.o \
relation.o \
reorderbuffer.o \
+ slotsync.o \
snapbuild.o \
tablesync.o \
worker.o
diff --git a/src/backend/replication/logical/launcher.c
b/src/backend/replication/logical/launcher.c
index 970d170e73..14af724639 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -22,6 +22,7 @@
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
+#include "catalog/pg_authid.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
@@ -246,7 +247,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
* We are only interested in the leader apply worker or table sync worker.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid dbid, Oid subid, Oid relid, bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
@@ -262,8 +263,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool
only_running)
if (isParallelApplyWorker(w))
continue;
- if (w->in_use && w->subid == subid && w->relid == relid &&
- (!only_running || w->proc))
+ if (w->in_use && w->dbid == dbid && w->subid == subid &&
+ w->relid == relid && (!only_running || w->proc))
{
res = w;
break;
@@ -320,9 +321,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char
*subname, Oid userid,
/* Sanity check - tablesync worker cannot be a subworker */
Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
-	ereport(DEBUG1,
-			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
-							 subname)));
+	if (OidIsValid(subid))
+		ereport(DEBUG1,
+				(errmsg_internal("starting logical replication worker for subscription \"%s\"",
+								 subname)));
+	else
+		ereport(DEBUG1,
+				(errmsg_internal("starting replication slot synchronization worker")));
/* Report this after the initial starting message for consistency. */
if (max_replication_slots == 0)
@@ -359,7 +364,9 @@ retry:
* reason we do this is because if some worker failed to start up and
its
* parent has crashed while waiting, the in_use state was never cleared.
*/
- if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+ if (worker == NULL ||
+ (OidIsValid(relid) &&
+ nsyncworkers >= max_sync_workers_per_subscription))
{
bool did_cleanup = false;
@@ -455,15 +462,20 @@ retry:
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
- bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ bgw.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
- if (is_parallel_apply_worker)
+ if (!OidIsValid(subid))
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncMain");
+ else if (is_parallel_apply_worker)
snprintf(bgw.bgw_function_name, BGW_MAXLEN,
"ParallelApplyWorkerMain");
else
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
- if (OidIsValid(relid))
+ if (!OidIsValid(subid))
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "replication slot synchronization worker");
+ else if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription
%u sync %u", subid, relid);
else if (is_parallel_apply_worker)
@@ -591,13 +603,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker,
int signo)
* Stop the logical replication worker for subid/relid, if any.
*/
void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid)
{
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid, relid, false);
+ worker = logicalrep_worker_find(dbid, subid, relid, false);
if (worker)
{
@@ -640,13 +652,13 @@ logicalrep_pa_worker_stop(int slot_no, uint16 generation)
* Wake up (using latch) any logical replication worker for specified sub/rel.
*/
void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid)
{
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid, relid, true);
+ worker = logicalrep_worker_find(dbid, subid, relid, true);
if (worker)
logicalrep_worker_wakeup_ptr(worker);
@@ -888,7 +900,7 @@ ApplyLauncherRegister(void)
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
- bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ bgw.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -1071,6 +1083,157 @@ ApplyLauncherWakeup(void)
kill(LogicalRepCtx->launcher_pid, SIGUSR1);
}
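+
+/*
+ * Launch slot-synchronization workers for the databases of the logical
+ * slots listed in synchronize_slot_names (one worker per database),
+ * rate-limiting restarts by wal_retrieve_retry_interval.  Only called
+ * while in recovery, i.e. on a standby.
+ */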
+static void
+ApplyLauncherStartSlotSync(long *wait_time)
+{
+ WalReceiverConn *wrconn;
+ char *err;
+ List *slots;
+ ListCell *lc;
+ MemoryContext tmpctx;
+ MemoryContext oldctx;
+
+ if (strcmp(synchronize_slot_names, "") == 0)
+ return;
+
+	wrconn = walrcv_connect(PrimaryConnInfo, false, false,
+							"Logical Replication Launcher", &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	/* Use temporary context for the slot list and worker info. */
+	tmpctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher slot sync ctx",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(tmpctx);
+
+ slots = walrcv_list_slots(wrconn, synchronize_slot_names);
+
+ foreach(lc, slots)
+ {
+ WalRecvReplicationSlotData *slot_data = lfirst(lc);
+ LogicalRepWorker *w;
+ TimestampTz last_sync;
+ TimestampTz now;
+ long elapsed;
+
+ if (!OidIsValid(slot_data->persistent_data.database))
+ continue;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ w = logicalrep_worker_find(slot_data->persistent_data.database,
InvalidOid,
+ InvalidOid,
false);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ if (w != NULL)
+ continue; /* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Each apply worker can only be restarted once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly restart the worker as fast as possible.
+		 */
+		last_sync = slot_data->last_sync_time;
+		now = GetCurrentTimestamp();
+		if (last_sync == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_sync, now)) >= wal_retrieve_retry_interval)
+		{
+			slot_data->last_sync_time = now;
+			logicalrep_worker_launch(slot_data->persistent_data.database,
+									 InvalidOid, NULL,
+									 BOOTSTRAP_SUPERUSERID, InvalidOid,
+									 DSM_HANDLE_INVALID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							 wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+ /* Switch back to original memory context. */
+ MemoryContextSwitchTo(oldctx);
+ /* Clean the temporary memory. */
+ MemoryContextDelete(tmpctx);
+
+ walrcv_disconnect(wrconn);
+}
+
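+/*
+ * Start apply workers for any enabled subscriptions that do not have one
+ * running yet, rate-limiting restarts by wal_retrieve_retry_interval
+ * (factored out of the old ApplyLauncherMain loop body).
+ */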
+static void
+ApplyLauncherStartSubs(long *wait_time)
+{
+ List *sublist;
+ ListCell *lc;
+ MemoryContext subctx;
+ MemoryContext oldctx;
+
+ /* Use temporary context to avoid leaking memory across cycles. */
+	subctx = AllocSetContextCreate(TopMemoryContext,
+								   "Logical Replication Launcher sublist",
+								   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(subctx);
+
+ /* Start the missing workers for enabled subscriptions. */
+ sublist = get_subscription_list();
+ foreach(lc, sublist)
+ {
+ Subscription *sub = (Subscription *) lfirst(lc);
+ LogicalRepWorker *w;
+ TimestampTz last_start;
+ TimestampTz now;
+ long elapsed;
+
+ if (!sub->enabled)
+ continue;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		w = logicalrep_worker_find(sub->dbid, sub->oid, InvalidOid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ if (w != NULL)
+ continue; /* worker is running already */
+
+		/*
+		 * If the worker is eligible to start now, launch it.  Otherwise,
+		 * adjust wait_time so that we'll wake up as soon as it can be
+		 * started.
+		 *
+		 * Each subscription's apply worker can only be restarted once per
+		 * wal_retrieve_retry_interval, so that errors do not cause us to
+		 * repeatedly restart the worker as fast as possible.  In cases
+		 * where a restart is expected (e.g., subscription parameter
+		 * changes), another process should remove the last-start entry
+		 * for the subscription so that the worker can be restarted
+		 * without waiting for wal_retrieve_retry_interval to elapse.
+		 */
+		last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
+		now = GetCurrentTimestamp();
+		if (last_start == 0 ||
+			(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
+		{
+			ApplyLauncherSetWorkerStartTime(sub->oid, now);
+			logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+									 sub->owner, InvalidOid,
+									 DSM_HANDLE_INVALID);
+		}
+		else
+		{
+			*wait_time = Min(*wait_time,
+							 wal_retrieve_retry_interval - elapsed);
+		}
+	}
+
+ /* Switch back to original memory context. */
+ MemoryContextSwitchTo(oldctx);
+ /* Clean the temporary memory. */
+ MemoryContextDelete(subctx);
+}
+
/*
* Main loop for the apply launcher process.
*/
@@ -1096,78 +1259,20 @@ ApplyLauncherMain(Datum main_arg)
*/
BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+ load_file("libpqwalreceiver", false);
+
/* Enter main loop */
for (;;)
{
int rc;
- List *sublist;
- ListCell *lc;
- MemoryContext subctx;
- MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
CHECK_FOR_INTERRUPTS();
- /* Use temporary context to avoid leaking memory across cycles.
*/
- subctx = AllocSetContextCreate(TopMemoryContext,
-
"Logical Replication Launcher sublist",
-
ALLOCSET_DEFAULT_SIZES);
- oldctx = MemoryContextSwitchTo(subctx);
-
- /* Start any missing workers for enabled subscriptions. */
- sublist = get_subscription_list();
- foreach(lc, sublist)
- {
- Subscription *sub = (Subscription *) lfirst(lc);
- LogicalRepWorker *w;
- TimestampTz last_start;
- TimestampTz now;
- long elapsed;
-
- if (!sub->enabled)
- continue;
-
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
- LWLockRelease(LogicalRepWorkerLock);
-
- if (w != NULL)
- continue; /* worker is running
already */
-
- /*
- * If the worker is eligible to start now, launch it.
Otherwise,
- * adjust wait_time so that we'll wake up as soon as it
can be
- * started.
- *
- * Each subscription's apply worker can only be
restarted once per
- * wal_retrieve_retry_interval, so that errors do not
cause us to
- * repeatedly restart the worker as fast as possible.
In cases
- * where a restart is expected (e.g., subscription
parameter
- * changes), another process should remove the
last-start entry
- * for the subscription so that the worker can be
restarted
- * without waiting for wal_retrieve_retry_interval to
elapse.
- */
- last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
- now = GetCurrentTimestamp();
- if (last_start == 0 ||
- (elapsed =
TimestampDifferenceMilliseconds(last_start, now)) >=
wal_retrieve_retry_interval)
- {
- ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(sub->dbid, sub->oid,
sub->name,
-
sub->owner, InvalidOid,
-
DSM_HANDLE_INVALID);
- }
- else
- {
- wait_time = Min(wait_time,
-
wal_retrieve_retry_interval - elapsed);
- }
- }
-
- /* Switch back to original memory context. */
- MemoryContextSwitchTo(oldctx);
- /* Clean the temporary memory. */
- MemoryContextDelete(subctx);
+ if (!RecoveryInProgress())
+ ApplyLauncherStartSubs(&wait_time);
+ else
+ ApplyLauncherStartSlotSync(&wait_time);
/* Wait for more work. */
rc = WaitLatch(MyLatch,
diff --git a/src/backend/replication/logical/meson.build
b/src/backend/replication/logical/meson.build
index d48cd4c590..9e52ec421f 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -11,6 +11,7 @@ backend_sources += files(
'proto.c',
'relation.c',
'reorderbuffer.c',
+ 'slotsync.c',
'snapbuild.c',
'tablesync.c',
'worker.c',
diff --git a/src/backend/replication/logical/reorderbuffer.c
b/src/backend/replication/logical/reorderbuffer.c
index 9f44974473..1519b0ec64 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -95,11 +95,14 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
+#include "replication/logicalworker.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
@@ -107,6 +110,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+#include "utils/varlena.h"
/* entry for a hash table we use to map from xid to our transaction state */
@@ -2053,6 +2057,85 @@ ReorderBufferResetTXN(ReorderBuffer *rb,
ReorderBufferTXN *txn,
}
}
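+
+/*
+ * Wait until every physical slot named in standby_slot_names has confirmed
+ * WAL at least up to commit_lsn, polling roughly once per second, so that
+ * logical decoding output cannot get ahead of the physical standbys that
+ * are promotion candidates.
+ */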
+static void
+wait_for_standby_confirmation(XLogRecPtr commit_lsn)
+{
+ char *rawname;
+ List *namelist;
+ ListCell *lc;
+ XLogRecPtr flush_pos = InvalidXLogRecPtr;
+
+ if (strcmp(standby_slot_names, "") == 0)
+ return;
+
+ rawname = pstrdup(standby_slot_names);
+ SplitIdentifierString(rawname, ',', &namelist);
+
+ while (true)
+ {
+ int wait_slots_remaining;
+ XLogRecPtr oldest_flush_pos = InvalidXLogRecPtr;
+ int rc;
+
+ wait_slots_remaining = list_length(namelist);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s =
&ReplicationSlotCtl->replication_slots[i];
+ bool inlist;
+
+ if (!s->in_use)
+ continue;
+
+ inlist = false;
+ foreach (lc, namelist)
+ {
+ char *name = lfirst(lc);
+ if (strcmp(name, NameStr(s->data.name)) == 0)
+ {
+ inlist = true;
+ break;
+ }
+ }
+ if (!inlist)
+ continue;
+
+ SpinLockAcquire(&s->mutex);
+
+			if (s->data.database == InvalidOid)
+				/* Physical slots advance restart_lsn on flush and ignore confirmed_flush_lsn */
+				flush_pos = s->data.restart_lsn;
+			else
+				/* For logical slots we must wait for commit and flush */
+				flush_pos = s->data.confirmed_flush;
+
+ SpinLockRelease(&s->mutex);
+
+			/* We want to find out the min(flush pos) over all named slots */
+			if (oldest_flush_pos == InvalidXLogRecPtr ||
+				oldest_flush_pos > flush_pos)
+				oldest_flush_pos = flush_pos;
+
+			if (flush_pos >= commit_lsn && wait_slots_remaining > 0)
+				wait_slots_remaining--;
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ if (wait_slots_remaining == 0)
+ return;
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   1000L, PG_WAIT_EXTENSION);
+
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+}
+
/*
* Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
*
@@ -2502,6 +2585,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
ReorderBufferTXN *txn,
* Call either PREPARE (for two-phase transactions) or
COMMIT (for
* regular ones).
*/
+
+ wait_for_standby_confirmation(commit_lsn);
+
if (rbtxn_prepared(txn))
rb->prepare(rb, txn, commit_lsn);
else
diff --git a/src/backend/replication/logical/slotsync.c
b/src/backend/replication/logical/slotsync.c
new file mode 100644
index 0000000000..529ddb21ae
--- /dev/null
+++ b/src/backend/replication/logical/slotsync.c
@@ -0,0 +1,413 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *	   PostgreSQL worker for synchronizing replication slots from the
+ *	   primary to a standby
+ *
+ * Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/slotsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/dbcommands.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/varlena.h"
+
+char *synchronize_slot_names;
+char *standby_slot_names;
+
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+ XLogRecPtr min_lsn)
+{
+ WalRcvExecResult *res;
+ TupleTableSlot *slot;
+ Oid slotRow[1] = {LSNOID};
+ StringInfoData cmd;
+ bool isnull;
+ XLogRecPtr restart_lsn;
+
+ for (;;)
+ {
+ int rc;
+
+ CHECK_FOR_INTERRUPTS();
+
+ initStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT restart_lsn"
+						 " FROM pg_catalog.pg_replication_slots"
+						 " WHERE slot_name = %s",
+						 quote_literal_cstr(slot_name));
+ res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
+							slot_name, res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+			ereport(ERROR,
+					(errmsg("slot \"%s\" disappeared from the primary",
+							slot_name)));
+
+ restart_lsn = DatumGetLSN(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ walrcv_clear_result(res);
+
+ if (restart_lsn >= min_lsn)
+ break;
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+ ResetLatch(MyLatch);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+ }
+}
+
+/*
+ * Synchronize a single slot to the given position.
+ *
+ * The slot is created first if it does not exist yet.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, char *slot_name, char *database,
+ char *plugin_name, XLogRecPtr
target_lsn)
+{
+ bool found = false;
+ XLogRecPtr endlsn;
+
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (!s->in_use)
+ continue;
+
+ if (strcmp(NameStr(s->data.name), slot_name) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ StartTransactionCommand();
+
+ /* Already existing slot, acquire */
+ if (found)
+ {
+ ReplicationSlotAcquire(slot_name, true);
+
+ if (target_lsn < MyReplicationSlot->data.confirmed_flush)
+ {
+			elog(DEBUG1,
+				 "not synchronizing slot %s; synchronization would move it backward",
+				 slot_name);
+
+ ReplicationSlotRelease();
+ CommitTransactionCommand();
+ return;
+ }
+ }
+ /* Otherwise create the slot first. */
+ else
+ {
+ TransactionId xmin_horizon = InvalidTransactionId;
+ ReplicationSlot *slot;
+
+ ReplicationSlotCreate(slot_name, true, RS_EPHEMERAL, false);
+ slot = MyReplicationSlot;
+
+ SpinLockAcquire(&slot->mutex);
+ slot->data.database = get_database_oid(database, false);
+ namestrcpy(&slot->data.plugin, plugin_name);
+ SpinLockRelease(&slot->mutex);
+
+ ReplicationSlotReserveWal();
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+ slot->effective_catalog_xmin = xmin_horizon;
+ slot->data.catalog_xmin = xmin_horizon;
+ ReplicationSlotsComputeRequiredXmin(true);
+ LWLockRelease(ProcArrayLock);
+
+ if (target_lsn < MyReplicationSlot->data.restart_lsn)
+ {
+			ereport(LOG,
+					errmsg("waiting for remote slot \"%s\" LSN (%X/%X) to pass local slot LSN (%X/%X)",
+						   slot_name,
+						   LSN_FORMAT_ARGS(target_lsn),
+						   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn)));
+
+			wait_for_primary_slot_catchup(wrconn, slot_name,
+										  MyReplicationSlot->data.restart_lsn);
+ }
+
+ ReplicationSlotPersist();
+ }
+
+ endlsn = pg_logical_replication_slot_advance(target_lsn);
+
+ elog(DEBUG3, "synchronized slot %s to lsn (%X/%X)",
+ slot_name, LSN_FORMAT_ARGS(endlsn));
+
+ ReplicationSlotRelease();
+ CommitTransactionCommand();
+}
+
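+/*
+ * Connect to the primary (primary_conninfo plus our database name) and
+ * bring the local copies of this database's slots listed in
+ * synchronize_slot_names up to the primary's confirmed_flush_lsn.
+ */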
+static void
+synchronize_slots(void)
+{
+ WalRcvExecResult *res;
+ WalReceiverConn *wrconn = NULL;
+ TupleTableSlot *slot;
+ Oid slotRow[3] = {TEXTOID, TEXTOID, LSNOID};
+ StringInfoData s;
+ char *database;
+ char *err;
+ MemoryContext oldctx = CurrentMemoryContext;
+
+ if (!WalRcv)
+ return;
+
+ /* syscache access needs a transaction env. */
+ StartTransactionCommand();
+ /* make dbname live outside TX context */
+ MemoryContextSwitchTo(oldctx);
+
+ database = get_database_name(MyDatabaseId);
+ initStringInfo(&s);
+ appendStringInfo(&s, "%s dbname=%s", PrimaryConnInfo, database);
+ wrconn = walrcv_connect(s.data, true, false, "slot_sync", &err);
+
+	if (wrconn == NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to the primary server: %s", err)));
+
+	resetStringInfo(&s);
+	appendStringInfo(&s,
+					 "SELECT slot_name, plugin, confirmed_flush_lsn"
+					 " FROM pg_catalog.pg_replication_slots"
+					 " WHERE database = %s",
+					 quote_literal_cstr(database));
+	if (strcmp(synchronize_slot_names, "") != 0 &&
+		strcmp(synchronize_slot_names, "*") != 0)
+ {
+ char *rawname;
+ List *namelist;
+ ListCell *lc;
+
+ rawname = pstrdup(synchronize_slot_names);
+ SplitIdentifierString(rawname, ',', &namelist);
+
+ appendStringInfoString(&s, " AND slot_name IN (");
+ foreach (lc, namelist)
+ {
+ if (lc != list_head(namelist))
+ appendStringInfoChar(&s, ',');
+			appendStringInfo(&s, "%s",
+							 quote_literal_cstr(lfirst(lc)));
+ }
+ appendStringInfoChar(&s, ')');
+ }
+
+ res = walrcv_exec(wrconn, s.data, 3, slotRow);
+ pfree(s.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not fetch slot info from primary: %s",
+						res->err)));
+
+ CommitTransactionCommand();
+ /* CommitTransactionCommand switches to TopMemoryContext */
+ MemoryContextSwitchTo(oldctx);
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *slot_name;
+ char *plugin_name;
+ XLogRecPtr confirmed_flush_lsn;
+ bool isnull;
+
+ slot_name = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+		plugin_name = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		confirmed_flush_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		synchronize_one_slot(wrconn, slot_name, database, plugin_name,
+							 confirmed_flush_lsn);
+
+ ExecClearTuple(slot);
+ }
+
+ walrcv_clear_result(res);
+ pfree(database);
+
+ walrcv_disconnect(wrconn);
+}
+
+/*
+ * The main loop of our worker process.
+ */
+void
+ReplSlotSyncMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+
+ /* Attach to slot */
+ logicalrep_worker_attach(worker_slot);
+
+ /* Establish signal handlers. */
+ BackgroundWorkerUnblockSignals();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ /* Connect to our database. */
+	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+											  MyLogicalRepWorker->userid,
+											  0);
+
+ StartTransactionCommand();
+	ereport(LOG,
+			(errmsg("replication slot synchronization worker for database \"%s\" has started",
+					get_database_name(MyLogicalRepWorker->dbid))));
+ CommitTransactionCommand();
+
+ /* Main wait loop. */
+ for (;;)
+ {
+ int rc;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (!RecoveryInProgress())
+ return;
+
+ if (strcmp(synchronize_slot_names, "") == 0)
+ return;
+
+ synchronize_slots();
+
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+					   wal_retrieve_retry_interval,
+					   WAIT_EVENT_REPL_SLOT_SYNC_MAIN);
+
+ ResetLatch(MyLatch);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+ }
+}
+
+/*
+ * Routines for handling the GUC variable(s)
+ */
+
+bool
+check_synchronize_slot_names(char **newval, void **extra, GucSource source)
+{
+ /* Special handling for "*" which means all. */
+ if (strcmp(*newval, "*") == 0)
+ {
+ return true;
+ }
+ else
+ {
+ char *rawname;
+ List *namelist;
+ ListCell *lc;
+
+ /* Need a modifiable copy of string */
+ rawname = pstrdup(*newval);
+
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawname, ',', &namelist))
+ {
+ /* syntax error in name list */
+ GUC_check_errdetail("List syntax is invalid.");
+ pfree(rawname);
+ list_free(namelist);
+ return false;
+ }
+
+ foreach(lc, namelist)
+ {
+ char *curname = (char *) lfirst(lc);
+
+ ReplicationSlotValidateName(curname, ERROR);
+ }
+
+ pfree(rawname);
+ list_free(namelist);
+ }
+
+ return true;
+}
+
+
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+ char *rawname;
+ List *namelist;
+ ListCell *lc;
+
+ /* Need a modifiable copy of string */
+ rawname = pstrdup(*newval);
+
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawname, ',', &namelist))
+ {
+ /* syntax error in name list */
+ GUC_check_errdetail("List syntax is invalid.");
+ pfree(rawname);
+ list_free(namelist);
+ return false;
+ }
+
+ foreach(lc, namelist)
+ {
+ char *curname = (char *) lfirst(lc);
+
+ ReplicationSlotValidateName(curname, ERROR);
+ }
+
+ pfree(rawname);
+ list_free(namelist);
+
+ return true;
+}
diff --git a/src/backend/replication/logical/tablesync.c
b/src/backend/replication/logical/tablesync.c
index 6dce355633..2307d187e4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
+#include "commands/subscriptioncmds.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
@@ -155,7 +156,8 @@ finish_sync_worker(void)
CommitTransactionCommand();
/* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+
MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
proc_exit(0);
@@ -195,7 +197,8 @@ wait_for_relation_state_change(Oid relid, char
expected_state)
/* Check if the sync worker is still running and bail if not. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
relid,
+ worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+
MyLogicalRepWorker->subid, relid,
false);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
@@ -242,7 +245,8 @@ wait_for_worker_state_change(char expected_state)
* waiting.
*/
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+ worker = logicalrep_worker_find(MyLogicalRepWorker->dbid,
+
MyLogicalRepWorker->subid,
InvalidOid, false);
if (worker && worker->proc)
logicalrep_worker_wakeup_ptr(worker);
@@ -508,7 +512,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- syncworker =
logicalrep_worker_find(MyLogicalRepWorker->subid,
+ syncworker =
logicalrep_worker_find(MyLogicalRepWorker->dbid,
+
MyLogicalRepWorker->subid,
rstate->relid, false);
if (syncworker)
diff --git a/src/backend/replication/logical/worker.c
b/src/backend/replication/logical/worker.c
index 3d58910c14..b9354bd023 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1600,7 +1600,8 @@ apply_handle_stream_start(StringInfo s)
* Signal the leader apply worker, as it may be
waiting for
* us.
*/
-
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+
logicalrep_worker_wakeup(MyLogicalRepWorker->dbid,
+
MyLogicalRepWorker->subid, InvalidOid);
}
parallel_stream_nchanges = 0;
diff --git a/src/backend/replication/repl_gram.y
b/src/backend/replication/repl_gram.y
index 0c874e33cf..12a4b74368 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,11 +76,12 @@ Node *replication_parse_result;
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
+%token K_LIST_SLOTS
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot
identify_system
- read_replication_slot timeline_history show
+ read_replication_slot timeline_history show
list_slots
%type <list> generic_option_list
%type <defelt> generic_option
%type <uintval> opt_timeline
@@ -91,6 +92,7 @@ Node *replication_parse_result;
%type <boolval> opt_temporary
%type <list> create_slot_options create_slot_legacy_opt_list
%type <defelt> create_slot_legacy_opt
+%type <list> slot_name_list slot_name_list_opt
%%
@@ -114,6 +116,7 @@ command:
| read_replication_slot
| timeline_history
| show
+ | list_slots
;
/*
@@ -126,6 +129,33 @@ identify_system:
}
;
+slot_name_list:
+ IDENT
+ {
+ $$ = list_make1($1);
+ }
+ | slot_name_list ',' IDENT
+ {
+ $$ = lappend($1, $3);
+ }
+
+slot_name_list_opt:
+ slot_name_list { $$ = $1; }
+ | /* EMPTY */ { $$ = NIL; }
+ ;
+
+/*
+ * LIST_SLOTS
+ */
+list_slots:
+ K_LIST_SLOTS slot_name_list_opt
+ {
+ ListSlotsCmd *cmd =
makeNode(ListSlotsCmd);
+ cmd->slot_names = $2;
+ $$ = (Node *) cmd;
+ }
+ ;
+
/*
* READ_REPLICATION_SLOT %s
*/
diff --git a/src/backend/replication/repl_scanner.l
b/src/backend/replication/repl_scanner.l
index cb467ca46f..9501df38eb 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -128,6 +128,7 @@ DROP_REPLICATION_SLOT { return
K_DROP_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
PHYSICAL { return K_PHYSICAL; }
RESERVE_WAL { return K_RESERVE_WAL; }
+LIST_SLOTS { return K_LIST_SLOTS; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
@@ -304,6 +305,7 @@ replication_scanner_is_replication_command(void)
case K_READ_REPLICATION_SLOT:
case K_TIMELINE_HISTORY:
case K_SHOW:
+ case K_LIST_SLOTS:
/* Yes; push back the first token so we can parse
later. */
repl_pushed_back_token = first_token;
return true;
diff --git a/src/backend/replication/slotfuncs.c
b/src/backend/replication/slotfuncs.c
index 6035cf4816..83ada6db6a 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -467,7 +467,7 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
* WAL and removal of old catalog tuples. As decoding is done in fast_forward
* mode, no changes are generated anyway.
*/
-static XLogRecPtr
+XLogRecPtr
pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index 45b8b3684f..0d01b8967a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -473,6 +473,194 @@ IdentifySystem(void)
end_tup_output(tstate);
}
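+
+/* qsort()/bsearch() comparator for NameData, used to filter slots by name */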
+static int
+pg_qsort_namecmp(const void *a, const void *b)
+{
+ return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN);
+}
+
+/*
+ * Handle the LIST_SLOTS command.
+ */
+static void
+ListSlots(ListSlotsCmd *cmd)
+{
+ DestReceiver *dest;
+ TupOutputState *tstate;
+ TupleDesc tupdesc;
+ NameData *slot_names;
+ int numslot_names;
+
+ numslot_names = list_length(cmd->slot_names);
+ if (numslot_names)
+ {
+ ListCell *lc;
+ int i = 0;
+
+ slot_names = palloc(numslot_names * sizeof(NameData));
+ foreach(lc, cmd->slot_names)
+ {
+ char *slot_name = lfirst(lc);
+
+ ReplicationSlotValidateName(slot_name, ERROR);
+ namestrcpy(&slot_names[i++], slot_name);
+ }
+
+ qsort(slot_names, numslot_names, sizeof(NameData),
pg_qsort_namecmp);
+ }
+
+ dest = CreateDestReceiver(DestRemoteSimple);
+
+	/* need a tuple descriptor representing ten columns */
+ tupdesc = CreateTemplateTupleDesc(10);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid",
+ INT8OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary",
+ INT4OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin",
+ INT8OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin",
+ INT8OID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn",
+ TEXTOID, -1, 0);
+ TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush",
+ TEXTOID, -1, 0);
+
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (int slotno = 0; slotno < max_replication_slots; slotno++)
+ {
+		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+ char restart_lsn_str[MAXFNAMELEN];
+ char confirmed_flush_lsn_str[MAXFNAMELEN];
+ Datum values[10];
+ bool nulls[10];
+
+ ReplicationSlotPersistency persistency;
+ TransactionId xmin;
+ TransactionId catalog_xmin;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_flush_lsn;
+ Oid datoid;
+ NameData slot_name;
+ NameData plugin;
+ int i;
+ int64 tmpbigint;
+
+ if (!slot->in_use)
+ continue;
+
+ SpinLockAcquire(&slot->mutex);
+
+ xmin = slot->data.xmin;
+ catalog_xmin = slot->data.catalog_xmin;
+ datoid = slot->data.database;
+ restart_lsn = slot->data.restart_lsn;
+ confirmed_flush_lsn = slot->data.confirmed_flush;
+ namestrcpy(&slot_name, NameStr(slot->data.name));
+ namestrcpy(&plugin, NameStr(slot->data.plugin));
+ persistency = slot->data.persistency;
+
+ SpinLockRelease(&slot->mutex);
+
+		if (numslot_names &&
+			!bsearch((void *) &slot_name, (void *) slot_names,
+					 numslot_names, sizeof(NameData), pg_qsort_namecmp))
+			continue;
+
+ memset(nulls, 0, sizeof(nulls));
+
+ i = 0;
+ values[i++] = CStringGetTextDatum(NameStr(slot_name));
+
+ if (datoid == InvalidOid)
+ nulls[i++] = true;
+ else
+ values[i++] = CStringGetTextDatum(NameStr(plugin));
+
+ if (datoid == InvalidOid)
+ values[i++] = CStringGetTextDatum("physical");
+ else
+ values[i++] = CStringGetTextDatum("logical");
+
+ if (datoid == InvalidOid)
+ nulls[i++] = true;
+ else
+ {
+ tmpbigint = datoid;
+ values[i++] = Int64GetDatum(tmpbigint);
+ }
+
+		if (datoid == InvalidOid)
+			nulls[i++] = true;
+		else
+		{
+			MemoryContext cur = CurrentMemoryContext;
+
+			/* syscache access needs a transaction env. */
+			StartTransactionCommand();
+			/* make dbname live outside TX context */
+			MemoryContextSwitchTo(cur);
+			values[i++] = CStringGetTextDatum(get_database_name(datoid));
+			CommitTransactionCommand();
+			/* CommitTransactionCommand switches to TopMemoryContext */
+			MemoryContextSwitchTo(cur);
+		}
+
+		values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0);
+
+ if (xmin != InvalidTransactionId)
+ {
+ tmpbigint = xmin;
+ values[i++] = Int64GetDatum(tmpbigint);
+ }
+ else
+ nulls[i++] = true;
+
+ if (catalog_xmin != InvalidTransactionId)
+ {
+ tmpbigint = catalog_xmin;
+ values[i++] = Int64GetDatum(tmpbigint);
+ }
+ else
+ nulls[i++] = true;
+
+		if (restart_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X",
+					 LSN_FORMAT_ARGS(restart_lsn));
+			values[i++] = CStringGetTextDatum(restart_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+		if (confirmed_flush_lsn != InvalidXLogRecPtr)
+		{
+			snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str),
+					 "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn));
+			values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str);
+		}
+		else
+			nulls[i++] = true;
+
+ /* send it to dest */
+ do_tup_output(tstate, values, nulls);
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ end_tup_output(tstate);
+}
+
/* Handle READ_REPLICATION_SLOT command */
static void
ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
@@ -1820,6 +2008,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
+ case T_ListSlotsCmd:
+ cmdtag = "LIST_SLOTS";
+ set_ps_display(cmdtag);
+ ListSlots((ListSlotsCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
+ break;
+
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd
*) cmd_node;
diff --git a/src/backend/utils/activity/wait_event.c
b/src/backend/utils/activity/wait_event.c
index 7940d64639..f2a9517091 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -230,6 +230,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
event_name = "LogicalLauncherMain";
break;
+ case WAIT_EVENT_REPL_SLOT_SYNC_MAIN:
+ event_name = "ReplSlotSyncMain";
+ break;
case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN:
event_name = "LogicalParallelApplyMain";
break;
diff --git a/src/backend/utils/misc/guc_tables.c
b/src/backend/utils/misc/guc_tables.c
index cab3ddbe11..0ee7ad1348 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -63,8 +63,12 @@
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
+#include "replication/walreceiver.h"
+#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/large_object.h"
#include "storage/pg_shmem.h"
@@ -4587,6 +4591,28 @@ struct config_string ConfigureNamesString[] =
check_io_direct, assign_io_direct, NULL
},
+ {
+ {"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY,
+			gettext_noop("Sets the names of replication slots to synchronize from the primary to the standby."),
+ gettext_noop("Value of \"*\" means all."),
+ GUC_LIST_INPUT | GUC_LIST_QUOTE
+ },
+ &synchronize_slot_names,
+ "",
+ check_synchronize_slot_names, NULL, NULL
+ },
+
+ {
+ {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+			gettext_noop("Lists the physical replication slots that must confirm changes before they are sent to logical replication consumers."),
+ NULL,
+ GUC_LIST_INPUT | GUC_LIST_QUOTE
+ },
+ &standby_slot_names,
+ "",
+ check_standby_slot_names, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample
b/src/backend/utils/misc/postgresql.conf.sample
index dce5049bc2..2ff2188c02 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -330,6 +330,7 @@
# and comma-separated list of application_name
# from standby(s); '*' = all
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
+#standby_slot_names = ''	# physical standby slot names that logical replication waits for
# - Standby Servers -
@@ -357,6 +358,7 @@
#wal_retrieve_retry_interval = 5s # time to wait before retrying to
# retrieve WAL after a failed attempt
#recovery_min_apply_delay = 0 # minimum delay for applying changes
during recovery
+#synchronize_slot_names = ''	# logical replication slots to sync to standby
# - Subscribers -
diff --git a/src/include/commands/subscriptioncmds.h
b/src/include/commands/subscriptioncmds.h
index 214dc6c29e..0e77f9ee5c 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -17,6 +17,7 @@
#include "catalog/objectaddress.h"
#include "parser/parse_node.h"
+#include "replication/walreceiver.h"
extern ObjectAddress CreateSubscription(ParseState *pstate,
CreateSubscriptionStmt *stmt,
bool isTopLevel);
@@ -28,4 +29,6 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid
newOwnerId);
extern char defGetStreamingMode(DefElem *def);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char
*slotname, bool missing_ok);
+
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..980e0b2ee2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd
NodeTag type;
} IdentifySystemCmd;
+/* ----------------------
+ * LIST_SLOTS command
+ * ----------------------
+ */
+typedef struct ListSlotsCmd
+{
+ NodeTag type;
+ List *slot_names;
+} ListSlotsCmd;
/* ----------------------
* BASE_BACKUP command
diff --git a/src/include/replication/logicallauncher.h
b/src/include/replication/logicallauncher.h
index a07c9cb311..80fdbf9657 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -31,4 +31,6 @@ extern bool IsLogicalLauncher(void);
extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
+extern PGDLLIMPORT char *PrimaryConnInfo;
+
#endif /* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalworker.h
b/src/include/replication/logicalworker.h
index 39588da79f..6408753557 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,10 +14,16 @@
#include <signal.h>
+#include "utils/guc.h"
+
+extern char *synchronize_slot_names;
+extern char *standby_slot_names;
+
extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern bool IsLogicalParallelApplyWorker(void);
@@ -29,4 +35,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+extern bool check_synchronize_slot_names(char **newval, void **extra, GucSource source);
+extern bool check_standby_slot_names(char **newval, void **extra, GucSource source);
+
#endif /* LOGICALWORKER_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..5dc2e0d30d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,7 +15,6 @@
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
-#include "replication/walreceiver.h"
/*
* Behaviour of replication slots, upon release or crash.
@@ -238,7 +237,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const
char *name, bool need_l
extern int ReplicationSlotIndex(ReplicationSlot *slot);
extern bool ReplicationSlotName(int index, Name name);
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char
*syncslotname, Size szslot);
-extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char
*slotname, bool missing_ok);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
@@ -246,4 +244,7 @@ extern void CheckPointReplicationSlots(void);
extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto);
+
+
#endif /* SLOT_H */
diff --git a/src/include/replication/walreceiver.h
b/src/include/replication/walreceiver.h
index 281626fa6f..9e9d64faf2 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -20,6 +20,7 @@
#include "pgtime.h"
#include "port/atomics.h"
#include "replication/logicalproto.h"
+#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/latch.h"
@@ -191,6 +192,17 @@ typedef struct
} proto;
} WalRcvStreamOptions;
+/*
+ * Slot information received from the remote server.
+ *
+ * Currently the same as ReplicationSlotPersistentData, plus last_sync_time.
+ */
+typedef struct WalRecvReplicationSlotData
+{
+ ReplicationSlotPersistentData persistent_data;
+ TimestampTz last_sync_time;
+} WalRecvReplicationSlotData;
+
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
@@ -280,6 +292,11 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn
*conn,
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
TimeLineID *primary_tli);
+/*
+ * walrcv_list_slots_fn
+ *
+ * Run LIST_SLOTS on the remote server and return the matching slots as a
+ * list of WalRecvReplicationSlotData.
+ */
+typedef List *(*walrcv_list_slots_fn) (WalReceiverConn *conn, const char *slots);
+
/*
* walrcv_server_version_fn
*
@@ -393,6 +410,7 @@ typedef struct WalReceiverFunctionsType
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system;
+ walrcv_list_slots_fn walrcv_list_slots;
walrcv_server_version_fn walrcv_server_version;
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
walrcv_startstreaming_fn walrcv_startstreaming;
@@ -417,6 +435,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType
*WalReceiverFunctions;
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host,
sender_port)
#define walrcv_identify_system(conn, primary_tli) \
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_list_slots(conn, slots) \
+ WalReceiverFunctions->walrcv_list_slots(conn, slots)
#define walrcv_server_version(conn) \
WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
diff --git a/src/include/replication/worker_internal.h
b/src/include/replication/worker_internal.h
index dce71d2c50..5b4fda2fd9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -65,7 +65,7 @@ typedef struct LogicalRepWorker
* would be created for each transaction which will be deleted after the
* transaction is finished.
*/
- FileSet *stream_fileset;
+ struct FileSet *stream_fileset;
/*
* PID of leader apply worker if this slot is used for a parallel apply
@@ -226,15 +226,15 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
extern PGDLLIMPORT bool in_remote_transaction;
extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(Oid dbid, Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running);
extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid
userid, Oid relid,
dsm_handle subworker_dsm);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(Oid dbid, Oid subid, Oid relid);
extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid dbid, Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 518d3b0a1f..cccd4d9d32 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -43,6 +43,7 @@ typedef enum
WAIT_EVENT_LOGICAL_APPLY_MAIN,
WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN,
+ WAIT_EVENT_REPL_SLOT_SYNC_MAIN,
WAIT_EVENT_RECOVERY_WAL_STREAM,
WAIT_EVENT_SYSLOGGER_MAIN,
WAIT_EVENT_WAL_RECEIVER_MAIN,
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 2008958010..b6fcc8704e 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -42,6 +42,7 @@ tests += {
't/034_create_database.pl',
't/035_standby_logical_decoding.pl',
't/036_truncated_dropped.pl',
+ 't/037_slot_sync.pl',
],
},
}
diff --git a/src/test/recovery/t/037_slot_sync.pl
b/src/test/recovery/t/037_slot_sync.pl
new file mode 100644
index 0000000000..0520042d96
--- /dev/null
+++ b/src/test/recovery/t/037_slot_sync.pl
@@ -0,0 +1,130 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_phys_standby = PostgreSQL::Test::Cluster->new('phys_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+ my ($node, $pat, $off) = @_;
+
+ $off = 0 unless defined $off;
+ my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+ return 0 if (length($log) <= $off);
+
+ $log = substr($log, $off);
+
+ return $log =~ m/$pat/;
+}
+
+# Check invalidation in the logfile
+sub check_for_invalidation
+{
+ my ($log_start, $test_name) = @_;
+
+ # message should be issued
+ ok( find_in_log(
+ $node_phys_standby,
+ "invalidating obsolete replication slot \"sub1\"", $log_start),
+ "sub1 slot invalidation is logged $test_name");
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+	my $res = $node_phys_standby->safe_psql(
+		'postgres', qq(
+		select bool_and(conflicting) from pg_replication_slots;));
+
+ is($res, 't',
+ "Logical slot is reported as conflicting");
+}
+
+$node_primary->init(allows_streaming => 'logical');
+$node_primary->append_conf('postgresql.conf', "standby_slot_names = 'pslot1'");
+$node_primary->start;
+$node_primary->psql('postgres', q{SELECT pg_create_physical_replication_slot('pslot1');});
+
+$node_primary->backup('backup');
+
+$node_phys_standby->init_from_backup($node_primary, 'backup', has_streaming => 1);
+$node_phys_standby->append_conf('postgresql.conf', q{
+synchronize_slot_names = '*'
+primary_slot_name = 'pslot1'
+hot_standby_feedback = off
+});
+
+$node_phys_standby->start;
+
+$node_primary->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (1), (2), (3)");
+
+# Some tests need to wait for VACUUM to be replayed. But VACUUM does not flush
+# WAL. An insert into flush_wal outside of any transaction guarantees a flush.
+$node_primary->psql('postgres', q[CREATE TABLE flush_wal();]);
+
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE t1 (a int PRIMARY KEY)");
+
+$node_primary->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE t1");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '" . ($node_primary->connstr . ' dbname=postgres') . "' PUBLICATION pub1");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result = $node_primary->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots WHERE slot_type = 'logical'");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on primary');
+
+# FIXME: standby needs restart to pick up new slots
+$node_phys_standby->restart;
+sleep 3;
+
+$result = $node_phys_standby->safe_psql('postgres',
+	"SELECT slot_name, plugin, database FROM pg_replication_slots");
+
+is($result, qq(sub1|pgoutput|postgres), 'logical slot on standby');
+
+$node_primary->safe_psql('postgres', "INSERT INTO t1 VALUES (4), (5), (6)");
+$node_primary->wait_for_catchup('sub1');
+
+$node_primary->wait_for_catchup($node_phys_standby->name);
+
+# Logical subscriber and physical replica are caught up at this point.
+
+# Drop the subscription so that catalog_xmin is unknown on the primary
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
+
+# This should trigger a conflict as hot_standby_feedback is off on the standby
+$node_primary->safe_psql('postgres', qq[
+ CREATE TABLE conflict_test(x integer, y text);
+ DROP TABLE conflict_test;
+ VACUUM full pg_class;
+ INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+# Ensure physical replay catches up
+$node_primary->wait_for_catchup($node_phys_standby);
+
+# Check invalidation in the logfile
+check_for_invalidation(1, 'with vacuum FULL on pg_class');
+
+# Check conflicting status in pg_replication_slots.
+check_slots_conflicting_status();
+
+done_testing();
--
2.34.1