On Fri, May 25, 2018 at 02:12:32PM +0200, Magnus Hagander wrote:
> I agree that returning 0/0 on this is wrong.
>
> However, can this actually occur for any case other than exactly the case
> of "moving the position to where the position already is"? If I look at the
> physical slot path at least that seems to be the only case, and in that
> case I think the correct thing to return would be the new position, and not
> NULL. If we actually *fail* to move the position, we give an error.
Yes, this only returns InvalidXLogRecPtr if the location could not be
moved forward.  Still, there is more going on here.  For a physical
slot, confirmed_lsn is always 0/0, hence the backward-move check is
never applied for it.  What I think should be used as the value assigned
to startlsn is restart_lsn instead.  Then what do we do if the position
cannot be moved: should we raise an error, as the attached patch does,
or just report the position the slot is currently at?

A second error that I can see is in pg_logical_replication_slot_advance,
which does not take the mutex lock of MyReplicationSlot, so concurrent
readers like ReplicationSlotsComputeRequiredLSN (which actually applies
to physical slots) and pg_get_replication_slots() may read inconsistent
data.

On top of that, it seems to me that StartLogicalReplication reads a
couple of fields from a slot without taking a lock, so at least
pg_get_replication_slots() may read incorrect data.
ReplicationSlotReserveWal is also missing a lock.  Those issues are
older than v11 though.

> Actually, isn't there also a race there? That is, if we try to move it, we
> check that we're not trying to move it backwards, and throw an error, but
> that's checked outside the lock. Then later we actually move it, and check
> *again* if we try to move it backwards, but if that one fails we return
> InvalidXLogRecPtr (which can happen in the case of concurrent activity on
> the slot, I think)? In this case, maybe we should just re-check that and
> raise an error appropriately?

Er, isn't the point here that ReplicationSlotAcquire is used to prevent
any replication slot update from happening in other backends?  It seems
to me that what counts in the end is whether a backend PID is associated
with the slot in shared memory.  If you look at the code paths updating
a logical or physical slot, those imply that the slot is owned; still, a
spinlock needs to be taken for concurrent readers.

> (I haven't looked at the logical slot path, but I assume it would have
> something similar in it)

Found one.  All the issues I have spotted are addressed in the attached
patch.
--
Michael
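PS: to make the convention I am arguing about more concrete, here is
roughly what reading a slot field should look like, even from the
backend which owns the slot (untested sketch, with slotname standing for
whatever slot name the caller has; this is not an excerpt from the
attached patch):

	XLogRecPtr	restart_lsn;

	/* Own the slot so that no other backend can update or drop it */
	ReplicationSlotAcquire(slotname, true);

	/*
	 * Owning the slot does not prevent other backends from reading its
	 * fields, for example via pg_get_replication_slots(), so the
	 * spinlock is still needed to get a consistent value.
	 */
	SpinLockAcquire(&MyReplicationSlot->mutex);
	restart_lsn = MyReplicationSlot->data.restart_lsn;
	SpinLockRelease(&MyReplicationSlot->mutex);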
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 		XLogRecPtr	flushptr;
 
 		/* start at current insert position */
+		SpinLockAcquire(&slot->mutex);
 		slot->data.restart_lsn = GetXLogInsertRecPtr();
+		SpinLockRelease(&slot->mutex);
 
 		/* make sure we have enough information to start */
 		flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
 	}
 	else
 	{
+		SpinLockAcquire(&slot->mutex);
 		slot->data.restart_lsn = GetRedoRecPtr();
+		SpinLockRelease(&slot->mutex);
 	}
 
 	/* prevent WAL removal as fast as possible */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d9e10263bb..4cf2aef95f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -413,7 +413,9 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto)
 		ReplicationSlotMarkDirty();
 	}
 
+	SpinLockAcquire(&MyReplicationSlot->mutex);
 	retlsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockRelease(&MyReplicationSlot->mutex);
 
 	/* free context, call shutdown callback */
 	FreeDecodingContext(ctx);
@@ -472,7 +474,13 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	/* Acquire the slot so we "own" it */
 	ReplicationSlotAcquire(NameStr(*slotname), true);
 
-	startlsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	if (OidIsValid(MyReplicationSlot->data.database))
+		startlsn = MyReplicationSlot->data.confirmed_flush;
+	else
+		startlsn = MyReplicationSlot->data.restart_lsn;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
 	if (moveto < startlsn)
 	{
 		ReplicationSlotRelease();
@@ -488,6 +496,10 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	else
 		endlsn = pg_physical_replication_slot_advance(startlsn, moveto);
 
+	if (XLogRecPtrIsInvalid(endlsn))
+		ereport(ERROR,
+				(errmsg("slot not moved forward as position is already reached")));
+
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..0b1f1ba3c1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
-	StringInfoData buf;
+	StringInfoData	buf;
+	XLogRecPtr	restart_lsn;
+	XLogRecPtr	confirmed_lsn;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 									WalSndWriteData, WalSndUpdateProgress);
 
+	/* Fetch all needed values from the slot */
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	restart_lsn = MyReplicationSlot->data.restart_lsn;
+	confirmed_lsn = MyReplicationSlot->data.confirmed_flush;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
 	/* Start reading WAL from the oldest required WAL. */
-	logical_startptr = MyReplicationSlot->data.restart_lsn;
+	logical_startptr = restart_lsn;
 
 	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
-	sentPtr = MyReplicationSlot->data.confirmed_flush;
+	sentPtr = confirmed_lsn;
 
 	/* Also update the sent position status in shared memory */
 	SpinLockAcquire(&MyWalSnd->mutex);