On Mon, Sep 11, 2023 at 02:49:49PM +0530, Amit Kapila wrote:
> I don't know if the difference is worth inventing a new BACKEND_TYPE_*
> but if you think so then we can probably discuss this in a new thread.
> I think we may want to improve some comments as a separate patch to
> make this evident.

The comments in postmaster.c could be improved, at least.  There is no
need to discuss that here.

> This point is not very clear to me. Can you please quote the exact
> comment if you think something needs to be changed?

Hmm.  Don't think that's it yet..

Please see the attached v11, which rewords all the places in the patch
that need clarification IMO.  I found the comment additions in
CheckPointReplicationSlots() to be overcomplicated:
- The key point of forcing a flush of a slot whose confirmed_flush LSN
has moved ahead of the last LSN where it was saved is to make the
follow-up restart more responsive.
- Not sure that there is any point in mentioning the other code paths in
the tree where ReplicationSlotSave() can be called, or that a slot can be
saved in processes other than WAL senders (like slot manipulations in
normal backends, for one).  This was the last sentence in v10.
- Persist is incorrect in this context in the tests, slot.c and
slot.h, as it should refer to the slot's data being flushed, saved or
just "made durable" because this is what the new last saved LSN is
here for.  Persistence is a slot property, and does not refer to the
fact of flushing the data IMO.

+           if (s->data.invalidated == RS_INVAL_NONE &&
+               s->data.confirmed_flush != s->last_saved_confirmed_flush)

Actually this is incorrect, no?  Shouldn't we make sure that the
confirmed_flush is strictly higher than the last saved LSN?
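
To make the difference concrete, here is a minimal standalone sketch of
the check with the strict comparison.  It is not code from the patch:
SlotModel and slot_needs_shutdown_flush() are hypothetical names used
only for illustration, and XLogRecPtr is modeled as a plain uint64_t.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's XLogRecPtr; an assumption for this sketch. */
typedef uint64_t XLogRecPtr;

/* Hypothetical, trimmed-down model of the slot fields involved. */
typedef struct SlotModel
{
	bool		is_logical;		/* models SlotIsLogical(s) */
	bool		invalidated;	/* models data.invalidated != RS_INVAL_NONE */
	XLogRecPtr	confirmed_flush;
	XLogRecPtr	last_saved_confirmed_flush;
} SlotModel;

/*
 * Return true if the slot should be force-flushed at shutdown: only for
 * valid logical slots whose confirmed_flush has moved strictly ahead of
 * the value last saved to disk.
 */
bool
slot_needs_shutdown_flush(const SlotModel *s, bool is_shutdown)
{
	if (!is_shutdown || !s->is_logical || s->invalidated)
		return false;

	return s->confirmed_flush > s->last_saved_confirmed_flush;
}
```

With "!=", a slot whose in-memory confirmed_flush is somehow lower than
the saved value would also be marked dirty; with ">" only a real advance
triggers the extra write.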
--
Michael
From 0aa4c8245359b60001ed90de1193dbb8b2b6c91c Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouh...@free.fr>
Date: Fri, 14 Apr 2023 13:49:09 +0800
Subject: [PATCH v11] Flush logical slots to disk during a shutdown checkpoint
 if required.

It's entirely possible for a logical slot to have a confirmed_flush LSN
higher than the last value saved on disk while not being marked as dirty.
Currently, it is not a major problem but a later patch adding support for
the upgrade of slots relies on that value being properly flushed to disk.

It can also help avoid processing the same transactions again in some
boundary cases after the clean shutdown and restart.  Say, we process
some transactions for which we didn't send anything downstream (the
changes got filtered) but the confirmed_flush LSN is updated due to
keepalives.  As we don't flush the latest value of confirmed_flush LSN, it
may lead to processing the same changes again without this patch.

The approach taken by this patch has been suggested by Ashutosh Bapat.

Author: Vignesh C, Julien Rouhaud, Kuroda Hayato
Reviewed-by: Amit Kapila, Dilip Kumar, Ashutosh Bapat, Peter Smith
Discussion: http://postgr.es/m/CAA4eK1JzJagMmb_E8D4au=gyqkxox0afnbm1fbp7sy7t4yw...@mail.gmail.com
Discussion: http://postgr.es/m/tyapr01mb58664c81887b3af2eb6b16e3f5...@tyapr01mb5866.jpnprd01.prod.outlook.com
---
 src/include/replication/slot.h                |   8 +-
 src/backend/access/transam/xlog.c             |   2 +-
 src/backend/replication/slot.c                |  32 +++++-
 src/test/recovery/meson.build                 |   1 +
 .../t/038_save_logical_slots_shutdown.pl      | 102 ++++++++++++++++++
 5 files changed, 140 insertions(+), 5 deletions(-)
 create mode 100644 src/test/recovery/t/038_save_logical_slots_shutdown.pl

diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..5e60030234 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -178,6 +178,12 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/*
+	 * LSN used to track the last confirmed_flush LSN where the slot's data
+	 * has been flushed to disk.
+	 */
+	XLogRecPtr	last_saved_confirmed_flush;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -241,7 +247,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
-extern void CheckPointReplicationSlots(void);
+extern void CheckPointReplicationSlots(bool is_shutdown);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f6f8adc72a..f26c8d18a6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7039,7 +7039,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
-	CheckPointReplicationSlots();
+	CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
 	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bb09c4010f..a31c7867cf 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -321,6 +321,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -1573,10 +1574,12 @@ restart:
  * Flush all replication slots to disk.
  *
  * This needn't actually be part of a checkpoint, but it's a convenient
- * location.
+ * location.  Additionally, in case of a shutdown checkpoint, we also identify
+ * the slots for which the confirmed_flush LSN has been updated since the last
+ * time it was saved and flush them.
  */
 void
-CheckPointReplicationSlots(void)
+CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
 
@@ -1601,6 +1604,27 @@ CheckPointReplicationSlots(void)
 
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+
+		/*
+		 * Slot's data is not flushed each time the confirmed_flush LSN is
+		 * updated as that could lead to frequent writes.  However, we decide
+		 * to force a flush of all logical slot's data at the time of shutdown
+		 * if the confirmed_flush LSN is changed since we last flushed it to
+		 * disk.  This helps in avoiding an unnecessary retreat of the
+		 * confirmed_flush LSN after restart.
+		 */
+		if (is_shutdown && SlotIsLogical(s))
+		{
+			SpinLockAcquire(&s->mutex);
+			if (s->data.invalidated == RS_INVAL_NONE &&
+				s->data.confirmed_flush > s->last_saved_confirmed_flush)
+			{
+				s->just_dirtied = true;
+				s->dirty = true;
+			}
+			SpinLockRelease(&s->mutex);
+		}
+
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
@@ -1873,11 +1897,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 
 	/*
 	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
-	 * already.
+	 * already and remember the confirmed_flush LSN value.
 	 */
 	SpinLockAcquire(&slot->mutex);
 	if (!slot->just_dirtied)
 		slot->dirty = false;
+	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
 	SpinLockRelease(&slot->mutex);
 
 	LWLockRelease(&slot->io_in_progress_lock);
@@ -2074,6 +2099,7 @@ RestoreSlotFromDisk(const char *name)
 		/* initialize in memory state */
 		slot->effective_xmin = cp.slotdata.xmin;
 		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
+		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
 
 		slot->candidate_catalog_xmin = InvalidTransactionId;
 		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index e7328e4894..646d6ffde4 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -43,6 +43,7 @@ tests += {
       't/035_standby_logical_decoding.pl',
       't/036_truncated_dropped.pl',
       't/037_invalid_database.pl',
+      't/038_save_logical_slots_shutdown.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/038_save_logical_slots_shutdown.pl b/src/test/recovery/t/038_save_logical_slots_shutdown.pl
new file mode 100644
index 0000000000..de19829560
--- /dev/null
+++ b/src/test/recovery/t/038_save_logical_slots_shutdown.pl
@@ -0,0 +1,102 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test logical replication slots are always flushed to disk during a shutdown
+# checkpoint.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+sub compare_confirmed_flush
+{
+	my ($node, $confirmed_flush_from_log) = @_;
+
+	# Fetch Latest checkpoint location from the control file
+	my ($stdout, $stderr) =
+	  run_command([ 'pg_controldata', $node->data_dir ]);
+	my @control_data = split("\n", $stdout);
+	my $latest_checkpoint = undef;
+	foreach (@control_data)
+	{
+		if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg)
+		{
+			$latest_checkpoint = $1;
+			last;
+		}
+	}
+	die "Latest checkpoint location not found in control file\n"
+	  unless defined($latest_checkpoint);
+
+	# Is it same as the value read from log?
+	ok( $latest_checkpoint eq $confirmed_flush_from_log,
+		"Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location"
+	);
+
+	return;
+}
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
+$node_publisher->init(allows_streaming => 'logical');
+# Avoid checkpoint during the test, otherwise, the latest checkpoint location
+# will change.
+$node_publisher->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+);
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+my $offset = -s $node_publisher->logfile;
+
+# Restart the publisher to ensure that the slot will be flushed if required
+$node_publisher->restart();
+
+# Wait until the walsender creates decoding context
+$node_publisher->wait_for_log(
+	qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./,
+	$offset);
+
+# Extract confirmed_flush from the logfile
+my $log_contents = slurp_file($node_publisher->logfile, $offset);
+$log_contents =~
+  qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./
+  or die "could not get confirmed_flush_lsn";
+
+# Ensure that the slot's confirmed_flush LSN is the same as the
+# latest_checkpoint location.
+compare_confirmed_flush($node_publisher, $1);
+
+done_testing();
-- 
2.40.1
