On Mon, May 28, 2018 at 05:57:47PM +0900, Michael Paquier wrote:
> Found one. All the things I have spotted are in the patch attached.
Oops, forgot a ReplicationSlotRelease call.
--
Michael
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 056628fe8e..79d7a57d67 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void) XLogRecPtr flushptr; /* start at current insert position */ + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetXLogInsertRecPtr(); + SpinLockRelease(&slot->mutex); /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); @@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void) } else { + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetRedoRecPtr(); + SpinLockRelease(&slot->mutex); } /* prevent WAL removal as fast as possible */ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d9e10263bb..1b8fd951d5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -413,7 +413,9 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) ReplicationSlotMarkDirty(); } + SpinLockAcquire(&MyReplicationSlot->mutex); retlsn = MyReplicationSlot->data.confirmed_flush; + SpinLockRelease(&MyReplicationSlot->mutex); /* free context, call shutdown callback */ FreeDecodingContext(ctx); @@ -472,7 +474,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) /* Acquire the slot so we "own" it */ ReplicationSlotAcquire(NameStr(*slotname), true); - startlsn = MyReplicationSlot->data.confirmed_flush; + SpinLockAcquire(&MyReplicationSlot->mutex); + if (OidIsValid(MyReplicationSlot->data.database)) + startlsn = MyReplicationSlot->data.confirmed_flush; + else + startlsn = MyReplicationSlot->data.restart_lsn; + SpinLockRelease(&MyReplicationSlot->mutex); + if (moveto < startlsn) { ReplicationSlotRelease(); @@ -488,6 +496,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) else endlsn = pg_physical_replication_slot_advance(startlsn, moveto); + if (XLogRecPtrIsInvalid(endlsn)) + { + ReplicationSlotRelease(); + 
ereport(ERROR, + (errmsg("slot not moved forward as position is already reached"))); + } + values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e47ddca6bc..0b1f1ba3c1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) static void StartLogicalReplication(StartReplicationCmd *cmd) { - StringInfoData buf; + StringInfoData buf; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; /* make sure that our requirements are still fulfilled */ CheckLogicalDecodingRequirements(); @@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd) WalSndWriteData, WalSndUpdateProgress); + /* Fetch all needed values from the slot */ + SpinLockAcquire(&MyReplicationSlot->mutex); + restart_lsn = MyReplicationSlot->data.restart_lsn; + confirmed_lsn = MyReplicationSlot->data.confirmed_flush; + SpinLockRelease(&MyReplicationSlot->mutex); + /* Start reading WAL from the oldest required WAL. */ - logical_startptr = MyReplicationSlot->data.restart_lsn; + logical_startptr = restart_lsn; /* * Report the location after which we'll send out further commits as the * current sentPtr. */ - sentPtr = MyReplicationSlot->data.confirmed_flush; + sentPtr = confirmed_lsn; /* Also update the sent position status in shared memory */ SpinLockAcquire(&MyWalSnd->mutex);
signature.asc
Description: PGP signature