On Mon, May 28, 2018 at 05:57:47PM +0900, Michael Paquier wrote: > Found one. All the things I have spotted are in the attached patch.
Oops, I forgot a ReplicationSlotRelease() call. -- Michael
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
XLogRecPtr flushptr;
/* start at current insert position */
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetXLogInsertRecPtr();
+ SpinLockRelease(&slot->mutex);
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
}
else
{
+ SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = GetRedoRecPtr();
+ SpinLockRelease(&slot->mutex);
}
/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..1b8fd951d5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -413,7 +413,9 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
ReplicationSlotMarkDirty();
}
+ SpinLockAcquire(&MyReplicationSlot->mutex);
retlsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
/* free context, call shutdown callback */
FreeDecodingContext(ctx);
@@ -472,7 +474,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
- startlsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ if (OidIsValid(MyReplicationSlot->data.database))
+ startlsn = MyReplicationSlot->data.confirmed_flush;
+ else
+ startlsn = MyReplicationSlot->data.restart_lsn;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
if (moveto < startlsn)
{
ReplicationSlotRelease();
@@ -488,6 +496,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
else
endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
+ if (XLogRecPtrIsInvalid(endlsn))
+ {
+ ReplicationSlotRelease();
+ ereport(ERROR,
+ (errmsg("slot not moved forward as position is already reached")));
+ }
+
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
- StringInfoData buf;
+ StringInfoData buf;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
/* make sure that our requirements are still fulfilled */
CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
WalSndWriteData,
WalSndUpdateProgress);
+ /* Fetch all needed values from the slot */
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ restart_lsn = MyReplicationSlot->data.restart_lsn;
+ confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
/* Start reading WAL from the oldest required WAL. */
- logical_startptr = MyReplicationSlot->data.restart_lsn;
+ logical_startptr = restart_lsn;
/*
* Report the location after which we'll send out further commits as the
* current sentPtr.
*/
- sentPtr = MyReplicationSlot->data.confirmed_flush;
+ sentPtr = confirmed_lsn;
/* Also update the sent position status in shared memory */
SpinLockAcquire(&MyWalSnd->mutex);
signature.asc
Description: PGP signature
