On Fri, Jun 01, 2018 at 02:53:09PM -0400, Michael Paquier wrote: > Definitely, while the previous patch was around mainly to show where > things are incorrect, I am attaching a set of patches for HEAD which can > be used for commit: > - One patch which addresses the several lock problems and adds some > instructions about the variables protected by spinlocks for slots in > use, which should be back-patched. > - A second patch for HEAD which addresses what has been noticed for the > new slot advance feature. This addresses as well the lock problems > introduced in the new advance code, hopefully the split makes sense to > others on this thread. > Once again those only apply to HEAD, please feel free to ping me if you > would like versions for back-branches (or anybody picking up those > patches).
And of course I found a typo just after sending those.. Please use the attached ones instead. -- Michael
From 69bb70049d8ba74af60e8554fd6379499fbd29ff Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Fri, 1 Jun 2018 14:30:55 -0400 Subject: [PATCH 1/2] Fix and document lock handling for in-memory replication slot data While debugging issues on HEAD for the new slot forwarding feature of Postgres 11, some monitoring of the code surrounding in-memory slot data has proved that the lock handling may cause inconsistent data to be read by read-only callers of slot functions, particularly pg_get_replication_slots() which fetches data for the system view pg_replication_slots. The code paths involved in those problems concern the WAL sender, logical decoding initialization and WAL reservation for slots. A set of comments documenting all the lock handlings, particularly the dependency with LW locks for slots and the in_use flag as well as the internal mutex lock is added, based on a suggestion by Simon Riggs. Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com --- src/backend/replication/logical/logical.c | 13 +++++++++---- src/backend/replication/slot.c | 4 ++++ src/backend/replication/walsender.c | 14 +++++++++++--- src/include/replication/slot.h | 10 ++++++++++ 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1393591538..61588d626f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin, xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); + SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; if (need_full_snapshot) slot->effective_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); ReplicationSlotsComputeRequiredXmin(true); @@ -445,13 +447,14 @@ void DecodingContextFindStartpoint(LogicalDecodingContext *ctx) { 
XLogRecPtr startptr; + ReplicationSlot *slot = ctx->slot; /* Initialize from where to start reading WAL. */ - startptr = ctx->slot->data.restart_lsn; + startptr = slot->data.restart_lsn; elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", - (uint32) (ctx->slot->data.restart_lsn >> 32), - (uint32) ctx->slot->data.restart_lsn); + (uint32) (slot->data.restart_lsn >> 32), + (uint32) slot->data.restart_lsn); /* Wait for a consistent starting point */ for (;;) @@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) CHECK_FOR_INTERRUPTS(); } - ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockAcquire(&slot->mutex); + slot->data.confirmed_flush = ctx->reader->EndRecPtr; + SpinLockRelease(&slot->mutex); } /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 056628fe8e..79d7a57d67 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void) XLogRecPtr flushptr; /* start at current insert position */ + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetXLogInsertRecPtr(); + SpinLockRelease(&slot->mutex); /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); @@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void) } else { + SpinLockAcquire(&slot->mutex); slot->data.restart_lsn = GetRedoRecPtr(); + SpinLockRelease(&slot->mutex); } /* prevent WAL removal as fast as possible */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e47ddca6bc..0b1f1ba3c1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1040,7 +1040,9 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) static void StartLogicalReplication(StartReplicationCmd *cmd) { - StringInfoData buf; + StringInfoData buf; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; /* make sure that our requirements are still fulfilled */ 
CheckLogicalDecodingRequirements(); @@ -1081,14 +1083,20 @@ StartLogicalReplication(StartReplicationCmd *cmd) WalSndWriteData, WalSndUpdateProgress); + /* Fetch all needed values from the slot */ + SpinLockAcquire(&MyReplicationSlot->mutex); + restart_lsn = MyReplicationSlot->data.restart_lsn; + confirmed_lsn = MyReplicationSlot->data.confirmed_flush; + SpinLockRelease(&MyReplicationSlot->mutex); + /* Start reading WAL from the oldest required WAL. */ - logical_startptr = MyReplicationSlot->data.restart_lsn; + logical_startptr = restart_lsn; /* * Report the location after which we'll send out further commits as the * current sentPtr. */ - sentPtr = MyReplicationSlot->data.confirmed_flush; + sentPtr = confirmed_lsn; /* Also update the sent position status in shared memory */ SpinLockAcquire(&MyWalSnd->mutex); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 76a88c6de7..6fa9df5553 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -86,6 +86,16 @@ typedef struct ReplicationSlotPersistentData /* * Shared memory state of a single replication slot. + * + * The data included in this structure, including the contents within + * ReplicationSlotPersistentData, are protected by mutex when read from + * other backends than the one registering the slot as in_use. If the + * slot is not marked as in_use, then no code paths refer or should refer + * to the in-memory data of a slot. + * + * Note that a slot is switched as in_use only with + * ReplicationSlotControlLock held in exclusive mode, protecting from any + * concurrent change, while readers have to hold this lock at least in shared mode. */ typedef struct ReplicationSlot { -- 2.17.0
From 7032bb81531527c495c77f97bdc170a6aa7e09ad Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@paquier.xyz> Date: Fri, 1 Jun 2018 14:37:50 -0400 Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing feature A review of the code has shown two issues fixed by this commit: - Physical slots have been using the confirmed LSN position as a start comparison point which is always 0/0, instead use the restart LSN position (logical slots need to use the confirmed LSN position). - Never return 0/0 if a slot cannot be advanced. This way, if a slot is advanced while the activity is idle, then the same position is returned to the caller over and over without raising an error. Note that as the slot is owned by the backend advancing it, then reading those fields without holding the lock is fine, while updates need to happen while the slot mutex is held. Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com --- src/backend/replication/slotfuncs.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d9e10263bb..d07d2c432b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -322,15 +322,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) { - XLogRecPtr retlsn = InvalidXLogRecPtr; + XLogRecPtr retlsn = startlsn; - SpinLockAcquire(&MyReplicationSlot->mutex); if (MyReplicationSlot->data.restart_lsn < moveto) { + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.restart_lsn = moveto; + SpinLockRelease(&MyReplicationSlot->mutex); retlsn = moveto; } - SpinLockRelease(&MyReplicationSlot->mutex); return retlsn; } @@ -343,7 +343,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = 
CurrentResourceOwner; - XLogRecPtr retlsn = InvalidXLogRecPtr; + XLogRecPtr retlsn = startlsn; PG_TRY(); { @@ -472,7 +472,11 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) /* Acquire the slot so we "own" it */ ReplicationSlotAcquire(NameStr(*slotname), true); - startlsn = MyReplicationSlot->data.confirmed_flush; + if (OidIsValid(MyReplicationSlot->data.database)) + startlsn = MyReplicationSlot->data.confirmed_flush; + else + startlsn = MyReplicationSlot->data.restart_lsn; + if (moveto < startlsn) { ReplicationSlotRelease(); -- 2.17.0
signature.asc
Description: PGP signature