On Fri, Jun 01, 2018 at 02:53:09PM -0400, Michael Paquier wrote:
> Definitely, while the previous patch was around mainly to show where
> things are incorrect, I am attaching a set of patches for HEAD which can
> be used for commit:
> - One patch which addresses the several lock problems and adds some
> instructions about the variables protected by spinlocks for slots in
> use, which should be back-patched.
> - A second patch for HEAD which addresses what has been noticed for the
> new slot advance feature.  This addresses as well the lock problems
> introduced in the new advance code, hopefully the split makes sense to
> others on this thread.
> Once again those only apply to HEAD, please feel free to ping me if you
> would like versions for back-branches (or anybody picking up those
> patches).

And of course I found a typo just after sending those..  Please use the
attached ones instead.
--
Michael
From 69bb70049d8ba74af60e8554fd6379499fbd29ff Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication
 xslot data

While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.

The code paths involved in those problems concern the WAL sender,
logical decoding initialization and WAL reservation for slots.

A set of comments documenting all the lock handlings, particularly the
dependency with LW locks for slots and the in_use flag as well as the
internal mutex lock is added, based on a suggested by Simon Riggs.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com
---
 src/backend/replication/logical/logical.c | 13 +++++++++----
 src/backend/replication/slot.c            |  4 ++++
 src/backend/replication/walsender.c       | 14 +++++++++++---
 src/include/replication/slot.h            | 10 ++++++++++
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
 
+	SpinLockAcquire(&slot->mutex);
 	slot->effective_catalog_xmin = xmin_horizon;
 	slot->data.catalog_xmin = xmin_horizon;
 	if (need_full_snapshot)
 		slot->effective_xmin = xmin_horizon;
+	SpinLockRelease(&slot->mutex);
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -445,13 +447,14 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	XLogRecPtr	startptr;
+	ReplicationSlot *slot = ctx->slot;
 
 	/* Initialize from where to start reading WAL. */
-	startptr = ctx->slot->data.restart_lsn;
+	startptr = slot->data.restart_lsn;
 
 	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
-		 (uint32) (ctx->slot->data.restart_lsn >> 32),
-		 (uint32) ctx->slot->data.restart_lsn);
+		 (uint32) (slot->data.restart_lsn >> 32),
+		 (uint32) slot->data.restart_lsn);
 
 	/* Wait for a consistent starting point */
 	for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockAcquire(&slot->mutex);
+	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockRelease(&slot->mutex);
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 			XLogRecPtr	flushptr;
 
 			/* start at current insert position */
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
+			SpinLockRelease(&slot->mutex);
 
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
 		}
 		else
 		{
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetRedoRecPtr();
+			SpinLockRelease(&slot->mutex);
 		}
 
 		/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
-	StringInfoData buf;
+	StringInfoData	buf;
+	XLogRecPtr		restart_lsn;
+	XLogRecPtr		confirmed_lsn;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 												 WalSndWriteData,
 												 WalSndUpdateProgress);
 
+	/* Fetch all needed values from the slot */
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	restart_lsn = MyReplicationSlot->data.restart_lsn;
+	confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
 	/* Start reading WAL from the oldest required WAL. */
-	logical_startptr = MyReplicationSlot->data.restart_lsn;
+	logical_startptr = restart_lsn;
 
 	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
-	sentPtr = MyReplicationSlot->data.confirmed_flush;
+	sentPtr = confirmed_lsn;
 
 	/* Also update the sent position status in shared memory */
 	SpinLockAcquire(&MyWalSnd->mutex);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..6fa9df5553 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,16 @@ typedef struct ReplicationSlotPersistentData
 
 /*
  * Shared memory state of a single replication slot.
+ *
+ * The data included in this structure, including the contents within
+ * ReplicationSlotPersistentData, are protected by mutex when read from
+ * other backends than the one registering the slot as in_use.  If the
+ * slot is not marked as in_use, then no code paths refer or should refer
+ * to the in-memory data of a slot.
+ *
+ * Note that a slot is switched as in_use only with
+ * ReplicationSlotControlLock held in exclusive mode, protecting from any
+ * while readers have to hold this lock at least in shared mode.
  */
 typedef struct ReplicationSlot
 {
-- 
2.17.0

From 7032bb81531527c495c77f97bdc170a6aa7e09ad Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
 feature

A review of the code has showed up two issues fixed by this commit:
- Physical slots have been using the confirmed LSN position as a start
comparison point which is always 0/0, instead use the restart LSN
position (logical slots need to use the confirmed LSN position).
- Never return 0/0 is a slot cannot be advanced.  This way, if a slot is
advanced while the activity is idle, then the same position is returned
by to caller over and over without raising an error.

Note that as the slot is owned by the backend advancing it, then the
read of those field is fine lockless, while updates need to happen while
the slot mutex is held.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com
---
 src/backend/replication/slotfuncs.c | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..d07d2c432b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -322,15 +322,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 {
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	retlsn = startlsn;
 
-	SpinLockAcquire(&MyReplicationSlot->mutex);
 	if (MyReplicationSlot->data.restart_lsn < moveto)
 	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.restart_lsn = moveto;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
 	}
-	SpinLockRelease(&MyReplicationSlot->mutex);
 
 	return retlsn;
 }
@@ -343,7 +343,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	retlsn = startlsn;
 
 	PG_TRY();
 	{
@@ -472,7 +472,11 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
+	if (OidIsValid(MyReplicationSlot->data.database))
+		startlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		startlsn = MyReplicationSlot->data.restart_lsn;
+
 	if (moveto < startlsn)
 	{
 		ReplicationSlotRelease();
-- 
2.17.0

Attachment: signature.asc
Description: PGP signature

Reply via email to