On Mon, May 28, 2018 at 05:57:47PM +0900, Michael Paquier wrote:
> Found one.  All the things I have spotted are in the patch attached.

Oops, I forgot a ReplicationSlotRelease() call in that patch.  An updated version is attached.
--
Michael
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 			XLogRecPtr	flushptr;
 
 			/* start at current insert position */
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
+			SpinLockRelease(&slot->mutex);
 
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
 		}
 		else
 		{
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetRedoRecPtr();
+			SpinLockRelease(&slot->mutex);
 		}
 
 		/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..1b8fd951d5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -413,7 +413,9 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 			ReplicationSlotMarkDirty();
 		}
 
+		SpinLockAcquire(&MyReplicationSlot->mutex);
 		retlsn = MyReplicationSlot->data.confirmed_flush;
+		SpinLockRelease(&MyReplicationSlot->mutex);
 
 		/* free context, call shutdown callback */
 		FreeDecodingContext(ctx);
@@ -472,7 +474,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	if (OidIsValid(MyReplicationSlot->data.database))
+		startlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		startlsn = MyReplicationSlot->data.restart_lsn;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
 	if (moveto < startlsn)
 	{
 		ReplicationSlotRelease();
@@ -488,6 +496,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	else
 		endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
 
+	if (XLogRecPtrIsInvalid(endlsn))
+	{
+		ReplicationSlotRelease();
+		ereport(ERROR,
+				(errmsg("slot not moved forward as position is already reached")));
+	}
+
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
-	StringInfoData buf;
+	StringInfoData	buf;
+	XLogRecPtr		restart_lsn;
+	XLogRecPtr		confirmed_lsn;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 												 WalSndWriteData,
 												 WalSndUpdateProgress);
 
+	/* Fetch all needed values from the slot */
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	restart_lsn = MyReplicationSlot->data.restart_lsn;
+	confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
 	/* Start reading WAL from the oldest required WAL. */
-	logical_startptr = MyReplicationSlot->data.restart_lsn;
+	logical_startptr = restart_lsn;
 
 	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
-	sentPtr = MyReplicationSlot->data.confirmed_flush;
+	sentPtr = confirmed_lsn;
 
 	/* Also update the sent position status in shared memory */
 	SpinLockAcquire(&MyWalSnd->mutex);

Attachment: signature.asc
Description: PGP signature

Reply via email to