On Tue, Mar 26, 2024 at 11:22 PM Bertrand Drouvot
<bertranddrouvot...@gmail.com> wrote:
>
> > I'm attaching v24 patches. It implements the above idea proposed
> > upthread for synced slots.
>
> ==== v24-0002
>
> 1 ===
>
>     This commit does two things:
>     1) Updates inactive_since for sync slots with the value
>     received from the primary's slot.
>
> Tested it and it does that.

Thanks. I've added a test case for this.

> 2 ===
>
>     2) Ensures the value is set to current timestamp during the
>     shutdown of slot sync machinery to help correctly interpret the
>     time if the standby gets promoted without a restart.
>
> Tested it and it does that.

Thanks. I've added a test case for this.

> 3 ===
>
> +/*
> + * Reset the synced slots info such as inactive_since after shutting
> + * down the slot sync machinery.
> + */
> +static void
> +update_synced_slots_inactive_time(void)
>
> Looks like the comment "reset" is not matching the name of the function and
> what it does.

Changed. I've also changed the function name to
update_synced_slots_inactive_since to be precise on what it exactly
does.

> 4 ===
>
> +                       /*
> +                        * We get the current time beforehand and only once to avoid
> +                        * system calls overhead while holding the lock.
> +                        */
> +                       if (now == 0)
> +                               now = GetCurrentTimestamp();
>
> Also +1 of having GetCurrentTimestamp() just called one time within the loop.

Right.
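
For readers following along, here is a minimal standalone sketch of that "read the clock lazily, and only once" pattern. It is not the patch code: DemoSlot, demo_get_current_timestamp() and demo_stamp_synced_slots() are hypothetical stand-ins for ReplicationSlot, GetCurrentTimestamp() and the new function, and the locking is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the PostgreSQL types; not the real definitions. */
typedef int64_t DemoTimestamp;

typedef struct DemoSlot
{
	bool		in_use;
	bool		synced;
	DemoTimestamp inactive_since;
} DemoSlot;

/* Counts clock reads so the "only once" property can be observed. */
static int	demo_clock_calls = 0;

/* Fake clock: returns a nonzero value that increases on every call. */
static DemoTimestamp
demo_get_current_timestamp(void)
{
	demo_clock_calls++;
	return 1000 + demo_clock_calls;
}

/*
 * Stamp every in-use synced slot with the same timestamp. The clock is read
 * only when the first qualifying slot is found, and never again after that.
 * Returns the number of slots updated.
 */
static int
demo_stamp_synced_slots(DemoSlot *slots, size_t nslots)
{
	DemoTimestamp now = 0;		/* 0 means "clock not read yet" */
	int			updated = 0;

	assert(slots != NULL || nslots == 0);

	for (size_t i = 0; i < nslots; i++)
	{
		if (!(slots[i].in_use && slots[i].synced))
			continue;

		if (now == 0)
			now = demo_get_current_timestamp();

		slots[i].inactive_since = now;
		updated++;
	}

	return updated;
}
```

If no slot qualifies, the clock is never read at all, and every qualifying slot ends up with an identical value.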

> 5 ===
>
> -               if (!(RecoveryInProgress() && slot->data.synced))
> +               if (!(InRecovery && slot->data.synced))
>                         slot->inactive_since = GetCurrentTimestamp();
>                 else
>                         slot->inactive_since = 0;
>
> Not related to this change but more the way RestoreSlotFromDisk() behaves here:
>
> For a sync slot on standby it will be set to zero and then later will be
> synchronized with the one coming from the primary. I think that's fine to have
> it to zero for this window of time.

Right.

> Now, if the standby is down and one sets sync_replication_slots to off,
> then inactive_since will be set to zero on the standby at startup and not
> synchronized (unless one triggers a manual sync). I also think that's fine but
> it might be worth to document this behavior (that after a standby startup
> inactive_since is zero until the next sync...).

Isn't this behaviour applicable for other slot parameters that the
slot syncs from the remote slot on the primary?

I've added the following note in the comments when we update
inactive_since in RestoreSlotFromDisk.

         * Note that for synced slots after the standby starts up (i.e. after
         * the slots are loaded from the disk), the inactive_since will remain
         * zero until the next slot sync cycle.
         */
        if (!(InRecovery && slot->data.synced))
            slot->inactive_since = GetCurrentTimestamp();
        else
            slot->inactive_since = 0;
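
To spell out the four cases of that condition, here is a small standalone sketch. It only models the branch above: demo_inactive_since_at_restore() is a hypothetical helper, the two booleans stand in for InRecovery and slot->data.synced, and the timestamp is passed in rather than read from the clock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz; 0 is treated as "not set". */
typedef int64_t DemoTimestamp;

/*
 * Return the value that would be stored as inactive_since when a slot is
 * loaded from disk. Only a synced slot loaded while in recovery is left at
 * zero, to be filled in by the next slot sync cycle.
 */
static DemoTimestamp
demo_inactive_since_at_restore(bool in_recovery, bool synced,
							   DemoTimestamp now)
{
	assert(now != 0);			/* 0 is reserved for "not set" */

	if (!(in_recovery && synced))
		return now;

	return 0;
}
```

Three of the four combinations get the current time; only (in recovery, synced) is left at zero.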

> 6 ===
>
> +       print "HI  $slot_name $name $inactive_since $slot_creation_time\n";
>
> garbage?

Removed.

> 7 ===
>
> +# Capture and validate inactive_since of a given slot.
> +sub capture_and_validate_slot_inactive_since
> +{
> +       my ($node, $slot_name, $slot_creation_time) = @_;
> +       my $name = $node->name;
>
> We now have capture_and_validate_slot_inactive_since in 2 places:
> 040_standby_failover_slots_sync.pl and 019_replslot_limit.pl.
>
> Worth to create a sub in Cluster.pm?

I'd second that thought for now. We might have to debate first if it's
useful for all the nodes even without replication, and if yes, the
naming stuff and all that. Historically, we've had such duplicated
functions until recently, for instance advance_wal and log_contains. We
moved them over to a common Perl library Cluster.pm very recently. I'm
sure we can come back later to move it to Cluster.pm.

On Wed, Mar 27, 2024 at 9:02 AM shveta malik <shveta.ma...@gmail.com> wrote:
>
> 1)
> slot.c:
> + * data from the remote slot. We use InRecovery flag instead of
> + * RecoveryInProgress() as it always returns true even for normal
> + * server startup.
>
> a) Not clear what 'it' refers to. Better to use 'the latter'
> b) Is it better to mention the primary here:
>  'as the latter always returns true even on the primary server during startup'.

Modified.

> 2)
> update_local_synced_slot():
>
> - strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
> + strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0 &&
> + remote_slot->inactive_since == slot->inactive_since)
>
> When this code was written initially, the intent was to do strcmp at
> the end (only if absolutely needed). It will be good if we maintain
> the same and add new checks before strcmp.

Done.
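
The ordering relies on && short-circuiting: the cheap scalar comparisons run first and strcmp is reached only if all of them match. A minimal standalone sketch, assuming a simplified struct (DemoSlotInfo and demo_slot_unchanged() are hypothetical, not the RemoteSlot/ReplicationSlot definitions):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, simplified view of the fields being compared. */
typedef struct DemoSlotInfo
{
	bool		two_phase;
	bool		failover;
	uint64_t	confirmed_lsn;
	int64_t		inactive_since;
	const char *plugin;
} DemoSlotInfo;

/*
 * Return true if nothing differs between the remote and local copies.
 * Scalar fields are compared first; the string comparison comes last so it
 * is evaluated only when every cheaper check has already passed.
 */
static bool
demo_slot_unchanged(const DemoSlotInfo *remote, const DemoSlotInfo *local)
{
	assert(remote != NULL && local != NULL);

	return remote->two_phase == local->two_phase &&
		remote->failover == local->failover &&
		remote->confirmed_lsn == local->confirmed_lsn &&
		remote->inactive_since == local->inactive_since &&
		strcmp(remote->plugin, local->plugin) == 0;
}
```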

> 3)
> update_synced_slots_inactive_time():
>
> This assert is removed, is it intentional?
> Assert(s->active_pid == 0);

Yes, the slot can get acquired in the corner case when someone runs
pg_sync_replication_slots concurrently at this time. I'm referring to
the issue reported upthread. We don't prevent one from running
pg_sync_replication_slots in the promotion/ShutDownSlotSync phase, right?
Maybe we should prevent that; otherwise some of the slots are synced
and the standby gets promoted while others are yet to be synced.

> 4)
> 040_standby_failover_slots_sync.pl:
>
> +# Capture the inactive_since of the slot from the standby the logical failover
> +# slots are synced/created on the standby.
>
> The comment is unclear, something seems missing.

Nice catch. Yes, that was wrong. I've modified it now.

Please find attached the v25-0001 patch with the above changes (this is
now the 0001 patch, as the inactive_since patch has been committed).
--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 790f791b4200ff06cfdf55fdf1572436e9d982fd Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 27 Mar 2024 04:23:52 +0000
Subject: [PATCH v25] Maintain inactive_since for synced slots correctly.

The slot's inactive_since isn't currently maintained for
synced slots on the standby. The commit a11f330b55 prevents
updating inactive_since with RecoveryInProgress() check in
RestoreSlotFromDisk(). But, the issue is that
RecoveryInProgress() always returns true in
RestoreSlotFromDisk() as 'xlogctl->SharedRecoveryState' is
always 'RECOVERY_STATE_CRASH' at that time. The impact of this
is that, on a promoted standby, inactive_since is always NULL for
all synced slots even after a server restart.

The above issue led us to a question as to why we can't just update
inactive_since for synced slots on the standby with the value
received from remote slot on the primary. This is consistent with
any other slot parameter i.e. all of them are synced from the
primary.

This commit does two things:
1) Updates inactive_since for sync slots with the value
received from the primary's slot.

2) Ensures the value is set to current timestamp during the
shutdown of slot sync machinery to help correctly interpret the
time if the standby gets promoted without a restart.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik
Discussion: https://www.postgresql.org/message-id/CALj2ACWLctoiH-pSjWnEpR54q4DED6rw_BRJm5pCx86_Y01MoQ%40mail.gmail.com
---
 doc/src/sgml/system-views.sgml                |  4 ++
 src/backend/replication/logical/slotsync.c    | 62 ++++++++++++++++-
 src/backend/replication/slot.c                | 41 ++++++++----
 .../t/040_standby_failover_slots_sync.pl      | 66 +++++++++++++++++++
 4 files changed, 158 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 3c8dca8ca3..7713f168e7 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2530,6 +2530,10 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       <para>
         The time since the slot has become inactive.
         <literal>NULL</literal> if the slot is currently being used.
+        Note that the slots that are being synced from a primary server
+        (whose <structfield>synced</structfield> field is true), will get the
+        <structfield>inactive_since</structfield> value from the corresponding
+        remote slot on the primary.
       </para></entry>
      </row>
 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..d367c9ed3c 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -137,9 +137,12 @@ typedef struct RemoteSlot
 
 	/* RS_INVAL_NONE if valid, or the reason of invalidation */
 	ReplicationSlotInvalidationCause invalidated;
+
+	TimestampTz inactive_since; /* time since the slot became inactive */
 } RemoteSlot;
 
 static void slotsync_failure_callback(int code, Datum arg);
+static void update_synced_slots_inactive_since(void);
 
 /*
  * If necessary, update the local synced slot's metadata based on the data
@@ -167,6 +170,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
 		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
+		remote_slot->inactive_since == slot->inactive_since &&
 		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
 		return false;
 
@@ -182,6 +186,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 	slot->data.catalog_xmin = remote_slot->catalog_xmin;
 	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+	slot->inactive_since = remote_slot->inactive_since;
 	SpinLockRelease(&slot->mutex);
 
 	if (xmin_changed)
@@ -652,9 +657,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, TIMESTAMPTZOID};
 
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
@@ -663,7 +668,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, invalidation_reason"
+		" database, invalidation_reason, inactive_since"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
@@ -743,6 +748,14 @@ synchronize_slots(WalReceiverConn *wrconn)
 		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
 			GetSlotInvalidationCause(TextDatumGetCString(d));
 
+		/*
+		 * It is possible to get null value for inactive_since if the slot is
+		 * active on the primary server, so handle accordingly.
+		 */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		remote_slot->inactive_since =
+			isnull ? 0 : DatumGetTimestampTz(d);
+
 		/* Sanity check */
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
@@ -1296,6 +1309,46 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 	Assert(false);
 }
 
+/*
+ * Update the inactive_since property for synced slots.
+ */
+static void
+update_synced_slots_inactive_since(void)
+{
+	TimestampTz now = 0;
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Check if it is a synchronized slot */
+		if (s->in_use && s->data.synced)
+		{
+			Assert(SlotIsLogical(s));
+
+			/*
+			 * We get the current time beforehand and only once to avoid
+			 * system calls overhead while holding the lock.
+			 */
+			if (now == 0)
+				now = GetCurrentTimestamp();
+
+			/*
+			 * Set the time since the slot has become inactive after shutting
+			 * down slot sync machinery. This helps correctly interpret the
+			 * time if the standby gets promoted without a restart.
+			 */
+			SpinLockAcquire(&s->mutex);
+			s->inactive_since = now;
+			SpinLockRelease(&s->mutex);
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
 /*
  * Shut down the slot sync worker.
  */
@@ -1309,6 +1362,7 @@ ShutDownSlotSync(void)
 	if (SlotSyncCtx->pid == InvalidPid)
 	{
 		SpinLockRelease(&SlotSyncCtx->mutex);
+		update_synced_slots_inactive_since();
 		return;
 	}
 	SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1341,6 +1395,8 @@ ShutDownSlotSync(void)
 	}
 
 	SpinLockRelease(&SlotSyncCtx->mutex);
+
+	update_synced_slots_inactive_since();
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d778c0b921..5d6882e4db 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -42,6 +42,7 @@
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
 #include "common/file_utils.h"
 #include "common/string.h"
 #include "miscadmin.h"
@@ -655,6 +656,7 @@ ReplicationSlotRelease(void)
 	char	   *slotname = NULL;	/* keep compiler quiet */
 	bool		is_logical = false; /* keep compiler quiet */
 	TimestampTz now = 0;
+	bool		update_inactive_since;
 
 	Assert(slot != NULL && slot->active_pid != 0);
 
@@ -690,13 +692,19 @@ ReplicationSlotRelease(void)
 	}
 
 	/*
-	 * Set the last inactive time after marking the slot inactive. We don't
-	 * set it for the slots currently being synced from the primary to the
-	 * standby because such slots are typically inactive as decoding is not
-	 * allowed on those.
+	 * Set the time since the slot has become inactive.
+	 *
+	 * Note that we don't set it for the slots currently being synced from the
+	 * primary to the standby, because such slots typically sync the data from
+	 * the remote slot.
 	 */
 	if (!(RecoveryInProgress() && slot->data.synced))
+	{
 		now = GetCurrentTimestamp();
+		update_inactive_since = true;
+	}
+	else
+		update_inactive_since = false;
 
 	if (slot->data.persistency == RS_PERSISTENT)
 	{
@@ -706,11 +714,12 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->inactive_since = now;
+		if (update_inactive_since)
+			slot->inactive_since = now;
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
-	else
+	else if (update_inactive_since)
 	{
 		SpinLockAcquire(&slot->mutex);
 		slot->inactive_since = now;
@@ -2369,13 +2378,21 @@ RestoreSlotFromDisk(const char *name)
 		slot->active_pid = 0;
 
 		/*
-		 * We set the last inactive time after loading the slot from the disk
-		 * into memory. Whoever acquires the slot i.e. makes the slot active
-		 * will reset it. We don't set it for the slots currently being synced
-		 * from the primary to the standby because such slots are typically
-		 * inactive as decoding is not allowed on those.
+		 * Set the time since the slot has become inactive after loading the
+		 * slot from the disk into memory. Whoever acquires the slot i.e.
+		 * makes the slot active will reset it.
+		 *
+		 * Note that we don't set it for the slots currently being synced from
+		 * the primary to the standby, because such slots typically sync the
+		 * data from the remote slot. We use InRecovery flag instead of
+		 * RecoveryInProgress() as the latter always returns true at this time
+		 * even on primary.
+		 *
+		 * Note that for synced slots after the standby starts up (i.e. after
+		 * the slots are loaded from the disk), the inactive_since will remain
+		 * zero until the next slot sync cycle.
 		 */
-		if (!(RecoveryInProgress() && slot->data.synced))
+		if (!(InRecovery && slot->data.synced))
 			slot->inactive_since = GetCurrentTimestamp();
 		else
 			slot->inactive_since = 0;
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..58d5177bad 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -35,6 +35,13 @@ my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
 $subscriber1->init;
 $subscriber1->start;
 
+# Capture the time before the logical failover slot is created on the
+# primary. We later call this publisher as primary anyway.
+my $slot_creation_time_on_primary = $publisher->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 # Create a slot on the publisher with failover disabled
 $publisher->safe_psql('postgres',
 	"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
@@ -174,6 +181,10 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
 	1);
 
+# Capture the inactive_since of the slot from the primary
+my $inactive_since_on_primary =
+	capture_and_validate_slot_inactive_since($primary, 'lsub1_slot', $slot_creation_time_on_primary);
+
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the subscriber.
 $primary->wait_for_replay_catchup($standby1);
@@ -190,6 +201,18 @@ is( $standby1->safe_psql(
 	"t",
 	'logical slots have synced as true on standby');
 
+# Capture the inactive_since of the synced slot on the standby
+my $inactive_since_on_standby =
+	capture_and_validate_slot_inactive_since($standby1, 'lsub1_slot', $slot_creation_time_on_primary);
+
+# Synced slots on the standby must get the inactive_since from the primary.
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_primary'::timestamptz = '$inactive_since_on_standby'::timestamptz;"
+	),
+	"t",
+	'synchronized slot has got the inactive_since from the primary');
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
@@ -750,8 +773,28 @@ $primary->reload;
 $standby1->start;
 $primary->wait_for_replay_catchup($standby1);
 
+# Capture the time before the standby is promoted
+my $promotion_time_on_primary = $standby1->safe_psql(
+	'postgres', qq[
+    SELECT current_timestamp;
+]);
+
 $standby1->promote;
 
+# Capture the inactive_since of the synced slot after the promotion.
+# Expectation here is that the slot gets its own inactive_since as part of the
+# promotion. We do this check before the slot is enabled on the new primary
+# below, otherwise the slot gets active setting inactive_since to NULL.
+my $inactive_since_on_new_primary =
+	capture_and_validate_slot_inactive_since($standby1, 'lsub1_slot', $promotion_time_on_primary);
+
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
+	),
+	"t",
+	'synchronized slot has got its own inactive_since on the new primary');
+
 # Update subscription with the new primary's connection info
 my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
 $subscriber1->safe_psql('postgres',
@@ -773,4 +816,27 @@ is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
 	"20",
 	'data replicated from the new primary');
 
+# Capture and validate inactive_since of a given slot.
+sub capture_and_validate_slot_inactive_since
+{
+	my ($node, $slot_name, $reference_time) = @_;
+	my $name = $node->name;
+
+	my $inactive_since = $node->safe_psql('postgres',
+		qq(SELECT inactive_since FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
+		);
+
+	# Check that the captured time is sane
+	is( $node->safe_psql(
+			'postgres',
+			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
+				'$inactive_since'::timestamptz >= '$reference_time'::timestamptz;]
+		),
+		't',
+		"last inactive time for slot $slot_name is sane on node $name");
+
+	return $inactive_since;
+}
+
 done_testing();
-- 
2.34.1
