On 2021-Jun-10, Álvaro Herrera wrote:

> I wrote a test (0002) to cover the case of signalling a walsender, which
> is currently not covered (we only deal with the case of a standby that's
> not running).  There are some sharp edges in this code -- I had to make
> it use background_psql() to send a CHECKPOINT, which hangs because I
> previously sent SIGSTOP to the walreceiver.  Maybe there's a better
> way to achieve a walreceiver that remains connected but doesn't consume
> input from the primary, but I don't know what it is.  Anyway, the code
> becomes covered with this.  I would like to at least see it in master,
> to gather some reactions from the buildfarm.
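For anyone who'd rather not read the whole 0002 diff, the gist of the new
test is roughly the snippet below.  It is simplified from the attached
patch; it reuses the advance_wal() and find_in_log() helpers already in
019_replslot_limit.pl, and $node_primary3/$node_standby3 are the
primary/standby pair the test sets up with max_slot_wal_keep_size = 1MB
and physical slot 'rep3'.

    # Freeze the walreceiver: the slot stays active but stops consuming WAL.
    my $pid = $node_standby3->safe_psql('postgres',
        "SELECT pid FROM pg_stat_activity WHERE backend_type = 'walreceiver'");
    kill 'STOP', $pid;

    # Generate enough WAL to push the slot past max_slot_wal_keep_size.
    advance_wal($node_primary3, 2);

    # The CHECKPOINT will hang (the frozen walreceiver keeps the walsender
    # from going away), so issue it via background_psql and just pump it
    # without waiting for a result.
    my ($in, $out, $timer, $h);
    $timer = IPC::Run::timeout(180);
    $h = $node_primary3->background_psql('postgres', \$in, \$out, $timer);
    $in .= "checkpoint;\n";
    $h->pump_nb;

    # The checkpointer signals the walsender that holds the slot ...
    ok(find_in_log($node_primary3, "to release replication slot"),
        "walsender termination logged");

    # ... and once the walreceiver is allowed to continue, the walsender can
    # exit and the slot's wal_status ends up 'lost' (the test polls for that).
    kill 'CONT', $pid;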
Small fixup to the test patch (0002), so that skipping it on Windows works
correctly.

-- 
Álvaro Herrera               39°49'30"S 73°17'W
Voy a acabar con todos los humanos / con los humanos yo acabaré
voy a acabar con todos (bis) / con todos los humanos acabaré ¡acabaré! (Bender)
From b5909d87a2252181fe55caecaf46d9544e05e47f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 10 Jun 2021 16:43:43 -0400
Subject: [PATCH v2 1/2] the code fix

---
 src/backend/replication/slot.c | 236 +++++++++++++++++++++------------
 1 file changed, 149 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c88b803e5d..a8bda68b21 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1160,6 +1160,152 @@ ReplicationSlotReserveWal(void)
     }
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots -- acquire the given slot
+ * and mark it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released (and in that case,
+ * we're not holding the lock at return; otherwise we are).
+ *
+ * This is inherently racy, because we want to release the
+ * LWLock for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+    int         last_signaled_pid = 0;
+    bool        released_lock = false;
+
+    for (;;)
+    {
+        XLogRecPtr  restart_lsn;
+        NameData    slotname;
+        int         active_pid = 0;
+
+        Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+        if (!s->in_use)
+        {
+            if (released_lock)
+                LWLockRelease(ReplicationSlotControlLock);
+            break;
+        }
+
+        /*
+         * Check if the slot needs to be invalidated.  If it needs to be
+         * invalidated, and is not currently acquired, acquire it and mark it
+         * as having been invalidated.  We do this with the spinlock held to
+         * avoid race conditions -- for example the restart_lsn could move
+         * forward, or the slot could be dropped.
+         */
+        SpinLockAcquire(&s->mutex);
+
+        restart_lsn = s->data.restart_lsn;
+
+        /*
+         * If the slot is already invalid or is fresh enough, we don't need to
+         * do anything.
+         */
+        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+        {
+            SpinLockRelease(&s->mutex);
+            if (released_lock)
+                LWLockRelease(ReplicationSlotControlLock);
+            break;
+        }
+
+        slotname = s->data.name;
+        active_pid = s->active_pid;
+
+        /*
+         * If the slot can be acquired, do so and mark it invalidated
+         * immediately.  Otherwise we'll signal the owning process, below, and
+         * retry.
+         */
+        if (active_pid == 0)
+        {
+            MyReplicationSlot = s;
+            s->active_pid = MyProcPid;
+            s->data.invalidated_at = restart_lsn;
+            s->data.restart_lsn = InvalidXLogRecPtr;
+        }
+
+        SpinLockRelease(&s->mutex);
+
+        if (active_pid != 0)
+        {
+            LWLockRelease(ReplicationSlotControlLock);
+            released_lock = true;
+
+            /*
+             * Signal to terminate the process that owns the slot, if we
+             * haven't already signalled it.  (Avoidance of repeated
+             * signalling is the only reason for there to be a loop in this
+             * routine; otherwise we could rely on caller's restart loop.)
+             *
+             * There is a race condition in that another process may own the
+             * slot after its current owner is terminated and before this
+             * process owns it.  To handle that, we signal only if the PID of
+             * the owning process has changed from the previous time.  (This
+             * logic assumes that the same PID is not reused very quickly.)
+             */
+            if (last_signaled_pid != active_pid)
+            {
+                ereport(LOG,
+                        (errmsg("terminating process %d to release replication slot \"%s\"",
+                                active_pid, NameStr(slotname))));
+
+                (void) kill(active_pid, SIGTERM);
+                last_signaled_pid = active_pid;
+            }
+
+            /*
+             * Wait until the slot is released.
+             *
+             * Will immediately return in the first iteration, so we can
+             * recheck the condition before sleeping.  That addresses the
+             * otherwise possible race of the slot already having been
+             * released.
+             */
+            ConditionVariableSleep(&s->active_cv,
+                                   WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+            /* we're done; have caller restart */
+            break;
+        }
+        else
+        {
+            /*
+             * We hold the slot now and have already invalidated it; flush it
+             * to ensure that state persists.
+             *
+             * Don't want to hold ReplicationSlotControlLock across file
+             * system operations, so release it now but be sure to tell caller
+             * to restart from scratch.
+             */
+            LWLockRelease(ReplicationSlotControlLock);
+            released_lock = true;
+
+            /* Make sure the invalidated state persists across server restart */
+            ReplicationSlotMarkDirty();
+            ReplicationSlotSave();
+            ReplicationSlotRelease();
+
+            ereport(LOG,
+                    (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+                            NameStr(slotname),
+                            LSN_FORMAT_ARGS(restart_lsn))));
+
+            /* done with this slot for now */
+            break;
+        }
+    }
+
+    Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
+
+    return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1178,99 +1324,15 @@ restart:
     for (int i = 0; i < max_replication_slots; i++)
     {
         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-        XLogRecPtr  restart_lsn = InvalidXLogRecPtr;
-        NameData    slotname;
-        int         wspid;
-        int         last_signaled_pid = 0;
 
         if (!s->in_use)
             continue;
 
-        SpinLockAcquire(&s->mutex);
-        slotname = s->data.name;
-        restart_lsn = s->data.restart_lsn;
-        SpinLockRelease(&s->mutex);
-
-        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-            continue;
-        LWLockRelease(ReplicationSlotControlLock);
-        CHECK_FOR_INTERRUPTS();
-
-        /* Get ready to sleep on the slot in case it is active */
-        ConditionVariablePrepareToSleep(&s->active_cv);
-
-        for (;;)
+        if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
         {
-            /*
-             * Try to mark this slot as used by this process.
-             *
-             * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should
-             * not cancel the prepared condition variable if this slot is
-             * active in other process. Because in this case we have to wait
-             * on that CV for the process owning the slot to be terminated,
-             * later.
-             */
-            wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-            /*
-             * Exit the loop if we successfully acquired the slot or the slot
-             * was dropped during waiting for the owning process to be
-             * terminated. For example, the latter case is likely to happen
-             * when the slot is temporary because it's automatically dropped
-             * by the termination of the owning process.
-             */
-            if (wspid <= 0)
-                break;
-
-            /*
-             * Signal to terminate the process that owns the slot.
-             *
-             * There is the race condition where other process may own the
-             * slot after the process using it was terminated and before this
-             * process owns it. To handle this case, we signal again if the
-             * PID of the owning process is changed than the last.
-             *
-             * XXX This logic assumes that the same PID is not reused very
-             * quickly.
-             */
-            if (last_signaled_pid != wspid)
-            {
-                ereport(LOG,
-                        (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-                                wspid, NameStr(slotname))));
-                (void) kill(wspid, SIGTERM);
-                last_signaled_pid = wspid;
-            }
-
-            ConditionVariableTimedSleep(&s->active_cv, 10,
-                                        WAIT_EVENT_REPLICATION_SLOT_DROP);
-        }
-        ConditionVariableCancelSleep();
-
-        /*
-         * Do nothing here and start from scratch if the slot has already been
-         * dropped.
-         */
-        if (wspid == -1)
+            /* if the lock was released, start from scratch */
             goto restart;
-
-        ereport(LOG,
-                (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-                        NameStr(slotname),
-                        LSN_FORMAT_ARGS(restart_lsn))));
-
-        SpinLockAcquire(&s->mutex);
-        s->data.invalidated_at = s->data.restart_lsn;
-        s->data.restart_lsn = InvalidXLogRecPtr;
-        SpinLockRelease(&s->mutex);
-
-        /* Make sure the invalidated state persists across server restart */
-        ReplicationSlotMarkDirty();
-        ReplicationSlotSave();
-        ReplicationSlotRelease();
-
-        /* if we did anything, start from scratch */
-        goto restart;
+        }
     }
     LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.20.1
From 1a3fce0dedcb3fcf2e26b6256ce291ebb0e89fe2 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 10 Jun 2021 16:44:03 -0400
Subject: [PATCH v2 2/2] the test

---
 src/test/recovery/t/019_replslot_limit.pl | 77 ++++++++++++++++++++++-
 1 file changed, 74 insertions(+), 3 deletions(-)

diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 7094aa0704..cb63878bda 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -11,7 +11,7 @@ use TestLib;
 use PostgresNode;
 use File::Path qw(rmtree);
 
-use Test::More tests => 14;
+use Test::More tests => $TestLib::windows_os ? 14 : 17;
 use Time::HiRes qw(usleep);
 
 $ENV{PGDATABASE} = 'postgres';
@@ -211,8 +211,8 @@ for (my $i = 0; $i < 10000; $i++)
 }
 ok($failed, 'check that replication has been broken');
 
-$node_primary->stop('immediate');
-$node_standby->stop('immediate');
+$node_primary->stop;
+$node_standby->stop;
 
 my $node_primary2 = get_new_node('primary2');
 $node_primary2->init(allows_streaming => 1);
@@ -253,6 +253,77 @@ my @result =
         timeout => '60'));
 is($result[1], 'finished', 'check if checkpoint command is not blocked');
 
+$node_primary2->stop;
+$node_standby->stop;
+
+# The next test depends on Perl's `kill`, which apparently is not
+# portable to Windows. (It would be nice to use Test::More's `subtest`,
+# but that's not in the ancient version we require.)
+if ($TestLib::windows_os)
+{
+    done_testing();
+    exit;
+}
+
+# Get a slot terminated while the walsender is active
+# We do this by sending SIGSTOP to the walreceiver. Skip this on Windows.
+my $node_primary3 = get_new_node('primary3');
+$node_primary3->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_primary3->append_conf(
+    'postgresql.conf', qq(
+    min_wal_size = 2MB
+    max_wal_size = 2MB
+    log_checkpoints = yes
+    max_slot_wal_keep_size = 1MB
+    ));
+$node_primary3->start;
+$node_primary3->safe_psql('postgres',
+    "SELECT pg_create_physical_replication_slot('rep3')");
+# Take backup
+$backup_name = 'my_backup';
+$node_primary3->backup($backup_name);
+# Create standby
+my $node_standby3 = get_new_node('standby_3');
+$node_standby3->init_from_backup($node_primary3, $backup_name,
+    has_streaming => 1);
+$node_standby3->append_conf('postgresql.conf', "primary_slot_name = 'rep3'");
+$node_standby3->start;
+$node_primary3->wait_for_catchup($node_standby3->name, 'replay');
+my $pid = $node_standby3->safe_psql('postgres',
+    "SELECT pid FROM pg_stat_activity WHERE backend_type = 'walreceiver'");
+like($pid, qr/^[0-9]+$/, "have walreceiver pid $pid");
+
+# freeze walreceiver.  Slot will still be active, but it won't advance
+kill 'STOP', $pid;
+$logstart = get_log_size($node_primary3);
+advance_wal($node_primary3, 2);
+
+my ($in, $out, $timer, $h);
+$timer = IPC::Run::timeout(180);
+$h = $node_primary3->background_psql('postgres', \$in, \$out, $timer);
+$in .= qq{
+checkpoint;
+};
+$h->pump_nb;
+ok(find_in_log($node_primary3, "to release replication slot"),
+    "walsender termination logged");
+
+# Now let it continue to its demise
+kill 'CONT', $pid;
+
+$node_primary3->poll_query_until('postgres',
+    "SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep3'",
+    "lost")
+  or die "timed out waiting for slot to be lost";
+
+ok( find_in_log(
+    $node_primary3, 'invalidating slot "rep3" because its restart_lsn'),
+    "slot invalidation logged");
+
+
+$node_primary3->stop;
+$node_standby3->stop;
+
 #####################################
 # Advance WAL of $node by $n segments
 sub advance_wal
-- 
2.20.1