Hi,

I think the patch looks mostly fine. I'm about to do a bit more testing
on it, but here are a few comments first. The attached diff shows the
discussed places / comments more closely.


1) There's a race condition in LogicalLockTransaction. The code does
roughly this:

  if (!BecomeDecodeGroupMember(...))
     ... bail out ...

  Assert(MyProc->decodeGroupLeader);
  lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
  ...

but AFAICS there is no guarantee that the transaction does not commit
(or even abort) right after we become a decode group member. In that
case LogicalDecodeRemoveTransaction might have already reset our pointer
to the leader to NULL, so both the Assert() and the lock lookup will fail.

I initially thought this could be fixed by setting decodeLocked=true in
BecomeDecodeGroupMember, but that's not really true - it would fix the
race for aborts, but not for commits. LogicalDecodeRemoveTransaction
skips the wait for commits entirely, and just resets the flags anyway.

So this needs a different fix, I think. BecomeDecodeGroupMember also
needs the leader PGPROC pointer, but it does not have the issue because
it gets it as a parameter. I think the same thing would work here too -
that is, use the AssignDecodeGroupLeader() result instead of reading
MyProc->decodeGroupLeader.
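
To make the interleaving in (1) concrete, here is a toy model of it. It
is not PostgreSQL code: ToyProc, toy_transaction_finished() and
toy_scenario() are names made up for this sketch, and it only models
the pointer reset, not the locking or the re-check done under the lock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for PGPROC, reduced to the one field that matters here. */
typedef struct ToyProc
{
	struct ToyProc *decodeGroupLeader;
} ToyProc;

/*
 * Hypothetical helper: models the transaction finishing, at which point
 * the member's pointer to the leader is reset to NULL.
 */
static void
toy_transaction_finished(ToyProc *member)
{
	member->decodeGroupLeader = NULL;
}

/*
 * Runs the interleaving described above and reports whether we still
 * hold a non-NULL leader pointer when it is time to pick the lock.
 */
bool
toy_scenario(bool use_assign_result)
{
	ToyProc		leader = {NULL};
	ToyProc		me = {NULL};
	ToyProc    *assigned;
	ToyProc    *for_lock;

	/* models AssignDecodeGroupLeader(): the caller keeps the result */
	leader.decodeGroupLeader = &leader;
	assigned = &leader;

	/* models BecomeDecodeGroupMember() succeeding */
	assert(me.decodeGroupLeader == NULL);
	me.decodeGroupLeader = assigned;

	/* the transaction commits (or aborts) right here */
	toy_transaction_finished(&me);

	/* either use the saved result, or re-read the field that was reset */
	for_lock = use_assign_result ? assigned : me.decodeGroupLeader;
	return for_lock != NULL;
}
```

In this model toy_scenario(false) ends up with a NULL pointer (the
case where the Assert() would fire), while toy_scenario(true) does not,
because the local copy is not affected by the reset.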


2) BecomeDecodeGroupMember sets decodeGroupLeader=NULL when the leader
does not match the parameters, even though the Assert() at the beginning
already guarantees it is NULL. Let's remove that assignment.


3) I don't quite understand why BecomeDecodeGroupMember does the
cross-check using PID. In which case would it help?


4) AssignDecodeGroupLeader still sets pid, which is never read. Remove.


5) ReorderBufferCommitInternal does elog(LOG) about interrupting the
decoding of an aborted transaction in only one place. There are three
other places where we check the LogicalLockTransaction result. Seems
inconsistent.


6) The comment before LogicalLockTransaction is somewhat inaccurate,
because it talks about adding/removing the backend to/from the group,
but that's not what's happening. We join the group on the first call and
after that we only tweak the decodeLocked flag.
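
The behavior described in (6) can be sketched with another toy model.
Again, this is not the actual code: ToyBackend, toy_lock(), toy_unlock()
and the joinCount field are invented for illustration, and the abort
check is reduced to a single boolean argument.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-in for the per-backend decoding state. */
typedef struct ToyBackend
{
	bool		inGroup;		/* joined the decoding group? */
	bool		decodeLocked;	/* may be accessing catalogs right now? */
	int			joinCount;		/* number of joins, for the demo only */
} ToyBackend;

/* Models the lock call: join once, then only flip the flag. */
bool
toy_lock(ToyBackend *b, bool abort_pending)
{
	if (abort_pending)
		return false;			/* caller should stop decoding */

	if (!b->inGroup)
	{
		b->inGroup = true;		/* happens on the first call only */
		b->joinCount++;
	}
	b->decodeLocked = true;
	return true;
}

/* Models the unlock call: clears the flag, but stays in the group. */
void
toy_unlock(ToyBackend *b)
{
	b->decodeLocked = false;
}

/* Usage: two lock/unlock cycles result in a single join. */
int
toy_join_count_after_two_cycles(void)
{
	ToyBackend	b = {false, false, 0};
	bool		ok;

	ok = toy_lock(&b, false);
	assert(ok && b.decodeLocked);
	toy_unlock(&b);
	assert(b.inGroup && !b.decodeLocked);

	ok = toy_lock(&b, false);
	assert(ok);
	toy_unlock(&b);

	return b.joinCount;
}
```

The point is that unlocking never removes the backend from the group,
which is what the reworded comment in the diff tries to say.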


7) I propose minor changes to a couple of comments.


regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 65382c2..b8b73a4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1236,13 +1236,19 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
  * while accessing any catalogs. To enforce that, each decoding backend
  * has to call LogicalLockTransaction prior to any catalog access, and
  * then LogicalUnlockTransaction immediately after it. These functions
- * add/remove the decoding backend from a "decoding group" for a given
- * transaction. While aborting a prepared transaction, the backend will
- * wait for all current members of the decoding group to leave (see
- * LogicalDecodeRemoveTransaction).
+ * add the decoding backend into a "decoding group" for the transaction
+ * (on the first call), and then update a flag indicating whether the
+ * decoding backend may be accessing any catalogs.
  *
- * The function return true when it's safe to access catalogs, and
- * false when the transaction aborted (or is being aborted) in which
+ * While aborting a prepared transaction, the backend is made to wait
+ * for all current members of the decoding group that may be currently
+ * accessing catalogs (see LogicalDecodeRemoveTransaction). Once the
+ * transaction completes (applies to both abort and commit), the group
+ * is destroyed and is not needed anymore (we can check transaction
+ * status directly, instead).
+ *
+ * The function returns true when it's safe to access catalogs, and
+ * false when the transaction aborted (or is being aborted), in which
  * case the plugin should stop decoding it.
  *
  * The decoding backend joins the decoding group only when actually
@@ -1324,6 +1330,12 @@ LogicalLockTransaction(ReorderBufferTXN *txn)
 	/*
 	 * If we were able to add ourself, then Abort processing will
 	 * interlock with us.
+	 *
+	 * XXX There's a race condition here, I think. BecomeDecodeGroupMember
+	 * made us a member of the group, but the transaction might have
+	 * finished since then. In which case (decodeGroupLeader == NULL).
+	 * We need to set (decodeLocked = true) in BecomeDecodeGroupMember,
+	 * so that the leader waits for us.
 	 */
 	Assert(MyProc->decodeGroupLeader);
 
@@ -1333,6 +1345,9 @@ LogicalLockTransaction(ReorderBufferTXN *txn)
 	/*
 	 * Re-check if we were told to abort by the leader after taking
 	 * the above lock
+	 *
+	 * XXX It's not quite clear to me why we need the separate flag
+	 * in our process. Why not simply check the leader's flag?
 	 */
 	if (MyProc->decodeAbortPending)
 	{
@@ -1410,7 +1425,12 @@ LogicalUnlockTransaction(ReorderBufferTXN *txn)
 	if (rbtxn_commit(txn))
 		return;
 
+	/*
+	 * We're guaranteed to still have a leader here, because we are
+	 * in locked mode, so the leader can't just disappear.
+	 */
 	Assert(MyProc->decodeGroupLeader);
+
 	leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
 	LWLockAcquire(leader_lwlock, LW_SHARED);
 	if (MyProc->decodeAbortPending)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index adb6ade..908eada 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1396,6 +1396,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
 										 MAIN_FORKNUM));
 
 					/* Lock transaction before catalog access */
+					/* XXX Why no elog(LOG) here? */
 					if (!LogicalLockTransaction(txn))
 						break;
 
@@ -1443,6 +1444,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
 						 * transaction will be around for the duration of
 						 * the apply_change call below
 						 */
+						/* XXX Why no elog(LOG) here? */
 						if (!LogicalLockTransaction(txn))
 							break;
 						ReorderBufferToastReplace(rb, txn, relation, change);
@@ -1520,7 +1522,7 @@ ReorderBufferCommitInternal(ReorderBufferTXN *txn,
 					break;
 
 				case REORDER_BUFFER_CHANGE_MESSAGE:
-					/* XXX does rb->message need lock/unlock? */
+					/* XXX Why no elog(LOG) here? */
 					if (!LogicalLockTransaction(txn))
 						break;
 					rb->message(rb, txn, change->lsn, true,
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 2c002a2..3fc3a65 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -2004,7 +2004,6 @@ PGPROC *
 AssignDecodeGroupLeader(TransactionId xid)
 {
 	PGPROC *proc = NULL;
-	int			pid;
 	LWLock	   *leader_lwlock;
 
 	Assert(xid != InvalidTransactionId);
@@ -2015,9 +2014,7 @@ AssignDecodeGroupLeader(TransactionId xid)
 	 * If the transaction already completed, we can bail out.
 	 */
 	proc = BackendXidGetProc(xid);
-	if (proc)
-		pid = proc->pid;
-	else
+	if (!proc)
 		return NULL;
 
 	/*
@@ -2093,6 +2090,10 @@ AssignDecodeGroupLeader(TransactionId xid)
  * that, we require the caller to pass the PID of the intended PGPROC as
  * an interlock.  Returns true if we successfully join the intended lock
  * group, and false if not.
+ *
+ * XXX Not sure why we are passing in the PID, considering we only deal
+ * with prepared transactions now, which means (pid==0). Shouldn't we
+ * use XID instead, for example?
  */
 bool
 BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
@@ -2107,7 +2108,7 @@ BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
 	Assert(MyProc->decodeGroupLeader == NULL);
 
 	/* PID must be valid OR this is a prepared transaction. */
-	Assert(pid != 0 || is_prepared);
+	Assert(((pid != 0) && !is_prepared) || ((pid == 0) && is_prepared));
 
 	/*
 	 * Get lock protecting the group fields.  Note LockHashPartitionLockByProc
@@ -2122,8 +2123,6 @@ BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
 	/* Is this the leader we're looking for? */
 	if (leader->pid == pid && leader->decodeGroupLeader == leader)
 	{
-		if (is_prepared)
-			Assert(pid == 0);
 		/* is the leader going away? */
 		if (leader->decodeAbortPending)
 			ok = false;
@@ -2131,10 +2130,13 @@ BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
 		{
 			/* OK, join the group */
 			ok = true;
+			/* XXX unfortunately this does not prevent the race in LogicalLockTransaction :-( */
+			MyProc->decodeLocked = true;
 			MyProc->decodeGroupLeader = leader;
 			dlist_push_tail(&leader->decodeGroupMembers, &MyProc->decodeGroupLink);
 		}
 	}
+	/* XXX seems unnecessary, considering the assert at the beginning */
 	else
 		MyProc->decodeGroupLeader = NULL;
 	LWLockRelease(leader_lwlock);
