On Tue, 22 Aug 2023 at 15:42, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, Aug 22, 2023 at 2:56 PM Ashutosh Bapat
> <ashutosh.bapat....@gmail.com> wrote:
> >
> > On Tue, Aug 22, 2023 at 9:48 AM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > >
> > > > Another idea is to record the confirm_flush_lsn at the time of
> > > > persisting the slot. We can use it in two different ways: 1. to mark a
> > > > slot dirty and persist it if the confirm_flush_lsn at the time the
> > > > slot was last persisted is too far behind the slot's current
> > > > confirm_flush_lsn; 2. at shutdown checkpoint, to persist all the slots
> > > > whose two confirm_flush_lsns differ.
> > > >
> > >
> > > I think using it in the second way (2) sounds advantageous compared
> > > to storing another dirty flag, because it requires us to update
> > > last_persisted_confirm_flush_lsn only while writing the slot info.
> > > OTOH, a dirty_for_shutdown_checkpoint flag would have to be updated
> > > under the spinlock at each of the multiple places where we update
> > > confirm_flush_lsn. But I don't see the need to do what you proposed
> > > in (1), as its use case is very minor: it may just sometimes help us
> > > avoid some decoding after crash recovery.
> >
> > Once we have last_persisted_confirm_flush_lsn, (1) is just an
> > optimization on top of it: it takes the opportunity to persist a
> > confirmed_flush_lsn that is much farther ahead than the currently
> > persisted value, thus improving the chances of updating restart_lsn
> > and catalog_xmin faster after a WAL sender restart. We need to keep
> > that in mind when implementing (2). The problem is that if we don't
> > implement (1) right now, we might simply forget to make that small
> > incremental change in the future. My order of preference is: 1. do
> > both (1) and (2) together; 2. do (2) first and then (1) as a separate
> > commit; 3. just implement (2) if there is no time at all for the
> > first two options.
> >
>
> I prefer (2) or (3). In any case, it is better to do that
> optimization (persisting confirm_flush_lsn at a regular interval) as a
> separate patch, since we need to test and prove its value separately.
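
To keep (1) from being forgotten, here is roughly what that follow-up
could look like. This is only a hypothetical sketch: the threshold, its
name, and the exact call site are made up for illustration and are not
part of the attached patch.

/* Hypothetical: how far confirmed_flush may run ahead of the on-disk copy. */
#define CONFIRMED_FLUSH_PERSIST_THRESHOLD	((XLogRecPtr) 16 * 1024 * 1024)

	/*
	 * Sketch of (1), e.g. in the walsender after
	 * LogicalConfirmReceivedLocation() has advanced confirmed_flush:
	 * persist the slot once the on-disk value has fallen too far behind.
	 */
	if (MyReplicationSlot->data.confirmed_flush -
		MyReplicationSlot->last_persisted_confirmed_flush >=
		CONFIRMED_FLUSH_PERSIST_THRESHOLD)
	{
		ReplicationSlotMarkDirty();
		ReplicationSlotSave();
	}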

Here is a patch that persists logical slots to disk during a shutdown
checkpoint if their updated confirmed_flush_lsn has not yet been
persisted.
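
The crux of the change is in SaveSlotToPath(): the early exit for a
slot that is not dirty is now skipped when this is a logical slot, we
are in a shutdown checkpoint, and the confirmed_flush LSN has moved
since the slot was last saved:

	if (!was_dirty &&
		!(SlotIsLogical(slot) && is_shutdown &&
		  (slot->data.confirmed_flush != slot->last_persisted_confirmed_flush)))
		return;

The new last_persisted_confirmed_flush field is updated under the
slot's spinlock at the end of SaveSlotToPath(), so subsequent
checkpoints will again skip slots whose confirmed_flush has not moved.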

Regards,
Vignesh
From f99c7da6f1d65a1324dce4e280040dcb29a912db Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouh...@free.fr>
Date: Fri, 14 Apr 2023 13:49:09 +0800
Subject: [PATCH v3] Persist logical slots to disk during a shutdown checkpoint
 if their updated confirmed_flush_lsn has not yet been persisted.

It's entirely possible for a logical slot to have a confirmed_flush_lsn higher
than the last value saved on disk while not being marked as dirty.  It's
currently not a problem to lose that value during a clean shutdown / restart
cycle, but a later patch adding support for pg_upgrade of publications and
logical slots will rely on that value being properly persisted to disk.

Author: Julien Rouhaud
Reviewed-by: Wang Wei, Peter Smith, Masahiko Sawada
---
 contrib/test_decoding/meson.build             |  1 +
 contrib/test_decoding/t/002_always_persist.pl | 74 +++++++++++++++++++
 src/backend/access/transam/xlog.c             |  2 +-
 src/backend/replication/slot.c                | 33 ++++++---
 src/include/replication/slot.h                |  5 +-
 5 files changed, 103 insertions(+), 12 deletions(-)
 create mode 100644 contrib/test_decoding/t/002_always_persist.pl

diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 7b05cc25a3..12afb9ea8c 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -72,6 +72,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_repl_stats.pl',
+      't/002_always_persist.pl',
     ],
   },
 }
diff --git a/contrib/test_decoding/t/002_always_persist.pl b/contrib/test_decoding/t/002_always_persist.pl
new file mode 100644
index 0000000000..47bf64bb09
--- /dev/null
+++ b/contrib/test_decoding/t/002_always_persist.pl
@@ -0,0 +1,74 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test logical replication slots are always persisted to disk during a shutdown
+# checkpoint.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Test set-up
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', q{
+autovacuum = off
+checkpoint_timeout = 1h
+});
+
+$node->start;
+
+# Create table
+$node->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Create replication slot
+$node->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');"
+);
+
+# Insert some data
+$node->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Consume WAL records
+$node->safe_psql('postgres',
+    "SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL);"
+);
+
+# Stop the node to cause a shutdown checkpoint
+$node->stop();
+
+# Fetch the latest checkpoint location from the control file
+my ($stdout, $stderr) = run_command([ 'pg_controldata', $node->data_dir ]);
+my @control_data = split("\n", $stdout);
+my $latest_checkpoint = undef;
+foreach (@control_data)
+{
+	if ($_ =~ /^Latest checkpoint location:\s*(.*)$/)
+	{
+		$latest_checkpoint = $1;
+		last;
+	}
+}
+die "No checkPoint in control file found\n"
+  unless defined($latest_checkpoint);
+
+# Restart the node and check confirmed_flush_lsn. If the slot was persisted,
+# its confirmed_flush_lsn will match the latest checkpoint location, i.e. the
+# LSN of the SHUTDOWN_CHECKPOINT record.
+$node->start();
+my $confirmed_flush = $node->safe_psql('postgres',
+	"SELECT confirmed_flush_lsn FROM pg_replication_slots;"
+);
+
+# Compare confirmed_flush_lsn with the latest checkpoint location
+is($confirmed_flush, $latest_checkpoint,
+	"confirmed_flush_lsn is the same as the latest checkpoint location");
+
+# Shutdown
+$node->stop;
+
+done_testing();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 60c0b7ec3a..6dced61cf4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7026,7 +7026,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
-	CheckPointReplicationSlots();
+	CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1dc27264f6..ccd46d5442 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -109,7 +109,8 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
-static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel,
+						   bool is_shutdown);
 
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
@@ -321,6 +322,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_persisted_confirmed_flush = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -783,7 +785,7 @@ ReplicationSlotSave(void)
 	Assert(MyReplicationSlot != NULL);
 
 	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
-	SaveSlotToPath(MyReplicationSlot, path, ERROR);
+	SaveSlotToPath(MyReplicationSlot, path, ERROR, false);
 }
 
 /*
@@ -1565,11 +1567,10 @@ restart:
 /*
  * Flush all replication slots to disk.
  *
- * This needn't actually be part of a checkpoint, but it's a convenient
- * location.
+ * is_shutdown is true when this is called as part of a shutdown checkpoint.
  */
 void
-CheckPointReplicationSlots(void)
+CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
 
@@ -1594,7 +1595,7 @@ CheckPointReplicationSlots(void)
 
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
-		SaveSlotToPath(s, path, LOG);
+		SaveSlotToPath(s, path, LOG, is_shutdown);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
 }
@@ -1700,7 +1701,7 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
 	/* Write the actual state file. */
 	slot->dirty = true;			/* signal that we really need to write */
-	SaveSlotToPath(slot, tmppath, ERROR);
+	SaveSlotToPath(slot, tmppath, ERROR, false);
 
 	/* Rename the directory into place. */
 	if (rename(tmppath, path) != 0)
@@ -1726,7 +1727,8 @@ CreateSlotOnDisk(ReplicationSlot *slot)
  * Shared functionality between saving and creating a replication slot.
  */
 static void
-SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
+SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel,
+			   bool is_shutdown)
 {
 	char		tmppath[MAXPGPATH];
 	char		path[MAXPGPATH];
@@ -1740,8 +1742,16 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	slot->just_dirtied = false;
 	SpinLockRelease(&slot->mutex);
 
-	/* and don't do anything if there's nothing to write */
-	if (!was_dirty)
+	/*
+	 * Don't do anything if there's nothing to write, unless this is a
+	 * logical slot during a shutdown checkpoint whose confirmed_flush
+	 * LSN has changed since the last time the slot was saved: in that
+	 * case we want to write the slot out even though it is not marked
+	 * dirty, so that the updated confirmed_flush LSN is not lost.
+	 */
+	if (!was_dirty &&
+		!(SlotIsLogical(slot) && is_shutdown &&
+		  (slot->data.confirmed_flush != slot->last_persisted_confirmed_flush)))
 		return;
 
 	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
@@ -1871,6 +1881,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	SpinLockAcquire(&slot->mutex);
 	if (!slot->just_dirtied)
 		slot->dirty = false;
+
+	slot->last_persisted_confirmed_flush = slot->data.confirmed_flush;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2072,6 +2084,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_lsn = InvalidXLogRecPtr;
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
+		slot->last_persisted_confirmed_flush = InvalidXLogRecPtr;
 
 		slot->in_use = true;
 		slot->active_pid = 0;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..b519f7af5f 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -178,6 +178,9 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/* The confirmed_flush LSN value last written to disk */
+	XLogRecPtr	last_persisted_confirmed_flush;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -241,7 +244,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
-extern void CheckPointReplicationSlots(void);
+extern void CheckPointReplicationSlots(bool is_shutdown);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
-- 
2.34.1
