On Wed, Oct 8, 2025 at 12:27 PM shveta malik <[email protected]> wrote:
> On Fri, Oct 3, 2025 at 1:28 PM Fabrice Chapuis <[email protected]> > wrote: > > > > Hi Shveta, > > Here is the v4 of the patch with pg_replication_slots view modified to > display the field allow_overwrite. Doc was also updated. > > > > The patch looks okay. The parameter name is still open for discussion, > and the comments could be improved. But we can focus on these finer > details once more reviewers start reviewing and there’s general > agreement on the concept. > > One trivial comment: we can slightly modify the doc to have something like > this: > > This parameter controls whether an existing logical replication slot > on the standby (with synced=false) can be overwritten during logical > replication slot synchronization (see Section 47.2.3). The default is > false. When true, an existing user slot with the same name on the > standby will be synchronized using the primary’s failover slot. > > <please see high-availability.sgml to find how 'Section 47.2.3' can be > referenced in the doc> > ~~ > > The next step will be to provide a way to modify this parameter via an > alter API, say pg_alter_logical_replication_slot(). This API can later > be extended to handle other parameters. This API can be implemented in > patch002 for easier review. > > thanks > Shveta > Hi, Here is v5 of the patch. I have updated the doc with the text you proposed and added the link. At this stage, can the patch be submitted to the current commitfest for review? With Regards, Fabrice
From 68bda4e973c95ff9d24a6fc866187903d04d3d21 Mon Sep 17 00:00:00 2001 From: Fabrice Chapuis <[email protected]> Date: Mon, 29 Sep 2025 13:10:38 +0200 Subject: [PATCH v5] Add allow_overwrite option to logical replication slot creation This patch adds a new parameter allow_overwrite to pg_create_logical_replication_slot. When true, an existing failover slot on standby will be dropped and replaced automatically. Signed-off-by: Fabrice Chapuis <[email protected]> --- doc/src/sgml/system-views.sgml | 14 +++++++++ src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/slotsync.c | 34 ++++++++++++++++++---- src/backend/replication/slot.c | 3 +- src/backend/replication/slotfuncs.c | 13 ++++++--- src/backend/replication/walsender.c | 4 +-- src/include/catalog/pg_proc.dat | 14 ++++----- src/include/replication/slot.h | 8 ++++- 10 files changed, 74 insertions(+), 22 deletions(-) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 4187191ea74..1a7bb7d2a4b 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -3036,6 +3036,20 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx </para></entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>allow_overwrite</structfield> <type>bool</type> + </para> + <para> + This parameter controls whether an existing logical replication slot + on the standby (with synced=false) can be overwritten during logical + replication slot synchronization (see + <xref linkend="logicaldecoding-replication-slots-synchronization"/>). The default is + <literal>false</literal>. When <literal>true</literal>, an existing user slot with the same name on the + standby will be synchronized using the primary’s failover slot. 
+ </para></entry> + </row> + </tbody> </tgroup> </table> diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 2d946d6d9e9..75ea84fd837 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, IN failover boolean DEFAULT false, + IN allow_overwrite boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c77fa0234bb..7fc42ddbe66 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1046,7 +1046,8 @@ CREATE VIEW pg_replication_slots AS L.conflicting, L.invalidation_reason, L.failover, - L.synced + L.synced, + L.allow_overwrite FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 218cefe86e2..a26af527775 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1519,7 +1519,7 @@ CreateConflictDetectionSlot(void) errmsg("creating replication conflict detection slot")); ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + false, false, false); init_conflict_slot_xmin(); } diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8c061d55bdb..a8d44fe6b45 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -649,22 +649,45 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } +retry: /* Search for the named slot */ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { bool synced; + bool 
allow_overwrite; SpinLockAcquire(&slot->mutex); synced = slot->data.synced; + allow_overwrite = slot->data.allow_overwrite; SpinLockRelease(&slot->mutex); /* User-created slot with the same name exists, raise ERROR. */ if (!synced) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("exiting from slot synchronization because same" - " name slot \"%s\" already exists on the standby", - remote_slot->name)); + { + /* Check if we need to overwrite an existing logical slot */ + if (allow_overwrite) + { + ereport(LOG, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("slot \"%s\" already exists" + " on the standby but will be overwritten as" + " allow_overwrite is set to true", + remote_slot->name)); + + /* Get rid of a replication slot that is no longer wanted */ + ReplicationSlotAcquire(remote_slot->name, true, false); + ReplicationSlotDropAcquired(); + goto retry; + } + else + { + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization because same" + " name slot \"%s\" already exists on the standby", + remote_slot->name)); + } + } /* * The slot has been synchronized before. @@ -760,6 +783,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, remote_slot->failover, + false, true); /* For shorter lines. 
*/ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fd0fdb96d42..fcba4d4cff9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -351,7 +351,7 @@ IsSlotForConflictCheck(const char *name) void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, bool synced) + bool two_phase, bool failover, bool allow_overwrite, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -445,6 +445,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.allow_overwrite = allow_overwrite; slot->data.synced = synced; /* and then data only present in shared memory */ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index b8f21153e7b..6a6a03e9a4d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -41,7 +41,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false, false); + false, false, false); if (immediately_reserve) { @@ -116,7 +116,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, - bool failover, + bool failover, bool allow_overwrite, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -134,7 +134,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover, false); + failover, allow_overwrite, false); /* * Create logical decoding context to find start point or, if we don't @@ -173,6 +173,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); bool failover = PG_GETARG_BOOL(4); + bool allow_overwrite = PG_GETARG_BOOL(5); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -191,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) temporary, two_phase, failover, + allow_overwrite, InvalidXLogRecPtr, true); @@ -235,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 20 +#define PG_GET_REPLICATION_SLOTS_COLS 21 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -443,6 +445,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.synced); + values[i++] = BoolGetDatum(slot_contents.data.allow_overwrite); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -726,6 +730,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, false, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d..40586b273ba 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1202,7 +1202,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false); + false, false, false, false); if (reserve_wal) { @@ -1233,7 +1233,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover, false); + two_phase, failover, false, false); /* * Do options check early so that we can bail before calling the diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 01eba3b5a19..2f980c0dd4f 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11503,17 +11503,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced,allow_overwrite}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool bool', - proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', + proargtypes => 'name name bool bool bool bool', + proallargtypes => 
'{name,name,bool,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,allow_overwrite,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index fe62162cde3..ca50b79daaa 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -141,6 +141,12 @@ typedef struct ReplicationSlotPersistentData * for logical slots on the primary server. */ bool failover; + + /* + * Allow Postgres to drop logical replication slot on standby server to ensure + * creation of new failover slot when remote sync_replication_slots is set to true. + */ + bool allow_overwrite; } ReplicationSlotPersistentData; /* @@ -301,7 +307,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, + bool two_phase, bool failover, bool allow_overwrite, bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); -- 2.39.5
