On Thu, May 31, 2018 at 06:31:24PM +0100, Simon Riggs wrote:

Thanks for the input, Simon.  I have been able to spend more time
reviewing the slot-related code.

> On 28 May 2018 at 09:57, Michael Paquier <mich...@paquier.xyz> wrote:
>> Yes, this only returns InvalidXLogRecPtr if the location could not be
>> moved forward.  Still, there is more going on here.  For a physical
>> slot, confirmed_lsn is always 0/0, hence the backward check is never
>> applied for it.  What I think should be used for value assigned to
>> startlsn is restart_lsn instead.  Then what do we do if the position
>> cannot be moved: should we raise an error, as what my patch attached
>> does, or just report the existing position the slot is at?
> 
> I don't see why an ERROR would be appropriate.

Okay, the caller can always compare the returned position with the
slot's previous one to check whether any progress was made, so that's
fine by me.  In that case, the LSN used for the initialization should be
startlsn instead of InvalidXLogRecPtr.
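
Concretely, the physical case would end up looking like the following
sketch (the function name is made up and the usual slotfuncs.c includes
are assumed; the real change is in the second patch attached):

static XLogRecPtr
physical_advance_sketch(XLogRecPtr startlsn, XLogRecPtr moveto)
{
	/* start from the slot's current position, not 0/0 */
	XLogRecPtr	retlsn = startlsn;

	if (MyReplicationSlot->data.restart_lsn < moveto)
	{
		/* updates need the per-slot spinlock, even from the slot's owner */
		SpinLockAcquire(&MyReplicationSlot->mutex);
		MyReplicationSlot->data.restart_lsn = moveto;
		SpinLockRelease(&MyReplicationSlot->mutex);
		retlsn = moveto;
	}

	/* if nothing could be moved, the caller just sees the same position */
	return retlsn;
}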

>> A second error that I can see is in pg_logical_replication_slot_advance,
>> which does not take the mutex lock of MyReplicationSlot, so concurrent
>> callers like those of ReplicationSlotsComputeRequiredLSN (applies to
>> physical slot actually) and pg_get_replication_slots() may read false
>> data.
>>
>> On top of that, it seems to me that StartLogicalReplication is reading a
>> couple of fields from a slot without taking a lock, so at least
>> pg_get_replication_slots() may read incorrect data.
>> ReplicationSlotReserveWal also is missing a lock..  Those are older than
>> v11 though.

Actually, there are two extra problems:
- CreateInitDecodingContext, which can be called after the slot is
marked as in use, updates catalog_xmin & co without the mutex taken, so
there could be inconsistencies with pg_get_replication_slots() as well.
- DecodingContextFindStartpoint, where confirmed_flush is updated
without the mutex taken.
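
Both boil down to the same pattern.  Taking CreateInitDecodingContext as
an example, where slot, xmin_horizon and need_full_snapshot are already
available, the update should look roughly like this (a sketch only; the
exact hunk is in the first patch attached):

	/*
	 * Even the backend owning the slot has to hold the per-slot spinlock
	 * while changing fields that other backends may read concurrently,
	 * for example via pg_get_replication_slots().
	 */
	SpinLockAcquire(&slot->mutex);
	slot->effective_catalog_xmin = xmin_horizon;
	slot->data.catalog_xmin = xmin_horizon;
	if (need_full_snapshot)
		slot->effective_xmin = xmin_horizon;
	SpinLockRelease(&slot->mutex);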

> I think the problem here is there are no comments explaining how to
> access the various fields in the structure, so there was no way to
> check whether the code was good or not.
>
> If we add corrective code we should clarify that in comments the .h
> file also, as is done in XlogCtl

Yes, I agree with you.  There are at least two LWLocks used for the
overall slot creation and handling, as well as a per-slot spinlock
protecting the individual fields.  The code also does not mention in its
comments that slots marked with in_use = false are not scanned at all by
concurrent backends; this is strongly implied by the code, so there is
no need to take a lock when working on the fields of such a slot as long
as its in_use flag has not been set to true.  There is one code path
which works on slots while bypassing those already marked as in_use,
namely the startup process restoring slot data at recovery, so there is
no need to care about locking in this case either.
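
For readers, the rule those comments should spell out boils down to the
following (a minimal sketch, not part of the patches, using the names
from slot.c with error handling omitted):

static void
scan_slots_sketch(void)
{
	int		i;

	/* hold the control lock so slots cannot be created or dropped */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
		XLogRecPtr	restart_lsn;
		XLogRecPtr	confirmed_flush;

		/* slots not in use are not looked at, no lock needed for them */
		if (!slot->in_use)
			continue;

		/* copy the wanted fields with the slot's spinlock held */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		confirmed_flush = slot->data.confirmed_flush;
		SpinLockRelease(&slot->mutex);

		elog(DEBUG1, "slot %d: restart %X/%X, confirmed %X/%X", i,
			 (uint32) (restart_lsn >> 32), (uint32) restart_lsn,
			 (uint32) (confirmed_flush >> 32), (uint32) confirmed_flush);
	}
	LWLockRelease(ReplicationSlotControlLock);
}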

> Your points look correct to me, well spotted. I'd like to separate the
> correction of these issues from the change of behavior patch. Those
> earlier issues can be backpatched, but the change of behavior only
> affects PG11.

Definitely.  The previous patch was mainly meant to show where things
are incorrect, so I am attaching a set of patches for HEAD which can be
used for commit:
- One patch which addresses the various lock problems and documents
which variables are protected by the spinlock for slots in use; this one
should be back-patched.
- A second patch for HEAD which addresses what has been noticed for the
new slot advance feature, including the lock problems introduced by the
new advance code.  Hopefully the split makes sense to others on this
thread.
Once again, those only apply to HEAD; please feel free to ping me if you
would like versions for the back-branches (same goes for anybody picking
up those patches).

Thanks,
--
Michael
From 69bb70049d8ba74af60e8554fd6379499fbd29ff Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication
 slot data

While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, a review of the code surrounding in-memory slot data has
shown that the lock handling may cause inconsistent data to be read by
read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.

The code paths involved in those problems concern the WAL sender,
logical decoding initialization and WAL reservation for slots.

A set of comments documenting the lock handling, particularly the
interactions between the LWLocks for slots, the in_use flag and the
per-slot mutex, is added, based on a suggestion by Simon Riggs.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com
---
 src/backend/replication/logical/logical.c | 13 +++++++++----
 src/backend/replication/slot.c            |  4 ++++
 src/backend/replication/walsender.c       | 14 +++++++++++---
 src/include/replication/slot.h            | 10 ++++++++++
 4 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
 
+	SpinLockAcquire(&slot->mutex);
 	slot->effective_catalog_xmin = xmin_horizon;
 	slot->data.catalog_xmin = xmin_horizon;
 	if (need_full_snapshot)
 		slot->effective_xmin = xmin_horizon;
+	SpinLockRelease(&slot->mutex);
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -445,13 +447,14 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	XLogRecPtr	startptr;
+	ReplicationSlot *slot = ctx->slot;
 
 	/* Initialize from where to start reading WAL. */
-	startptr = ctx->slot->data.restart_lsn;
+	startptr = slot->data.restart_lsn;
 
 	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
-		 (uint32) (ctx->slot->data.restart_lsn >> 32),
-		 (uint32) ctx->slot->data.restart_lsn);
+		 (uint32) (slot->data.restart_lsn >> 32),
+		 (uint32) slot->data.restart_lsn);
 
 	/* Wait for a consistent starting point */
 	for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockAcquire(&slot->mutex);
+	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockRelease(&slot->mutex);
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 			XLogRecPtr	flushptr;
 
 			/* start at current insert position */
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
+			SpinLockRelease(&slot->mutex);
 
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
 		}
 		else
 		{
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetRedoRecPtr();
+			SpinLockRelease(&slot->mutex);
 		}
 
 		/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
-	StringInfoData buf;
+	StringInfoData	buf;
+	XLogRecPtr		restart_lsn;
+	XLogRecPtr		confirmed_lsn;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 												 WalSndWriteData,
 												 WalSndUpdateProgress);
 
+	/* Fetch all needed values from the slot */
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	restart_lsn = MyReplicationSlot->data.restart_lsn;
+	confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
 	/* Start reading WAL from the oldest required WAL. */
-	logical_startptr = MyReplicationSlot->data.restart_lsn;
+	logical_startptr = restart_lsn;
 
 	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
-	sentPtr = MyReplicationSlot->data.confirmed_flush;
+	sentPtr = confirmed_lsn;
 
 	/* Also update the sent position status in shared memory */
 	SpinLockAcquire(&MyWalSnd->mutex);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..6fa9df5553 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,16 @@ typedef struct ReplicationSlotPersistentData
 
 /*
  * Shared memory state of a single replication slot.
+ *
+ * The data included in this structure, including the contents of
+ * ReplicationSlotPersistentData, is protected by the slot's mutex when
+ * read by backends other than the one owning the slot.  If a slot is
+ * not marked as in_use, then no code path refers, or should refer, to
+ * its in-memory data.
+ *
+ * Note that a slot is switched to in_use only with
+ * ReplicationSlotControlLock held in exclusive mode, protecting it from
+ * concurrent lookups, while readers hold this lock at least in shared mode.
  */
 typedef struct ReplicationSlot
 {
-- 
2.17.0

From 20af21231112a31f9d825d379bf75ce1e0aecf54 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 1 Jun 2018 14:37:50 -0400
Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing
 feature

A review of the code has turned up two issues fixed by this commit:
- Physical slots have been using the confirmed LSN position as the
starting comparison point, which is always 0/0 for them; use the restart
LSN position instead (logical slots still need to use the confirmed LSN
position).
- Never return 0/0 if a slot cannot be advanced.  This way, if a slot is
advanced while the activity is idle, the same position is returned to
the caller over and over without raising an error.

Note that as the slot is owned by the backend advancing it, reading
those fields without a lock is fine, while updates need to happen with
the slot's mutex held.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com
---
 src/backend/replication/slotfuncs.c | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..151151b1a8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -322,15 +322,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 {
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	retlsn = startlsn;
 
-	SpinLockAcquire(&MyReplicationSlot->mutex);
 	if (MyReplicationSlot->data.restart_lsn < moveto)
 	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.restart_lsn = moveto;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
 	}
-	SpinLockRelease(&MyReplicationSlot->mutex);
 
 	return retlsn;
 }
@@ -343,7 +343,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn = InvalidXLogRecPtr;
+	XLogRecPtr	retlsn = startlsn;
 
 	PG_TRY();
 	{
@@ -472,7 +472,11 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
+	if (OidIsValid(MyReplicationSlot->data.database))
+		startlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		startlsn = MyReplicationSlot->data.restart_lsn;
+
 	if (moveto < startlsn)
 	{
 		ReplicationSlotRelease();
-- 
2.17.0
