For the first step (a), the pg_create_logical_replication_slot interface is extended. The slot on the newly attached standby will be dropped and recreated if the allow_overwrite flag is set to true. I have tested the modified source; could you please give me feedback on the code changes?
Regards, Fabrice diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 566f308..6cd3175 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, IN failover boolean DEFAULT false, + IN allow_overwrite boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 656e66e..d6332cd 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -627,6 +627,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlot *slot; XLogRecPtr latestFlushPtr; bool slot_updated = false; + bool allow_overwrite = false; /* * Make sure that concerned WAL is received and flushed before syncing @@ -649,24 +650,46 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } - - /* Search for the named slot */ + // Both local and remote slot have the same name if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { - bool synced; + bool synced; SpinLockAcquire(&slot->mutex); synced = slot->data.synced; + allow_overwrite = slot->data.allow_overwrite; SpinLockRelease(&slot->mutex); - - /* User-created slot with the same name exists, raise ERROR. 
*/ - if (!synced) - ereport(ERROR, + + if (!synced){ + /* + * Check if we need to overwrite an existing + * logical slot + */ + if (allow_overwrite){ + /* + * Get rid of a replication slot that is no + *longer wanted + */ + ReplicationSlotDrop(remote_slot->name,true); + + /* Get rid of a replication slot that is no longer wanted */ + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("slot \"%s\" already exists" + " on the standby but it will be dropped because flag allow_overwrite is set to true", + remote_slot->name)); + + /* Going back to the main loop after droping the failover slot */ + return false; + } + else + /* User-created slot with the same name exists, raise ERROR. */ + ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("exiting from slot synchronization because same" - " name slot \"%s\" already exists on the standby", - remote_slot->name)); - + " name slot \"%s\" already exists on the standby", + remote_slot->name)); + } /* * The slot has been synchronized before. * @@ -761,6 +784,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, remote_slot->failover, + allow_overwrite, true); /* For shorter lines. 
*/ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 600b87f..d6bc5c6 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -323,7 +323,7 @@ ReplicationSlotValidateName(const char *name, int elevel) void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, bool synced) + bool two_phase, bool failover, bool allow_overwrite, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -413,6 +413,11 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; slot->data.synced = synced; + slot->data.allow_overwrite = allow_overwrite; + + elog(LOG, "Logical replication slot %s created with option allow_overwrite to %s", + NameStr(slot->data.name), + slot->data.allow_overwrite ? "true" : "false"); /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 36cc2ed..6bd430f 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -40,7 +40,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, false, false, false); if (immediately_reserve) @@ -116,7 +116,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, - bool failover, + bool failover, bool allow_overwrite, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -134,7 +134,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover, false); + failover, allow_overwrite, false); /* * Create logical decoding context to find start point or, if we don't @@ -173,6 +173,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); bool failover = PG_GETARG_BOOL(4); + bool allow_overwrite = PG_GETARG_BOOL(5); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -191,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) temporary, two_phase, failover, + allow_overwrite, InvalidXLogRecPtr, true); @@ -210,6 +212,47 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } +/* + * This function is intended to modify a logical replication slot with + * given arguments. + */ +static void +alter_logical_replication_slot(char *name, bool two_phase, + bool failover, + bool allow_overwrite) +{ + Assert(!MyReplicationSlot); + + ReplicationSlotAcquire(name, true, true); + MyReplicationSlot->data.allow_overwrite = allow_overwrite; + ReplicationSlotMarkDirty(); + + ReplicationSlotRelease(); +} + +/* + * SQL function for altering logical replication slot properties. + */ +Datum +pg_alter_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + bool two_phase = PG_GETARG_BOOL(1); + bool failover = PG_GETARG_BOOL(2); + bool allow_overwrite = PG_GETARG_BOOL(3); + + CheckSlotPermissions(); + + CheckLogicalDecodingRequirements(); + + alter_logical_replication_slot(NameStr(*name), + two_phase, + failover, + allow_overwrite); + + PG_RETURN_NAME(name); +} + /* * SQL function for dropping a replication slot. 
@@ -726,6 +769,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, false, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9fa8beb..ef22695 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1198,7 +1198,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false); + false, false, false, false); if (reserve_wal) { @@ -1229,7 +1229,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover, false); + two_phase, failover, false, false); /* * Do options check early so that we can bail before calling the diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 62beb71..074805d 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11480,10 +11480,10 @@ { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool bool', - proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', + proargtypes => 'name name bool bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,allow_overwrite,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index eb0b93b..1fd6445 
100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -134,6 +134,11 @@ typedef struct ReplicationSlotPersistentData * for logical slots on the primary server. */ bool failover; + /* + * Allow Postgres to drop logical replication slot on standby server to ensure + * creation of new failover slot when sync_replication_slots is true. + */ + bool allow_overwrite; } ReplicationSlotPersistentData; /* @@ -267,7 +272,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, + bool two_phase, bool failover, bool allow_overwrite, bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); On Wed, Aug 13, 2025 at 8:04 AM shveta malik <shveta.ma...@gmail.com> wrote: > On Fri, Aug 8, 2025 at 7:01 PM Fabrice Chapuis <fabrice636...@gmail.com> > wrote: > > > > Thanks Shveta for coming on this point again and fixing the link. > > The idea is to check if the slot has same name to try to resynchronize > it with the primary. > > ok the check on the failover status for the remote slot is perhaps > redundant. > > I'm not sure what impact setting the synced flag to true might have. But > if you run an additional switchover, it works fine because the synced flag > on the new primary is set to true now. > > If we come back to the idea of the GUC or the API, adding an > allow_overwrite parameter to the pg_create_logical_replication_slot > function and removing the logical slot when set to true could be a suitable > approach. > > > > What is your opinion? > > > > If implemented as a GUC, it would address only a specific corner case, > making it less suitable to be added as a GUC. > > OTOH, adding it as a slot's property makes more sense. You can start > with introducing a new slot property, allow_overwrite. 
By default, > this property will be set to false. > > a) The function pg_create_logical_replication_slot() can be extended > to accept this parameter. > b) A new API pg_alter_logical_replication_slot() can be introduced, to > modify this property after slot creation if needed. > c) The commands CREATE SUBSCRIPTION and ALTER SUBSCRIPTION are not > needed to include an allow_overwrite parameter. When CREATE > SUBSCRIPTION creates a slot, it will always set allow_overwrite to > false by default. If users need to change this later, they can use the > new API pg_alter_logical_replication_slot() to update the property. > d) Additionally, pg_alter_logical_replication_slot() can serve as a > generic API to modify other slot properties as well. > > This appears to be a reasonable idea with potential use cases beyond > just allowing synchronization post switchover. Thoughts? > > ~~~ > > Another problem as you pointed out is inconsistent behaviour across > switchovers. On the first switchover, we get the error on new standby: > "Exiting from slot synchronization because a slot with the same name > already exists on the standby." > > But in the case of a double switchover, this error does not occur. > This is due to the 'synced' flag not set on new standby on first > switchover while set in double switchover. I think the behaviour > should be the same. In both cases, it should emit the same error. We > are thinking of a potential solution here and will start a new thread > if needed. > > thanks > Shveta >