Hi,

I think the patch looks mostly fine. I'm about to do a bit more testing on it, but I have a few comments. The attached diff shows the discussed places / comments more closely.
1) There's a race condition in LogicalLockTransaction. The code does roughly this:

    if (!BecomeDecodeGroupMember(...))
        ... bail out ...

    Assert(MyProc->decodeGroupLeader);
    lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
    ...

but AFAICS there is no guarantee that the transaction does not commit (or even abort) right after we become a decode group member. In that case LogicalDecodeRemoveTransaction might have already reset our pointer to the leader to NULL, and both the Assert() and the LockHashPartitionLockByProc() call will fail.

I initially thought this could be fixed by setting decodeLocked=true in BecomeDecodeGroupMember, but that's not really true: it would fix the race for aborts, but not for commits. LogicalDecodeRemoveTransaction skips the wait for commits entirely and just resets the flags anyway. So I think this needs a different fix.

BecomeDecodeGroupMember also needs the leader PGPROC pointer, but it does not have this issue because it gets the pointer as a parameter. I think the same approach would work here too, i.e. use the result of AssignDecodeGroupLeader() instead.

2) BecomeDecodeGroupMember sets decodeGroupLeader=NULL when the leader does not match the parameters, even though the Assert() at the beginning already ensures it is NULL. Let's remove that assignment.

3) I don't quite understand why BecomeDecodeGroupMember does the cross-check using the PID. When would it help?

4) AssignDecodeGroupLeader still sets pid, which is never read. Let's remove it.

5) ReorderBufferCommitInternal emits an elog(LOG) about interrupting the decoding of an aborted transaction in only one place, but there are about three other places where we check LogicalLockTransaction. That seems inconsistent.

6) The comment before LogicalLockTransaction is somewhat inaccurate, because it talks about adding/removing the backend to/from the group, which is not what's happening. We join the group on the first call, and after that we only tweak the decodeLocked flag.

7) I propose minor changes to a couple of comments.
regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 65382c2..b8b73a4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1236,13 +1236,19 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
  * while accessing any catalogs. To enforce that, each decoding backend
  * has to call LogicalLockTransaction prior to any catalog access, and
  * then LogicalUnlockTransaction immediately after it. These functions
- * add/remove the decoding backend from a "decoding group" for a given
- * transaction. While aborting a prepared transaction, the backend will
- * wait for all current members of the decoding group to leave (see
- * LogicalDecodeRemoveTransaction).
+ * add the decoding backend into a "decoding group" for the transaction
+ * (on the first call), and then update a flag indicating whether the
+ * decoding backend may be accessing any catalogs.
  *
- * The function return true when it's safe to access catalogs, and
- * false when the transaction aborted (or is being aborted) in which
+ * While aborting a prepared transaction, the backend is made to wait
+ * for all current members of the decoding group that may be currently
+ * accessing catalogs (see LogicalDecodeRemoveTransaction). Once the
+ * transaction completes (applies to both abort and commit), the group
+ * is destroyed and is not needed anymore (we can check transaction
+ * status directly, instead).
+ *
+ * The function returns true when it's safe to access catalogs, and
+ * false when the transaction aborted (or is being aborted), in which
  * case the plugin should stop decoding it.
  *
  * The decoding backend joins the decoding group only when actually
@@ -1324,6 +1330,12 @@ LogicalLockTransaction(ReorderBufferTXN *txn)
 	/*
 	 * If we were able to add ourself, then Abort processing will
 	 * interlock with us.
+	 *
+	 * XXX There's a race condition here, I think. BecomeDecodeGroupMember
+	 * made us a member of the group, but the transaction might have
+	 * finished since then, in which case decodeGroupLeader is NULL.
+	 * We need to set (decodeLocked = true) in BecomeDecodeGroupMember,
+	 * so that the leader waits for us.
 	 */
 	Assert(MyProc->decodeGroupLeader);
 
