On 02/10/2015 02:21 AM, Peter Geoghegan wrote:
> On Fri, Feb 6, 2015 at 1:51 PM, Bruce Momjian <br...@momjian.us> wrote:
>> Other than the locking part, the biggest part of this patch is adjusting
>> things so that an INSERT can change into an UPDATE.
>
> Thanks for taking a look at it. That's somewhat cleaned up in the
> attached patchseries - V2.2. This has been rebased to repair the minor
> bit-rot pointed out by Thom.

I don't really have the energy to review this patch in one batch, so I'd really like to split this into two:

1. Solve the existing "problem" with exclusion constraints that if two insertions happen concurrently, one of them might be aborted with a "deadlock detected" error, rather than a "conflicting key violation" error. That really wouldn't be worth fixing otherwise, but it happens to be a useful stepping stone for this upsert patch.

2. All the rest.

I took a stab at extracting the parts needed to do 1. See attached. I didn't modify ExecUpdate to use speculative insertions, so the issue remains for UPDATEs, but that's easy to add.

I did not solve the potential for livelocks (discussed at http://www.postgresql.org/message-id/cam3swztftt_fehet3tu3ykcpcypynnauquz3q+naasnh-60...@mail.gmail.com). The patch always super-deletes the already-inserted tuple on conflict, and then waits for the other inserter. That would be nice to fix, using one of the ideas from that thread, or something else.
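To make the livelock concern concrete, here's a minimal C sketch of the retry protocol as the patch currently has it: on conflict, always super-delete our own already-inserted tuple, wait for the other inserter, and start over. The functions and the `conflicts_remaining` mock are hypothetical stand-ins (the real decision comes from the unique/exclusion checks, and the wait is `SpeculativeInsertionWait()`); if two sessions run this loop in lockstep, neither ever wins, which is exactly the livelock.

```c
#include <assert.h>

/* Hypothetical mock of the conflict outcome; in the real patch this is
 * decided by the index uniqueness / exclusion constraint checks.  Here
 * the first 'conflicts_remaining' attempts conflict, then we succeed. */
static int conflicts_remaining;

static int  try_insert(void)        { return conflicts_remaining == 0; }
static void super_delete(void)      { /* stomp xmin of our own tuple */ }
static void wait_for_inserter(void) { conflicts_remaining--; /* stand-in for SpeculativeInsertionWait() */ }

/* The protocol as currently written: on conflict, super-delete the tuple
 * we just inserted, wait for the other inserter, and retry from scratch.
 * Nothing guarantees forward progress if both sessions keep conflicting. */
static int
speculative_insert(int max_retries)
{
    for (int attempt = 0; attempt < max_retries; attempt++)
    {
        if (try_insert())
            return 1;               /* insertion went ahead */
        super_delete();
        wait_for_inserter();
    }
    return 0;                       /* gave up (or livelocked) */
}
```

The ideas from the linked thread amount to breaking this symmetry, e.g. by ordering who backs off, so that one of the two loops is guaranteed to terminate.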

We never really debated the options for how to do the speculative insertion and super-deletion. This patch is still based on the quick & dirty demo patches I posted about a year ago, in response to issues you found with earlier versions. That might be OK - maybe I really hit the nail on the head designing those things and got them right on the first try - but more likely there are better alternatives.

Are we happy with acquiring the SpeculativeInsertLock heavy-weight lock for every insertion? That seems bad for performance reasons. Also, are we happy with adding the new fields to the proc array? Another alternative that was discussed was storing the speculative insertion token on the heap tuple itself. (See http://www.postgresql.org/message-id/52d00d2d.6030...@vmware.com)
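For reference, the proc-array approach boils down to a per-backend (relfilenode, TID, token) triple that waiters look up by XID, as in this simplified sketch. The struct and array here are illustrative stand-ins, not PostgreSQL's actual PGPROC, though the field names mirror the patch:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Simplified stand-ins for the patch's PGPROC additions. */
typedef uint32_t TransactionId;
typedef struct { uint32_t relNode; } RelFileNode;
typedef struct { uint32_t block; uint16_t offset; } ItemPointerData;

typedef struct
{
    TransactionId   xid;             /* 0 = slot unused */
    RelFileNode     specInsertRel;
    ItemPointerData specInsertTid;
    uint32_t        specInsertToken; /* 0 = no speculative insertion */
} FakeProc;

#define NPROCS 8
static FakeProc procs[NPROCS];

/* Analogous to the patch's SpeculativeInsertionIsInProgress(): find the
 * backend running 'xid' and return its token iff it is speculatively
 * inserting the given tuple, else 0. */
static uint32_t
spec_insertion_in_progress(TransactionId xid, RelFileNode rel, ItemPointerData tid)
{
    for (int i = 0; i < NPROCS; i++)
    {
        if (procs[i].xid == xid)
        {
            if (procs[i].specInsertRel.relNode == rel.relNode &&
                procs[i].specInsertTid.block == tid.block &&
                procs[i].specInsertTid.offset == tid.offset)
                return procs[i].specInsertToken;
            return 0;                /* same xact, different tuple */
        }
    }
    return 0;                        /* xact not in progress */
}
```

The tuple-based alternative from the linked thread would store the token in the line pointer or tuple header instead, so waiters find it on the page itself and no proc-array scan (or ProcArrayLock traffic) is needed.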

Are we happy with the way super-deletion works? Currently, the xmin field is set to invalid to mark the tuple as super-deleted. That required adding checks to the HeapTupleSatisfies* functions. One alternative would be to set xmax equal to xmin, and teach the code that currently calls XactLockTableWait() to check if xmax=xmin, and not consider the tuple as conflicting.
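The two marking schemes can be contrasted with a toy tuple header; this is an illustrative sketch, not the real HeapTupleHeaderData. Note the key behavioral difference: under the xmin scheme every visibility routine can treat the tuple as plain dead, while under the xmax=xmin scheme only the wait/conflict paths need to special-case it.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Toy tuple header carrying only the fields relevant here. */
typedef struct
{
    TransactionId xmin;
    TransactionId xmax;
} ToyTupleHeader;

/* Scheme in the patch: super-deletion stomps xmin to invalid, so every
 * HeapTupleSatisfies* variant can regard the tuple as immediately dead. */
static int
super_deleted_xmin_scheme(const ToyTupleHeader *tup)
{
    return tup->xmin == InvalidTransactionId;
}

/* Alternative floated above: set xmax equal to xmin, and teach would-be
 * waiters that xmax == xmin means "not a conflicting tuple". */
static int
super_deleted_xmax_scheme(const ToyTupleHeader *tup)
{
    return tup->xmax != InvalidTransactionId && tup->xmax == tup->xmin;
}
```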

- Heikki
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 46060bc1..0aa3e575 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2048,6 +2048,9 @@ FreeBulkInsertState(BulkInsertState bistate)
  * This causes rows to be frozen, which is an MVCC violation and
  * requires explicit options chosen by user.
  *
+ * If HEAP_INSERT_SPECULATIVE is specified, the MyProc->specInsert* fields
+ * are filled in.
+ *
  * Note that these options will be applied when inserting into the heap's
  * TOAST table, too, if the tuple requires any out-of-line data.
  *
@@ -2196,6 +2199,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 
 	END_CRIT_SECTION();
 
+	/*
+	 * Let others know that we speculatively inserted this tuple, before
+	 * releasing the buffer lock.
+	 */
+	if (options & HEAP_INSERT_SPECULATIVE)
+		SetSpeculativeInsertionTid(relation->rd_node, &heaptup->t_self);
+
 	UnlockReleaseBuffer(buffer);
 	if (vmbuffer != InvalidBuffer)
 		ReleaseBuffer(vmbuffer);
@@ -2616,11 +2626,17 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
  * (the last only for HeapTupleSelfUpdated, since we
  * cannot obtain cmax from a combocid generated by another transaction).
  * See comments for struct HeapUpdateFailureData for additional info.
+ *
+ * If 'killspeculative' is true, caller requires that we "super-delete" a tuple
+ * we just inserted in the same command. Instead of the normal visibility
+ * checks, we check that the tuple was inserted by the current transaction and
+ * given command id.  Also, instead of setting its xmax, we set xmin to
+ * invalid, making it immediately appear as dead to everyone.
  */
 HTSU_Result
 heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			HeapUpdateFailureData *hufd)
+			HeapUpdateFailureData *hufd, bool killspeculative)
 {
 	HTSU_Result result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -2678,7 +2694,18 @@ heap_delete(Relation relation, ItemPointer tid,
 	tp.t_self = *tid;
 
 l1:
-	result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
+	if (!killspeculative)
+	{
+		result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
+	}
+	else
+	{
+		if (tp.t_data->t_choice.t_heap.t_xmin != xid ||
+			tp.t_data->t_choice.t_heap.t_field3.t_cid != cid)
+			elog(ERROR, "attempted to super-delete a tuple from other CID");
+		result = HeapTupleMayBeUpdated;
+	}
+
 
 	if (result == HeapTupleInvisible)
 	{
@@ -2823,12 +2850,15 @@ l1:
 	 * using our own TransactionId below, since some other backend could
 	 * incorporate our XID into a MultiXact immediately afterwards.)
 	 */
-	MultiXactIdSetOldestMember();
+	if (!killspeculative)
+	{
+		MultiXactIdSetOldestMember();
 
-	compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
-							  tp.t_data->t_infomask, tp.t_data->t_infomask2,
-							  xid, LockTupleExclusive, true,
-							  &new_xmax, &new_infomask, &new_infomask2);
+		compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
+								  tp.t_data->t_infomask, tp.t_data->t_infomask2,
+								  xid, LockTupleExclusive, true,
+								  &new_xmax, &new_infomask, &new_infomask2);
+	}
 
 	START_CRIT_SECTION();
 
@@ -2855,8 +2885,23 @@ l1:
 	tp.t_data->t_infomask |= new_infomask;
 	tp.t_data->t_infomask2 |= new_infomask2;
 	HeapTupleHeaderClearHotUpdated(tp.t_data);
-	HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
-	HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
+	/*
+	 * When killing a speculatively-inserted tuple, we set xmin to invalid
+	 * instead of setting xmax, to make the tuple clearly invisible to
+	 * everyone. In particular, we want HeapTupleSatisfiesDirty() to regard
+	 * the tuple as dead, so that another backend inserting a duplicate key
+	 * value won't unnecessarily wait for our transaction to finish.
+	 */
+	if (!killspeculative)
+	{
+		HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
+		HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
+	}
+	else
+	{
+		HeapTupleHeaderSetXmin(tp.t_data, InvalidTransactionId);
+	}
+
 	/* Make sure there is no forward chain link in t_ctid */
 	tp.t_data->t_ctid = tp.t_self;
 
@@ -2872,7 +2917,11 @@ l1:
 		if (RelationIsAccessibleInLogicalDecoding(relation))
 			log_heap_new_cid(relation, &tp);
 
-		xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
+		xlrec.flags = 0;
+		if (all_visible_cleared)
+			xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
+		if (killspeculative)
+			xlrec.flags |= XLOG_HEAP_KILLED_SPECULATIVE_TUPLE;
 		xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
 											  tp.t_data->t_infomask2);
 		xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
@@ -2977,7 +3026,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	result = heap_delete(relation, tid,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &hufd);
+						 &hufd, false);
 	switch (result)
 	{
 		case HeapTupleSelfUpdated:
@@ -4070,14 +4119,16 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
  *
  * Function result may be:
  *	HeapTupleMayBeUpdated: lock was successfully acquired
+ *	HeapTupleInvisible: lock failed because tuple instantaneously invisible
  *	HeapTupleSelfUpdated: lock failed because tuple updated by self
  *	HeapTupleUpdated: lock failed because tuple updated by other xact
  *	HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip
  *
- * In the failure cases, the routine fills *hufd with the tuple's t_ctid,
- * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
- * (the last only for HeapTupleSelfUpdated, since we
- * cannot obtain cmax from a combocid generated by another transaction).
+ * In the failure cases other than HeapTupleInvisible, the routine fills
+ * *hufd with the tuple's t_ctid, t_xmax (resolving a possible MultiXact,
+ * if necessary), and t_cmax (the last only for HeapTupleSelfUpdated,
+ * since we cannot obtain cmax from a combocid generated by another
+ * transaction).
  * See comments for struct HeapUpdateFailureData for additional info.
  *
  * See README.tuplock for a thorough explanation of this mechanism.
@@ -4115,8 +4166,15 @@ l3:
 
 	if (result == HeapTupleInvisible)
 	{
-		UnlockReleaseBuffer(*buffer);
-		elog(ERROR, "attempted to lock invisible tuple");
+		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+
+		/*
+		 * This is possible, but only when locking a tuple for speculative
+		 * insertion.  We return this value here rather than throwing an error
+		 * in order to give that case the opportunity to throw a more specific
+		 * error.
+		 */
+		return HeapTupleInvisible;
 	}
 	else if (result == HeapTupleBeingUpdated)
 	{
@@ -7326,7 +7384,10 @@ heap_xlog_delete(XLogReaderState *record)
 		HeapTupleHeaderClearHotUpdated(htup);
 		fix_infomask_from_infobits(xlrec->infobits_set,
 								   &htup->t_infomask, &htup->t_infomask2);
-		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		if (!(xlrec->flags & XLOG_HEAP_KILLED_SPECULATIVE_TUPLE))
+			HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		else
+			HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
 		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
 
 		/* Mark the page as a candidate for pruning */
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 932c6f78..1a4e18d 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -51,7 +51,8 @@ static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
 				 Relation heapRel, Buffer buf, OffsetNumber offset,
 				 ScanKey itup_scankey,
-				 IndexUniqueCheck checkUnique, bool *is_unique);
+				 IndexUniqueCheck checkUnique, bool *is_unique,
+				 uint32 *speculativeToken);
 static void _bt_findinsertloc(Relation rel,
 				  Buffer *bufptr,
 				  OffsetNumber *offsetptr,
@@ -159,17 +160,27 @@ top:
 	 */
 	if (checkUnique != UNIQUE_CHECK_NO)
 	{
-		TransactionId xwait;
+		TransactionId	xwait;
+		uint32			speculativeToken;
 
 		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
-								 checkUnique, &is_unique);
+								 checkUnique, &is_unique, &speculativeToken);
 
 		if (TransactionIdIsValid(xwait))
 		{
 			/* Have to wait for the other guy ... */
 			_bt_relbuf(rel, buf);
-			XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
+			/*
+			 * If it's a speculative insertion, wait for it to finish (i.e.
+			 * to go ahead with the insertion, or kill the tuple). Otherwise
+			 * wait for the transaction to finish as usual.
+			 */
+			if (speculativeToken)
+				SpeculativeInsertionWait(xwait, speculativeToken);
+			else
+				XactLockTableWait(xwait, rel, &itup->t_tid, XLTW_InsertIndex);
+
 			/* start over... */
 			_bt_freestack(stack);
 			goto top;
@@ -211,9 +222,12 @@ top:
  * also point to end-of-page, which means that the first tuple to check
  * is the first tuple on the next page.
  *
- * Returns InvalidTransactionId if there is no conflict, else an xact ID
- * we must wait for to see if it commits a conflicting tuple.   If an actual
- * conflict is detected, no return --- just ereport().
+ * Returns InvalidTransactionId if there is no conflict, else an xact ID we
+ * must wait for to see if it commits a conflicting tuple.	If an actual
+ * conflict is detected, no return --- just ereport(). If an xact ID is
+ * returned, and the conflicting tuple still has a speculative insertion in
+ * progress, *speculativeToken is set to non-zero, and the caller can wait for
+ * the verdict on the insertion using SpeculativeInsertionWait().
  *
  * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
  * InvalidTransactionId because we don't want to wait.  In this case we
@@ -223,7 +237,8 @@ top:
 static TransactionId
 _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
-				 IndexUniqueCheck checkUnique, bool *is_unique)
+				 IndexUniqueCheck checkUnique, bool *is_unique,
+				 uint32 *speculativeToken)
 {
 	TupleDesc	itupdesc = RelationGetDescr(rel);
 	int			natts = rel->rd_rel->relnatts;
@@ -340,6 +355,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 						if (nbuf != InvalidBuffer)
 							_bt_relbuf(rel, nbuf);
 						/* Tell _bt_doinsert to wait... */
+						*speculativeToken = SnapshotDirty.speculativeToken;
 						return xwait;
 					}
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 33b172b..8d278b8 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1162,6 +1162,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
 	resultRelInfo->ri_NumIndices = 0;
 	resultRelInfo->ri_IndexRelationDescs = NULL;
 	resultRelInfo->ri_IndexRelationInfo = NULL;
+	resultRelInfo->ri_HasExclusionConstraints = false; /* set by ExecOpenIndices */
 	/* make a copy so as not to depend on relcache info not changing... */
 	resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
 	if (resultRelInfo->ri_TrigDesc)
@@ -2094,7 +2095,8 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
 			 * the latest version of the row was deleted, so we need do
 			 * nothing.  (Should be safe to examine xmin without getting
 			 * buffer's content lock, since xmin never changes in an existing
-			 * tuple.)
+			 * non-promise tuple, and there is no reason to lock a promise
+			 * tuple until it is clear that it has been fulfilled.)
 			 */
 			if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data),
 									 priorXmax))
@@ -2175,11 +2177,12 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
 					 * case, so as to avoid the "Halloween problem" of
 					 * repeated update attempts.  In the latter case it might
 					 * be sensible to fetch the updated tuple instead, but
-					 * doing so would require changing heap_lock_tuple as well
-					 * as heap_update and heap_delete to not complain about
-					 * updating "invisible" tuples, which seems pretty scary.
-					 * So for now, treat the tuple as deleted and do not
-					 * process.
+					 * doing so would require changing heap_update and
+					 * heap_delete to not complain about updating "invisible"
+					 * tuples, which seems pretty scary (heap_lock_tuple will
+					 * not complain, but few callers expect HeapTupleInvisible,
+					 * and we're not one of them).  So for now, treat the tuple
+					 * as deleted and do not process.
 					 */
 					ReleaseBuffer(buffer);
 					return NULL;
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 022041b..838d2c6 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -44,11 +44,14 @@
 
 #include "access/relscan.h"
 #include "access/transam.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "executor/execdebug.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parsetree.h"
 #include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "storage/proc.h"
 #include "utils/memutils.h"
 #include "utils/tqual.h"
 
@@ -938,6 +941,9 @@ ExecOpenIndices(ResultRelInfo *resultRelInfo)
 		/* extract index key information from the index's pg_index info */
 		ii = BuildIndexInfo(indexDesc);
 
+		if (ii->ii_ExclusionOps != NULL)
+			resultRelInfo->ri_HasExclusionConstraints = true;
+
 		relationDescs[i] = indexDesc;
 		indexInfoArray[i] = ii;
 		i++;
@@ -990,7 +996,8 @@ ExecCloseIndices(ResultRelInfo *resultRelInfo)
  *
  *		This returns a list of index OIDs for any unique or exclusion
  *		constraints that are deferred and that had
- *		potential (unconfirmed) conflicts.
+ *		potential (unconfirmed) conflicts. (If noDupErr == true, the
+ *		same is done for non-deferred constraints.)
  *
  *		CAUTION: this must not be called for a HOT update.
  *		We can't defend against that here for lack of info.
@@ -1158,6 +1165,8 @@ ExecInsertIndexTuples(TupleTableSlot *slot,
  * newIndex: if true, we are trying to build a new index (this affects
  *		only the wording of error messages)
  * errorOK: if true, don't throw error for violation
+ * wait: if true, wait for conflicting transaction to finish, even if !errorOK
+ * conflictTid: if not-NULL, the TID of conflicting tuple is returned here.
  *
  * Returns true if OK, false if actual or potential violation
  *
@@ -1169,11 +1178,16 @@ ExecInsertIndexTuples(TupleTableSlot *slot,
  *
  * When errorOK is false, we'll throw error on violation, so a false result
  * is impossible.
+ *
+ * If this is a speculative insertion (MyProc->specInsertTid is valid),
+ * before waiting on anyone else, kill our already-inserted tuple.
  */
 bool
-check_exclusion_constraint(Relation heap, Relation index, IndexInfo *indexInfo,
-						   ItemPointer tupleid, Datum *values, bool *isnull,
-						   EState *estate, bool newIndex, bool errorOK)
+check_exclusion_constraint(Relation heap, Relation index,
+						   IndexInfo *indexInfo, ItemPointer tupleid,
+						   Datum *values, bool *isnull,
+						   EState *estate, bool newIndex,
+						   bool errorOK)
 {
 	Oid		   *constr_procs = indexInfo->ii_ExclusionProcs;
 	uint16	   *constr_strats = indexInfo->ii_ExclusionStrats;
@@ -1307,10 +1321,28 @@ retry:
 
 		if (TransactionIdIsValid(xwait))
 		{
+			bool	speculative = ItemPointerIsValid(&MyProc->specInsertTid);
 			ctid_wait = tup->t_data->t_ctid;
 			index_endscan(index_scan);
-			XactLockTableWait(xwait, heap, &ctid_wait,
-							  XLTW_RecheckExclusionConstr);
+
+			if (speculative)
+			{
+				HeapUpdateFailureData hufd;
+
+				Assert(ItemPointerEquals(&MyProc->specInsertTid, tupleid));
+				heap_delete(heap, tupleid,
+							estate->es_output_cid, InvalidSnapshot, false,
+							&hufd, true);
+				SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+				ClearSpeculativeInsertionState();
+			}
+
+			if (DirtySnapshot.speculativeToken)
+				SpeculativeInsertionWait(DirtySnapshot.xmin,
+										 DirtySnapshot.speculativeToken);
+			else
+				XactLockTableWait(xwait, heap, &ctid_wait,
+								  XLTW_RecheckExclusionConstr);
 			goto retry;
 		}
 
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 48107d9..4699060 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -151,10 +151,11 @@ lnext:
 				 * case, so as to avoid the "Halloween problem" of repeated
 				 * update attempts.  In the latter case it might be sensible
 				 * to fetch the updated tuple instead, but doing so would
-				 * require changing heap_lock_tuple as well as heap_update and
-				 * heap_delete to not complain about updating "invisible"
-				 * tuples, which seems pretty scary.  So for now, treat the
-				 * tuple as deleted and do not process.
+				 * require changing heap_update and heap_delete to not complain
+				 * about updating "invisible" tuples, which seems pretty scary
+				 * (heap_lock_tuple will not complain, but few callers expect
+				 * HeapTupleInvisible, and we're not one of them).  So for now,
+				 * treat the tuple as deleted and do not process.
 				 */
 				goto lnext;
 
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index f96fb24..c477d1d 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -46,6 +46,9 @@
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
 #include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -246,6 +249,8 @@ ExecInsert(TupleTableSlot *slot,
 	}
 	else
 	{
+		bool speculative = false;
+
 		/*
 		 * Constraints might reference the tableoid column, so initialize
 		 * t_tableOid before evaluating them.
@@ -258,6 +263,19 @@ ExecInsert(TupleTableSlot *slot,
 		if (resultRelationDesc->rd_att->constr)
 			ExecConstraints(resultRelInfo, slot, estate);
 
+	vlock:
+		if (resultRelInfo->ri_HasExclusionConstraints)
+		{
+			/*
+			 * Before we start insertion proper, acquire our "promise tuple
+			 * insertion lock". Others can use that (rather than an XID lock,
+			 * which is appropriate only for non-promise tuples) to wait for
+			 * us to decide if we're going to go ahead with the insertion.
+			 */
+			SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+			speculative = true;
+		}
+
 		/*
 		 * insert the tuple
 		 *
@@ -265,14 +283,52 @@ ExecInsert(TupleTableSlot *slot,
 		 * the t_self field.
 		 */
 		newId = heap_insert(resultRelationDesc, tuple,
-							estate->es_output_cid, 0, NULL);
+							estate->es_output_cid,
+							speculative ? HEAP_INSERT_SPECULATIVE : 0,
+							NULL);
 
 		/*
 		 * insert index entries for tuple
 		 */
 		if (resultRelInfo->ri_NumIndices > 0)
+		{
 			recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
 												   estate);
+
+			if (speculative && !ItemPointerIsValid(&MyProc->specInsertTid))
+			{
+				/*
+				 * Looks like check_exclusion_constraint decided to
+				 * abort the insertion. It already waited for the conflicting
+				 * insertion to finish.
+				 */
+				/*
+				 * Consider possible race:  concurrent insertion conflicts with
+				 * our speculative heap tuple.  Must then "super-delete" the
+				 * heap tuple and retry from the start.
+				 *
+				 * This is occasionally necessary so that "unprincipled
+				 * deadlocks" are avoided;  now that a conflict was found,
+				 * other sessions should not wait on our speculative token, and
+				 * they certainly shouldn't treat our speculatively-inserted
+				 * heap tuple as an ordinary tuple that it must wait on the
+				 * outcome of our xact to UPDATE/DELETE.  This makes heap
+				 * tuples behave as conceptual "value locks" of short duration,
+				 * distinct from ordinary tuples that other xacts must wait on
+				 * xmin-xact-end of in the event of a possible unique/exclusion
+				 * violation (the violation that arbitrates taking the
+				 * alternative UPDATE/IGNORE path).
+				 */
+				list_free(recheckIndexes);
+				goto vlock;
+			}
+		}
+
+		if (speculative)
+		{
+			SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+			ClearSpeculativeInsertionState();
+		}
 	}
 
 	if (canSetTag)
@@ -399,7 +455,8 @@ ldelete:;
 							 estate->es_output_cid,
 							 estate->es_crosscheck_snapshot,
 							 true /* wait for commit */ ,
-							 &hufd);
+							 &hufd,
+							 false);
 		switch (result)
 		{
 			case HeapTupleSelfUpdated:
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index a1ebc72..a1c5bcb 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -421,6 +421,13 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 								  latestXid))
 			ShmemVariableCache->latestCompletedXid = latestXid;
 
+		/* Also clear any speculative insertion information */
+		MyProc->specInsertRel.spcNode = InvalidOid;
+		MyProc->specInsertRel.dbNode = InvalidOid;
+		MyProc->specInsertRel.relNode = InvalidOid;
+		ItemPointerSetInvalid(&MyProc->specInsertTid);
+		MyProc->specInsertToken = 0;
+
 		LWLockRelease(ProcArrayLock);
 	}
 	else
@@ -438,6 +445,11 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 		pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
 		pgxact->delayChkpt = false;		/* be sure this is cleared in abort */
 		proc->recoveryConflictPending = false;
+		MyProc->specInsertRel.spcNode = InvalidOid;
+		MyProc->specInsertRel.dbNode = InvalidOid;
+		MyProc->specInsertRel.relNode = InvalidOid;
+		ItemPointerSetInvalid(&MyProc->specInsertTid);
+		MyProc->specInsertToken = 0;
 
 		Assert(pgxact->nxids == 0);
 		Assert(pgxact->overflowed == false);
@@ -476,6 +488,13 @@ ProcArrayClearTransaction(PGPROC *proc)
 	/* Clear the subtransaction-XID cache too */
 	pgxact->nxids = 0;
 	pgxact->overflowed = false;
+
+	/* these should be clear, but just in case.. */
+	MyProc->specInsertRel.spcNode = InvalidOid;
+	MyProc->specInsertRel.dbNode = InvalidOid;
+	MyProc->specInsertRel.relNode = InvalidOid;
+	ItemPointerSetInvalid(&MyProc->specInsertTid);
+	MyProc->specInsertToken = 0;
 }
 
 /*
@@ -1110,6 +1129,96 @@ TransactionIdIsActive(TransactionId xid)
 
 
 /*
+ * SetSpeculativeInsertionToken -- Set speculative token
+ *
+ * The backend local counter value is set, to allow waiters to differentiate
+ * individual speculative insertions.
+ */
+void
+SetSpeculativeInsertionToken(uint32 token)
+{
+	MyProc->specInsertToken = token;
+}
+
+/*
+ * SetSpeculativeInsertionTid -- Set TID for speculative relfilenode
+ */
+void
+SetSpeculativeInsertionTid(RelFileNode relnode, ItemPointer tid)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	MyProc->specInsertRel = relnode;
+	ItemPointerCopy(tid, &MyProc->specInsertTid);
+	LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * ClearSpeculativeInsertionState -- Clear token and TID for ourselves
+ */
+void
+ClearSpeculativeInsertionState(void)
+{
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	MyProc->specInsertRel.spcNode = InvalidOid;
+	MyProc->specInsertRel.dbNode = InvalidOid;
+	MyProc->specInsertRel.relNode = InvalidOid;
+	ItemPointerSetInvalid(&MyProc->specInsertTid);
+	MyProc->specInsertToken = 0;
+	LWLockRelease(ProcArrayLock);
+}
+
+/*
+ * Returns a speculative insertion token for waiting for the insertion to
+ * finish
+ */
+uint32
+SpeculativeInsertionIsInProgress(TransactionId xid, RelFileNode rel,
+								 ItemPointer tid)
+{
+	ProcArrayStruct	   *arrayP = procArray;
+	int					index;
+	uint32				result = 0;
+
+	if (TransactionIdPrecedes(xid, RecentXmin))
+		return result;
+
+	/*
+	 * Get the top transaction id.
+	 *
+	 * XXX We could search the proc array first, like
+	 * TransactionIdIsInProgress() does, but this isn't performance-critical.
+	 */
+	xid = SubTransGetTopmostTransaction(xid);
+
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+	for (index = 0; index < arrayP->numProcs; index++)
+	{
+		int			pgprocno = arrayP->pgprocnos[index];
+		PGPROC	   *proc = &allProcs[pgprocno];
+		volatile PGXACT *pgxact = &allPgXact[pgprocno];
+
+		if (pgxact->xid == xid)
+		{
+			/*
+			 * Found the backend.  Is it doing a speculative insertion of the
+			 * given tuple?
+			 */
+			if (RelFileNodeEquals(proc->specInsertRel, rel) &&
+				ItemPointerEquals(tid, &proc->specInsertTid))
+				result = proc->specInsertToken;
+
+			break;
+		}
+	}
+
+	LWLockRelease(ProcArrayLock);
+
+	return result;
+}
+
+
+/*
  * GetOldestXmin -- returns oldest transaction that was running
  *					when any current transaction was started.
  *
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index d13a167..7a1df22 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -575,6 +575,69 @@ ConditionalXactLockTableWait(TransactionId xid)
 	return true;
 }
 
+static uint32 speculativeInsertionToken = 0;
+
+/*
+ *		SpeculativeInsertionLockAcquire
+ *
+ * Insert a lock showing that the given transaction ID is inserting a tuple,
+ * but hasn't yet decided whether it's going to keep it. The lock can then be
+ * used to wait for the decision to go ahead with the insertion, or aborting
+ * it.
+ *
+ * The token is used to distinguish multiple insertions by the same
+ * transaction. A counter will do, for example.
+ */
+void
+SpeculativeInsertionLockAcquire(TransactionId xid)
+{
+	LOCKTAG		tag;
+
+	speculativeInsertionToken++;
+	SetSpeculativeInsertionToken(speculativeInsertionToken);
+
+	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
+
+	(void) LockAcquire(&tag, ExclusiveLock, false, false);
+}
+
+/*
+ *		SpeculativeInsertionLockRelease
+ *
+ * Delete the lock showing that the given transaction is speculatively
+ * inserting a tuple.
+ */
+void
+SpeculativeInsertionLockRelease(TransactionId xid)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
+
+	LockRelease(&tag, ExclusiveLock, false);
+}
+
+/*
+ *		SpeculativeInsertionWait
+ *
+ * Wait for the specified transaction to finish or abort the insertion of a
+ * tuple.
+ */
+void
+SpeculativeInsertionWait(TransactionId xid, uint32 token)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
+
+	Assert(TransactionIdIsValid(xid));
+	Assert(token != 0);
+
+	(void) LockAcquire(&tag, ShareLock, false, false);
+	LockRelease(&tag, ShareLock, false);
+}
+
+
 /*
  * XactLockTableWaitErrorContextCb
  *		Error context callback for transaction lock waits.
@@ -873,6 +936,11 @@ DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
 							 tag->locktag_field1,
 							 tag->locktag_field2);
 			break;
+		case LOCKTAG_PROMISE_TUPLE_INSERTION:
+			appendStringInfo(buf,
+							 _("tuple insertion by transaction %u"),
+							 tag->locktag_field1);
+			break;
 		case LOCKTAG_OBJECT:
 			appendStringInfo(buf,
 							 _("object %u of class %u of database %u"),
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index a1967b69..95d62cb 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -28,6 +28,7 @@ static const char *const LockTagTypeNames[] = {
 	"tuple",
 	"transactionid",
 	"virtualxid",
+	"inserter transactionid",
 	"object",
 	"userlock",
 	"advisory"
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index 777f55c..99bb417 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -170,6 +170,13 @@ HeapTupleSatisfiesSelf(HeapTuple htup, Snapshot snapshot, Buffer buffer)
 	Assert(ItemPointerIsValid(&htup->t_self));
 	Assert(htup->t_tableOid != InvalidOid);
 
+	/*
+	 * Never return "super-deleted" tuples
+	 */
+	if (TransactionIdEquals(HeapTupleHeaderGetRawXmin(tuple),
+							InvalidTransactionId))
+		return false;
+
 	if (!HeapTupleHeaderXminCommitted(tuple))
 	{
 		if (HeapTupleHeaderXminInvalid(tuple))
@@ -726,6 +733,17 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 	Assert(htup->t_tableOid != InvalidOid);
 
 	snapshot->xmin = snapshot->xmax = InvalidTransactionId;
+	snapshot->speculativeToken = 0;
+
+	/*
+	 * Never return "super-deleted" tuples
+	 *
+	 * XXX:  Comment this code out and you'll get conflicts within
+	 * ExecLockUpdateTuple(), which result in an infinite loop.
+	 */
+	if (TransactionIdEquals(HeapTupleHeaderGetRawXmin(tuple),
+							InvalidTransactionId))
+		return false;
 
 	if (!HeapTupleHeaderXminCommitted(tuple))
 	{
@@ -807,6 +825,26 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
 		}
 		else if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmin(tuple)))
 		{
+			RelFileNode		rnode;
+			ForkNumber		forkno;
+			BlockNumber		blockno;
+
+			BufferGetTag(buffer, &rnode, &forkno, &blockno);
+
+			/* tuples can only be in the main fork */
+			Assert(forkno == MAIN_FORKNUM);
+			Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
+
+			/*
+			 * Set speculative token.  Caller can worry about xmax, since it
+			 * requires a conclusively locked row version, and a concurrent
+			 * update to this tuple is a conflict for its purposes.
+			 */
+			snapshot->speculativeToken =
+				SpeculativeInsertionIsInProgress(HeapTupleHeaderGetRawXmin(tuple),
+												 rnode,
+												 &htup->t_self);
+
 			snapshot->xmin = HeapTupleHeaderGetRawXmin(tuple);
 			/* XXX shouldn't we fall through to look at xmax? */
 			return true;		/* in insertion by other */
@@ -922,6 +960,13 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 	Assert(ItemPointerIsValid(&htup->t_self));
 	Assert(htup->t_tableOid != InvalidOid);
 
+	/*
+	 * Never return "super-deleted" tuples
+	 */
+	if (TransactionIdEquals(HeapTupleHeaderGetRawXmin(tuple),
+							InvalidTransactionId))
+		return false;
+
 	if (!HeapTupleHeaderXminCommitted(tuple))
 	{
 		if (HeapTupleHeaderXminInvalid(tuple))
@@ -1126,6 +1171,13 @@ HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin,
 	Assert(htup->t_tableOid != InvalidOid);
 
 	/*
+	 * Immediately VACUUM "super-deleted" tuples
+	 */
+	if (TransactionIdEquals(HeapTupleHeaderGetRawXmin(tuple),
+							InvalidTransactionId))
+		return HEAPTUPLE_DEAD;
+
+	/*
 	 * Has inserting transaction committed?
 	 *
 	 * If the inserting transaction aborted, then the tuple was never visible
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 939d93d..62e760a 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -28,6 +28,7 @@
 #define HEAP_INSERT_SKIP_WAL	0x0001
 #define HEAP_INSERT_SKIP_FSM	0x0002
 #define HEAP_INSERT_FROZEN		0x0004
+#define HEAP_INSERT_SPECULATIVE 0x0008
 
 typedef struct BulkInsertStateData *BulkInsertState;
 
@@ -141,7 +142,7 @@ extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			HeapUpdateFailureData *hufd);
+			HeapUpdateFailureData *hufd, bool killspeculative);
 extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
 			HeapTuple newtup,
 			CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index a2ed2a0..870985d 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -73,6 +73,8 @@
 #define XLOG_HEAP_SUFFIX_FROM_OLD			(1<<6)
 /* last xl_heap_multi_insert record for one heap_multi_insert() call */
 #define XLOG_HEAP_LAST_MULTI_INSERT			(1<<7)
+/* reuse xl_heap_multi_insert-only bit for xl_heap_delete */
+#define XLOG_HEAP_KILLED_SPECULATIVE_TUPLE	XLOG_HEAP_LAST_MULTI_INSERT
 
 /* convenience macro for checking whether any form of old tuple was logged */
 #define XLOG_HEAP_CONTAINS_OLD						\
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 41288ed..123bbae 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -318,6 +318,7 @@ typedef struct ResultRelInfo
 	int			ri_NumIndices;
 	RelationPtr ri_IndexRelationDescs;
 	IndexInfo **ri_IndexRelationInfo;
+	bool		ri_HasExclusionConstraints;
 	TriggerDesc *ri_TrigDesc;
 	FmgrInfo   *ri_TrigFunctions;
 	List	  **ri_TrigWhenExprs;
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index f5d70e5..6bb95fc 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -76,6 +76,11 @@ extern bool ConditionalXactLockTableWait(TransactionId xid);
 extern void WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode);
 extern void WaitForLockersMultiple(List *locktags, LOCKMODE lockmode);
 
+/* Lock an XID for tuple insertion (used to wait for an insertion to finish) */
+extern void SpeculativeInsertionLockAcquire(TransactionId xid);
+extern void SpeculativeInsertionLockRelease(TransactionId xid);
+extern void SpeculativeInsertionWait(TransactionId xid, uint32 token);
+
 /* Lock a general object (other than a relation) of the current database */
 extern void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
 				   LOCKMODE lockmode);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 1100923..9c21810 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -176,6 +176,8 @@ typedef enum LockTagType
 	/* ID info for a transaction is its TransactionId */
 	LOCKTAG_VIRTUALTRANSACTION, /* virtual transaction (ditto) */
 	/* ID info for a virtual transaction is its VirtualTransactionId */
+	LOCKTAG_PROMISE_TUPLE_INSERTION, /* tuple insertion, keyed by Xid */
+	/* ID info for a promise tuple insertion is its Xid + insertion token */
 	LOCKTAG_OBJECT,				/* non-relation database object */
 	/* ID info for an object is DB OID + CLASS OID + OBJECT OID + SUBID */
 
@@ -261,6 +263,14 @@ typedef struct LOCKTAG
 	 (locktag).locktag_type = LOCKTAG_VIRTUALTRANSACTION, \
 	 (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
 
+#define SET_LOCKTAG_SPECULATIVE_INSERTION(locktag,xid,token) \
+	((locktag).locktag_field1 = (xid), \
+	 (locktag).locktag_field2 = (token),		\
+	 (locktag).locktag_field3 = 0, \
+	 (locktag).locktag_field4 = 0, \
+	 (locktag).locktag_type = LOCKTAG_PROMISE_TUPLE_INSERTION, \
+	 (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
+
 #define SET_LOCKTAG_OBJECT(locktag,dboid,classoid,objoid,objsubid) \
 	((locktag).locktag_field1 = (dboid), \
 	 (locktag).locktag_field2 = (classoid), \
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index e807a2e..cd15570 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -16,9 +16,11 @@
 
 #include "access/xlogdefs.h"
 #include "lib/ilist.h"
+#include "storage/itemptr.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
+#include "storage/relfilenode.h"
 
 /*
  * Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
@@ -132,6 +134,17 @@ struct PGPROC
 	 */
 	SHM_QUEUE	myProcLocks[NUM_LOCK_PARTITIONS];
 
+	/*
+	 * Info to allow us to perform speculative insertion without "unprincipled
+	 * deadlocks". This state allows others to wait on the outcome of an
+	 * optimistically inserted speculative tuple for only the duration of the
+	 * insertion (not to the end of our xact) iff the insertion does not work
+	 * out (due to our detecting a conflict).
+	 */
+	uint32		specInsertToken;
+	RelFileNode	specInsertRel;
+	ItemPointerData specInsertTid;
+
 	struct XidCache subxids;	/* cache for subtransaction XIDs */
 
 	/* Per-backend LWLock.  Protects fields below. */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 97c6e93..ea2bba9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -55,6 +55,13 @@ extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
 extern TransactionId GetOldestActiveTransactionId(void);
 extern TransactionId GetOldestSafeDecodingTransactionId(void);
 
+extern void SetSpeculativeInsertionToken(uint32 token);
+extern void SetSpeculativeInsertionTid(RelFileNode relnode, ItemPointer tid);
+extern void ClearSpeculativeInsertionState(void);
+extern uint32 SpeculativeInsertionIsInProgress(TransactionId xid,
+											   RelFileNode rel,
+											   ItemPointer tid);
+
 extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
 extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);
 
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 26fb257..cd5ad76 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -87,6 +87,17 @@ typedef struct SnapshotData
 	bool		copied;			/* false if it's a static snapshot */
 
 	/*
+	 * Snapshot's speculative token is a value set by HeapTupleSatisfiesDirty,
+	 * indicating that the tuple is being inserted speculatively, and may yet
+	 * be "super-deleted" before EOX. The caller may use the value with
+	 * SpeculativeInsertionWait to wait for the inserter to decide. It is
+	 * only set when a valid 'xmin' is set, too.  By convention, when
+	 * speculativeToken is zero, the caller must assume that it should wait
+	 * on a non-speculative tuple (i.e. wait for xmin/xmax to commit).
+	 */
+	uint32		speculativeToken;
+
+	/*
 	 * note: all ids in subxip[] are >= xmin, but we don't bother filtering
 	 * out any that are >= xmax
 	 */