On 2021-Jun-10, Álvaro Herrera wrote:

> I wrote a test (0002) to cover the case of signalling a walsender, which
> is currently not covered (we only deal with the case of a standby that's
> not running).  There are some sharp edges in this code -- I had to make
> it use background_psql() to send a CHECKPOINT, which hangs, because I
> previously send a SIGSTOP to the walreceiver.  Maybe there's a better
> way to achieve a walreceiver that remains connected but doesn't consume
> input from the primary, but I don't know what it is.  Anyway, the code
> becomes covered with this.  I would like to at least see it in master,
> to gather some reactions from buildfarm.

Small fixup to the test one, so that skipping it on Windows works
correctly.

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
Voy a acabar con todos los humanos / con los humanos yo acabaré
voy a acabar con todos (bis) / con todos los humanos acabaré ¡acabaré! (Bender)
>From b5909d87a2252181fe55caecaf46d9544e05e47f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 10 Jun 2021 16:43:43 -0400
Subject: [PATCH v2 1/2] the code fix

---
 src/backend/replication/slot.c | 236 +++++++++++++++++++++------------
 1 file changed, 149 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c88b803e5d..a8bda68b21 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1160,6 +1160,152 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
+ * and mark it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released (and in that case,
+ * we're not holding the lock at return; otherwise we are).
+ *
+ * This is inherently racy, because we want to release the
+ * LWLock for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+	int			last_signaled_pid = 0;
+	bool		released_lock = false;
+
+	for (;;)
+	{
+		XLogRecPtr	restart_lsn;
+		NameData	slotname;
+		int			active_pid = 0;
+
+		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+		if (!s->in_use)
+		{
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+
+		/*
+		 * Check if the slot needs to be invalidated. If it needs to be
+		 * invalidated, and is not currently acquired, acquire it and mark it
+		 * as having been invalidated.  We do this with the spinlock held to
+		 * avoid race conditions -- for example the restart_lsn could move
+		 * forward, or the slot could be dropped.
+		 */
+		SpinLockAcquire(&s->mutex);
+
+		restart_lsn = s->data.restart_lsn;
+
+		/*
+		 * If the slot is already invalid or is fresh enough, we don't need to
+		 * do anything.
+		 */
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+		{
+			SpinLockRelease(&s->mutex);
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+
+		slotname = s->data.name;
+		active_pid = s->active_pid;
+
+		/*
+		 * If the slot can be acquired, do so and mark it invalidated
+		 * immediately.  Otherwise we'll signal the owning process, below, and
+		 * retry.
+		 */
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+			s->data.invalidated_at = restart_lsn;
+			s->data.restart_lsn = InvalidXLogRecPtr;
+		}
+
+		SpinLockRelease(&s->mutex);
+
+		if (active_pid != 0)
+		{
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/*
+			 * Signal to terminate the process that owns the slot, if we
+			 * haven't already signalled it.  (Avoidance of repeated
+			 * signalling is the only reason for there to be a loop in this
+			 * routine; otherwise we could rely on caller's restart loop.)
+			 *
+			 * There is the race condition that other process may own the slot
+			 * after its current owner process is terminated and before this
+			 * process owns it. To handle that, we signal only if the PID of
+			 * the owning process has changed from the previous time. (This
+			 * logic assumes that the same PID is not reused very quickly.)
+			 */
+			if (last_signaled_pid != active_pid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d to release replication slot \"%s\"",
+								active_pid, NameStr(slotname))));
+
+				(void) kill(active_pid, SIGTERM);
+				last_signaled_pid = active_pid;
+			}
+
+			/*
+			 * Wait until the slot is released.
+			 *
+			 * Will immediately return in the first iteration, so we can
+			 * recheck the condition before sleeping. That addresses the
+			 * otherwise possible race of the slot already having been
+			 * released.
+			 */
+			ConditionVariableSleep(&s->active_cv,
+								   WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+			/* we're done; have caller restart */
+			break;
+		}
+		else
+		{
+			/*
+			 * We hold the slot now and have already invalidated it; flush it
+			 * to ensure that state persists.
+			 *
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations, so release it now but be sure to tell caller
+			 * to restart from scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ereport(LOG,
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+							NameStr(slotname),
+							LSN_FORMAT_ARGS(restart_lsn))));
+
+			/* done with this slot for now */
+			break;
+		}
+	}
+
+	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
+
+	return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1178,99 +1324,15 @@ restart:
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
-		NameData	slotname;
-		int			wspid;
-		int			last_signaled_pid = 0;
 
 		if (!s->in_use)
 			continue;
 