@@ -1333,6 +1345,9 @@ LogicalLockTransaction(ReorderBufferTXN *txn)
 	/*
 	 * Re-check if we were told to abort by the leader after taking
 	 * the above lock
+	 *
+	 * XXX It's not quite clear to me why we need the separate flag
+	 * in our process. Why not simply check the leader's flag?
 	 */
 	if (MyProc->decodeAbortPending)
 	{
@@ -1410,7 +1425,12 @@ LogicalUnlockTransaction(ReorderBufferTXN *txn)
 	if (rbtxn_commit(txn))
 		return;
 
+	/*
+	 * We're guaranteed to still have a leader here, because we are
+	 * in locked mode, so the leader can't just disappear.
+	 */
 	Assert(MyProc->decodeGroupLeader);
+
 	leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
 	LWLockAcquire(leader_lwlock, LW_SHARED);
 	if (MyProc->decodeAbortPending)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index adb6ade..908eada 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1396,6 +1396,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
 												MAIN_FORKNUM));
 
 					/* Lock transaction before catalog access */
+					/* XXX Why no elog(LOG) here? */
 					if (!LogicalLockTransaction(txn))
 						break;
 
@@ -1443,6 +1444,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
 					 * transaction will be around for the duration of
 					 * the apply_change call below
 					 */
+					/* XXX Why no elog(LOG) here? */
 					if (!LogicalLockTransaction(txn))
 						break;
 					ReorderBufferToastReplace(rb, txn, relation, change);
@@ -1520,7 +1522,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
 					break;
 
 				case REORDER_BUFFER_CHANGE_MESSAGE:
-					/* XXX does rb->message need lock/unlock? */
+					/* XXX Why no elog(LOG) here? */
 					if (!LogicalLockTransaction(txn))
 						break;
 					rb->message(rb, txn, change->lsn, true,
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 2c002a2..3fc3a65 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -2004,7 +2004,6 @@ PGPROC *
 AssignDecodeGroupLeader(TransactionId xid)
 {
 	PGPROC	   *proc = NULL;
-	int			pid;
 	LWLock	   *leader_lwlock;
 
 	Assert(xid != InvalidTransactionId);
@@ -2015,9 +2014,7 @@ AssignDecodeGroupLeader(TransactionId xid)
 	 * If the transaction already completed, we can bail out.
 	 */
 	proc = BackendXidGetProc(xid);
-	if (proc)
-		pid = proc->pid;
-	else
+	if (!proc)
 		return NULL;
 
 	/*
@@ -2093,6 +2090,10 @@ AssignDecodeGroupLeader(TransactionId xid)
  * that, we require the caller to pass the PID of the intended PGPROC as
  * an interlock. Returns true if we successfully join the intended lock
  * group, and false if not.
+ *
+ * XXX Not sure why we are passing in the PID, considering we only deal
+ * with prepared transactions now, which means (pid==0). Shouldn't we
+ * use XID instead, for example?
  */
 bool
 BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
@@ -2107,7 +2108,7 @@ BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
 	Assert(MyProc->decodeGroupLeader == NULL);
 
 	/* PID must be valid OR this is a prepared transaction. */
-	Assert(pid != 0 || is_prepared);
+	Assert(((pid != 0) && !is_prepared) || ((pid == 0) && is_prepared));
 
 	/*
 	 * Get lock protecting the group fields. Note LockHashPartitionLockByProc
@@ -2122,8 +2123,6 @@ BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
 	/* Is this the leader we're looking for? */
 	if (leader->pid == pid && leader->decodeGroupLeader == leader)
 	{
-		if (is_prepared)
-			Assert(pid == 0);
 		/* is the leader going away? */
 		if (leader->decodeAbortPending)
 			ok = false;
@@ -2131,10 +2130,13 @@ BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
 		{
 			/* OK, join the group */
 			ok = true;
+			/* XXX unfortunately this does not prevent the race in LogicalLockTransaction :-( */
+			MyProc->decodeLocked = true;
 			MyProc->decodeGroupLeader = leader;
 			dlist_push_tail(&leader->decodeGroupMembers, &MyProc->decodeGroupLink);
 		}
 	}
+	/* XXX seems unnecessary, considering the assert at the beginning */
 	else
 		MyProc->decodeGroupLeader = NULL;
 	LWLockRelease(leader_lwlock);