On Thu, May 28, 2026 at 10:11 PM SATYANARAYANA NARLAPURAM
<[email protected]> wrote:
>
> Hi
>
> On Thu, May 28, 2026 at 9:17 PM Fujii Masao <[email protected]> wrote:
>>
>> On Thu, May 28, 2026 at 10:11 AM SATYANARAYANA NARLAPURAM
>> <[email protected]> wrote:
>> > Thanks for the patches, I combined these changes in my latest patch.
>> > Please find the v5.
>>
>> Thanks for updating the patch! But, v5 patch caused a compilation failure.
>>
>> slotfuncs.c:119:32: error: too few arguments to function call, single
>> argument 'try_disable' was not specified
>> 119 | ReplicationSlotDropAcquired();
>> | ~~~~~~~~~~~~~~~~~~~~~~~~~~~ ^
>> ../../../src/include/replication/slot.h:338:13: note:
>> 'ReplicationSlotDropAcquired' declared here
>> 338 | extern void ReplicationSlotDropAcquired(bool try_disable);
>> | ^ ~~~~~~~~~~~~~~~~
>> slotfuncs.c:207:32: error: too few arguments to function call, single
>> argument 'try_disable' was not specified
>> 207 | ReplicationSlotDropAcquired();
>> | ~~~~~~~~~~~~~~~~~~~~~~~~~~~ ^
>> ../../../src/include/replication/slot.h:338:13: note:
>> 'ReplicationSlotDropAcquired' declared here
>> 338 | extern void ReplicationSlotDropAcquired(bool try_disable);
>> | ^ ~~~~~~~~~~~~~~~~
>> slotfuncs.c:922:32: error: too few arguments to function call, single
>> argument 'try_disable' was not specified
>> 922 | ReplicationSlotDropAcquired();
>> | ~~~~~~~~~~~~~~~~~~~~~~~~~~~ ^
>> ../../../src/include/replication/slot.h:338:13: note:
>> 'ReplicationSlotDropAcquired' declared here
>> 338 | extern void ReplicationSlotDropAcquired(bool try_disable);
>> | ^ ~~~~~~~~~~~~~~~~
>> 3 errors generated.
>
>
> Please see the v6 patch. Upstream commit 2af1dc89282 changed the
> ReplicationSlotDropAcquired signature since the patch generated.
>
I've reviewed the v6 patch, and here are some comments:
Maybe pg_sync_replication_slots() has the same problem if it's called
inside an exception block?
---
The patch adds PG_TRY()/PG_CATCH() to each replication slot function,
but there is no comment explaining why we need them even though we
already call ReplicationSlotRelease() and ReplicationSlotCleanup() in
the error paths. Also, while the proposed idea probably works for back
branches, I guess we might want to consider a more comprehensive
approach for HEAD to deal with this issue, as wrapping every function
in PG_TRY()/PG_CATCH() seems very error-prone to me, especially when
adding a new replication slot function.
This problem stems from the fact that when replication slots are used
within an exception block, we don't reach the error path even if an
error occurs, and we cannot simply call ReplicationSlotRelease()
during subtransaction abort, as we need to be able to start and abort
a transaction while holding a slot. But I guess that we can release the
slot when aborting the subtransaction in which we created/acquired the
slot, or any (sub)transaction above it. I've drafted a patch for this
idea and confirmed that it passes the regression tests, though I still
need to verify its feasibility.
Regards,
--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 7de03c79f6f..3a04d2fbe67 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -466,3 +466,117 @@ SELECT pg_drop_replication_slot('physical_slot');
(1 row)
+--
+-- Test that replication slots are properly released or dropped on error,
+-- even when the error is caught by a PL/pgSQL EXCEPTION handler (which
+-- doesn't terminate the session).
+--
+-- pg_create_logical_replication_slot: error during slot creation should
+-- drop the slot.
+DO $$
+BEGIN
+ PERFORM pg_create_logical_replication_slot('regression_slot_error', 'nonexistent_plugin_xyz', true);
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught: %', SQLERRM;
+END;
+$$;
+NOTICE: caught: could not access file "nonexistent_plugin_xyz": No such file or directory
+-- the concerned slot must not exist (it was dropped on error)
+SELECT count(*) = 0 AS slot_was_dropped FROM pg_replication_slots
+ WHERE slot_name = 'regression_slot_error';
+ slot_was_dropped
+------------------
+ t
+(1 row)
+
+-- the session is still usable
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t3', 'test_decoding', false);
+ ?column?
+----------
+ init
+(1 row)
+
+-- pg_replication_slot_advance: error after acquiring the slot should
+-- release it so the session stays usable.
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn());
+ slot_name
+--------------------
+ regression_slot_t3
+(1 row)
+
+DO $$
+BEGIN
+ PERFORM pg_replication_slot_advance('regression_slot_t3', '0/1');
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught expected error';
+END;
+$$;
+NOTICE: caught expected error
+-- the session is still usable
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn());
+ slot_name
+--------------------
+ regression_slot_t3
+(1 row)
+
+-- pg_copy_logical_replication_slot: error after creating the destination
+-- slot should drop it.
+DO $$
+BEGIN
+ PERFORM pg_copy_logical_replication_slot('regression_slot_t3', 'regression_slot_dst', false, 'nonexistent_plugin_xyz');
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught: %', SQLERRM;
+END;
+$$;
+NOTICE: caught: could not access file "nonexistent_plugin_xyz": No such file or directory
+-- the destination slot must not exist (it was dropped on error)
+SELECT count(*) = 0 AS dst_slot_dropped FROM pg_replication_slots
+ WHERE slot_name = 'regression_slot_dst';
+ dst_slot_dropped
+------------------
+ t
+(1 row)
+
+-- the session is still usable
+SELECT count(*) >= 0 AS changes_ok FROM pg_logical_slot_get_changes('regression_slot_t3', NULL, NULL);
+ changes_ok
+------------
+ t
+(1 row)
+
+-- pg_logical_slot_get_changes: error after acquiring the slot should
+-- release it.
+SELECT 'init' FROM pg_create_physical_replication_slot('regression_slot_phy', true);
+ ?column?
+----------
+ init
+(1 row)
+
+DO $$
+BEGIN
+ PERFORM pg_logical_slot_get_changes('regression_slot_phy', NULL, NULL);
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught: %', SQLERRM;
+END;
+$$;
+NOTICE: caught: cannot use physical replication slot for logical decoding
+-- the session is still usable
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn());
+ slot_name
+--------------------
+ regression_slot_t3
+(1 row)
+
+-- cleanup
+SELECT pg_drop_replication_slot('regression_slot_phy');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot_t3');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 580e3ae3bef..d8e0adccbfb 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -190,3 +190,75 @@ SELECT pg_drop_replication_slot('failover_true_slot');
SELECT pg_drop_replication_slot('failover_false_slot');
SELECT pg_drop_replication_slot('failover_default_slot');
SELECT pg_drop_replication_slot('physical_slot');
+
+--
+-- Test that replication slots are properly released or dropped on error,
+-- even when the error is caught by a PL/pgSQL EXCEPTION handler (which
+-- doesn't terminate the session).
+--
+
+-- pg_create_logical_replication_slot: error during slot creation should
+-- drop the slot.
+DO $$
+BEGIN
+ PERFORM pg_create_logical_replication_slot('regression_slot_error', 'nonexistent_plugin_xyz', true);
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught: %', SQLERRM;
+END;
+$$;
+
+-- the concerned slot must not exist (it was dropped on error)
+SELECT count(*) = 0 AS slot_was_dropped FROM pg_replication_slots
+ WHERE slot_name = 'regression_slot_error';
+
+-- the session is still usable
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t3', 'test_decoding', false);
+
+-- pg_replication_slot_advance: error after acquiring the slot should
+-- release it so the session stays usable.
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn());
+
+DO $$
+BEGIN
+ PERFORM pg_replication_slot_advance('regression_slot_t3', '0/1');
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught expected error';
+END;
+$$;
+
+-- the session is still usable
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn());
+
+-- pg_copy_logical_replication_slot: error after creating the destination
+-- slot should drop it.
+DO $$
+BEGIN
+ PERFORM pg_copy_logical_replication_slot('regression_slot_t3', 'regression_slot_dst', false, 'nonexistent_plugin_xyz');
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught: %', SQLERRM;
+END;
+$$;
+
+-- the destination slot must not exist (it was dropped on error)
+SELECT count(*) = 0 AS dst_slot_dropped FROM pg_replication_slots
+ WHERE slot_name = 'regression_slot_dst';
+
+-- the session is still usable
+SELECT count(*) >= 0 AS changes_ok FROM pg_logical_slot_get_changes('regression_slot_t3', NULL, NULL);
+
+-- pg_logical_slot_get_changes: error after acquiring the slot should
+-- release it.
+SELECT 'init' FROM pg_create_physical_replication_slot('regression_slot_phy', true);
+DO $$
+BEGIN
+ PERFORM pg_logical_slot_get_changes('regression_slot_phy', NULL, NULL);
+EXCEPTION WHEN OTHERS THEN
+ RAISE NOTICE 'caught: %', SQLERRM;
+END;
+$$;
+-- the session is still usable
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot_t3', pg_current_wal_lsn());
+
+-- cleanup
+SELECT pg_drop_replication_slot('regression_slot_phy');
+SELECT pg_drop_replication_slot('regression_slot_t3');
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 5586fbe5b07..c2f0042c664 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5364,6 +5364,8 @@ AbortSubTransaction(void)
AtEOSubXact_Parallel(false, s->subTransactionId);
s->parallelModeLevel = 0;
+ AtEOSubXact_ReplicationSlot(s->nestingLevel);
+
/*
* We can skip all this stuff if the subxact failed before creating a
* ResourceOwner...
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d7fb9f5a67f..ab893d0c8a3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -149,6 +149,8 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
static void ReplicationSlotsShmemRequest(void *arg);
static void ReplicationSlotsShmemInit(void *arg);
+int ReplicationSlotAcquireXactLevel = -1;
+
const ShmemCallbacks ReplicationSlotsShmemCallbacks = {
.request_fn = ReplicationSlotsShmemRequest,
.init_fn = ReplicationSlotsShmemInit,
@@ -362,6 +364,22 @@ IsSlotForConflictCheck(const char *name)
return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
}
+void
+AtEOSubXact_ReplicationSlot(int nestLevel)
+{
+ /*
+ * Subtransaction-abort hook: nestLevel is the nesting level of the
+ * aborting subxact. Nothing to do unless the aborting subxact is at or
+ * outside the level where we acquired/created the slot; a slot taken at
+ * an outer level is left held. ReplicationSlotAcquireXactLevel is -1
+ * when no level is recorded, so this test also returns early in that
+ * case (assuming nesting levels are never negative).
+ */
+ if (nestLevel > ReplicationSlotAcquireXactLevel)
+ return;
+
+ if (MyReplicationSlot)
+ ReplicationSlotRelease();
+
+ /* XXX need to call ReplicationSlotCleanup() as well? */
+}
+
/*
* Create a new replication slot and mark it as used by this backend.
*
@@ -517,6 +535,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->active_proc = MyProcNumber;
SpinLockRelease(&slot->mutex);
MyReplicationSlot = slot;
+ ReplicationSlotAcquireXactLevel = GetCurrentTransactionNestLevel();
LWLockRelease(ReplicationSlotControlLock);
@@ -722,6 +741,7 @@ retry:
/* We made this slot active, so it's ours now. */
MyReplicationSlot = s;
+ ReplicationSlotAcquireXactLevel = GetCurrentTransactionNestLevel();
/*
* We need to check for invalidation after making the slot ours to avoid
@@ -846,6 +866,8 @@ ReplicationSlotRelease(void)
pfree(slotname);
}
+
+ ReplicationSlotAcquireXactLevel = -1;
}
/*
@@ -1040,6 +1062,7 @@ ReplicationSlotDropAcquired(bool try_disable)
/* slot isn't acquired anymore */
MyReplicationSlot = NULL;
+ ReplicationSlotAcquireXactLevel = -1;
ReplicationSlotDropPtr(slot);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 9b29444cbca..3bb6ca950e0 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -328,6 +328,10 @@ extern PGDLLIMPORT int max_repack_replication_slots;
extern PGDLLIMPORT char *synchronized_standby_slots;
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
+/* Transaction nest level at which MyReplicationSlot was acquired/created, or -1 */
+extern PGDLLIMPORT int ReplicationSlotAcquireXactLevel;
+extern void AtEOSubXact_ReplicationSlot(int nestLevel);
+
+
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,