-		SpinLockAcquire(&s->mutex);
-		slotname = s->data.name;
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-			continue;
-		LWLockRelease(ReplicationSlotControlLock);
-		CHECK_FOR_INTERRUPTS();
-
-		/* Get ready to sleep on the slot in case it is active */
-		ConditionVariablePrepareToSleep(&s->active_cv);
-
-		for (;;)
+		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
 		{
-			/*
-			 * Try to mark this slot as used by this process.
-			 *
-			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should
-			 * not cancel the prepared condition variable if this slot is
-			 * active in other process. Because in this case we have to wait
-			 * on that CV for the process owning the slot to be terminated,
-			 * later.
-			 */
-			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-			/*
-			 * Exit the loop if we successfully acquired the slot or the slot
-			 * was dropped during waiting for the owning process to be
-			 * terminated. For example, the latter case is likely to happen
-			 * when the slot is temporary because it's automatically dropped
-			 * by the termination of the owning process.
-			 */
-			if (wspid <= 0)
-				break;
-
-			/*
-			 * Signal to terminate the process that owns the slot.
-			 *
-			 * There is the race condition where other process may own the
-			 * slot after the process using it was terminated and before this
-			 * process owns it. To handle this case, we signal again if the
-			 * PID of the owning process is changed than the last.
-			 *
-			 * XXX This logic assumes that the same PID is not reused very
-			 * quickly.
-			 */
-			if (last_signaled_pid != wspid)
-			{
-				ereport(LOG,
-						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-								wspid, NameStr(slotname))));
-				(void) kill(wspid, SIGTERM);
-				last_signaled_pid = wspid;
-			}
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
-
-		/*
-		 * Do nothing here and start from scratch if the slot has already been
-		 * dropped.
-		 */
-		if (wspid == -1)
+			/* if the lock was released, start from scratch */
 			goto restart;
-
-		ereport(LOG,
-				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-						NameStr(slotname),
-						LSN_FORMAT_ARGS(restart_lsn))));
-
-		SpinLockAcquire(&s->mutex);
-		s->data.invalidated_at = s->data.restart_lsn;
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-
-		/* Make sure the invalidated state persists across server restart */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-		ReplicationSlotRelease();
-
-		/* if we did anything, start from scratch */
-		goto restart;
+		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.20.1

>From 1a3fce0dedcb3fcf2e26b6256ce291ebb0e89fe2 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 10 Jun 2021 16:44:03 -0400
Subject: [PATCH v2 2/2] the test

---
 src/test/recovery/t/019_replslot_limit.pl | 77 ++++++++++++++++++++++-
 1 file changed, 74 insertions(+), 3 deletions(-)

diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 7094aa0704..cb63878bda 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -11,7 +11,7 @@ use TestLib;
 use PostgresNode;
 
 use File::Path qw(rmtree);
-use Test::More tests => 14;
+use Test::More tests => $TestLib::windows_os ? 14 : 17;
 use Time::HiRes qw(usleep);
 
 $ENV{PGDATABASE} = 'postgres';
@@ -211,8 +211,8 @@ for (my $i = 0; $i < 10000; $i++)
 }
 ok($failed, 'check that replication has been broken');
 
-$node_primary->stop('immediate');
-$node_standby->stop('immediate');
+$node_primary->stop;
+$node_standby->stop;
 
 my $node_primary2 = get_new_node('primary2');
 $node_primary2->init(allows_streaming => 1);
@@ -253,6 +253,77 @@ my @result =
 		timeout => '60'));
 is($result[1], 'finished', 'check if checkpoint command is not blocked');
 
+$node_primary2->stop;
+$node_standby->stop;
+
+# The next test depends on Perl's `kill`, which apparently is not
+# portable to Windows.  (It would be nice to use Test::More's `subtest`,
+# but that's not in the ancient version we require.)
+if ($TestLib::windows_os)
+{
+	done_testing();
+	exit;
+}
+
+# Get a slot terminated while the walsender is active
+# We do this by sending SIGSTOP to the walreceiver.  Skip this on Windows.
+my $node_primary3 = get_new_node('primary3');
+$node_primary3->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_primary3->append_conf(
+	'postgresql.conf', qq(
+	min_wal_size = 2MB
+	max_wal_size = 2MB
+	log_checkpoints = yes
+	max_slot_wal_keep_size = 1MB
+	));
+$node_primary3->start;
+$node_primary3->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('rep3')");
+# Take backup
+$backup_name = 'my_backup';
+$node_primary3->backup($backup_name);
+# Create standby
+my $node_standby3 = get_new_node('standby_3');
+$node_standby3->init_from_backup($node_primary3, $backup_name,
+	has_streaming => 1);
+$node_standby3->append_conf('postgresql.conf', "primary_slot_name = 'rep3'");
+$node_standby3->start;
+$node_primary3->wait_for_catchup($node_standby3->name, 'replay');
+my $pid = $node_standby3->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_activity WHERE backend_type = 'walreceiver'");
+like($pid, qr/^[0-9]+$/, "have walreceiver pid $pid");
+
+# freeze walreceiver. Slot will still be active, but it won't advance
+kill 'STOP', $pid;
+$logstart = get_log_size($node_primary3);
+advance_wal($node_primary3, 2);
+
+my ($in, $out, $timer, $h);
+$timer = IPC::Run::timeout(180);
+$h = $node_primary3->background_psql('postgres', \$in, \$out, $timer);
+$in .= qq{
+checkpoint;
+};
+$h->pump_nb;
+ok(find_in_log($node_primary3, "to release replication slot"),
+	"walreceiver termination logged");
+
+# Now let it continue to its demise
+kill 'CONT', $pid;
+
+$node_primary3->poll_query_until('postgres',
+	"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep3'",
+	"lost")
+  or die "timed out waiting for slot to be lost";
+
+ok( find_in_log(
+		$node_primary3, 'invalidating slot "rep3" because its restart_lsn'),
+	"slot invalidation logged");
+
+
+$node_primary3->stop;
+$node_standby3->stop;
+
 #####################################
 # Advance WAL of $node by $n segments
 sub advance_wal
-- 
2.20.1

Reply via email to