On 13.05.2013 17:21, Merlin Moncure wrote:
> On Mon, May 13, 2013 at 7:50 AM, Heikki Linnakangas
> <hlinnakan...@vmware.com> wrote:
>> The attached patch is still work-in-progress. There needs to be a configure
>> test and fallback to spinlock if a CAS instruction is not available. I used
>> the gcc __sync_val_compare_and_swap() builtin directly, that needs to be
>> abstracted. Also, in the case that the wait queue needs to be manipulated,
>> the code spins on the CAS instruction, but there is no delay mechanism like
>> there is on a regular spinlock; that needs to be added in somehow.

> These are really interesting results. Why is the CAS method so much
> faster than TAS?

If my theory is right, the steep fall is caused by backends sometimes being context switched while holding the spinlock. The CAS method alleviates that, because the lock is never "held" by anyone. With a spinlock, if a process gets context switched while holding the lock, everyone else has to wait until that process is scheduled back in and releases the spinlock. With the CAS method, a process that gets context switched in the CAS loop doesn't hurt anyone else.
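
To illustrate the difference (a hand-rolled sketch using the gcc builtins directly, not code from the patch), compare incrementing a shared counter under a test-and-set lock vs. with a bare CAS loop:

#include <stdint.h>

static volatile int lock = 0;

/* TAS-style: if the holder is descheduled here, everyone else spins */
static void
tas_increment(volatile int64_t *counter)
{
	while (__sync_lock_test_and_set(&lock, 1))
		;						/* spin until the holder releases */
	(*counter)++;				/* critical section */
	__sync_lock_release(&lock);
}

/* CAS-style: nothing is ever "held"; a descheduled backend just retries */
static void
cas_increment(volatile int64_t *counter)
{
	int64_t		oldval;

	do
	{
		oldval = *counter;
	} while (__sync_val_compare_and_swap(counter, oldval, oldval + 1) != oldval);
}

If tas_increment's caller loses the CPU between the test-and-set and the release, every other backend spins uselessly until it comes back. If cas_increment's caller loses the CPU, the other backends' CAS operations keep succeeding against the current counter value.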

With the patch, the CAS loop still spins, just like a spinlock, when the wait queue has to be manipulated. So when that happens, it will behave more or less like the spinlock implementation. I'm not too concerned about optimizing that further; sleeping and signaling sleeping backends are heavy-weight operations anyway.

> Did you see any contention?

With the current spinlock implementation? Yeah, per the profile I posted, a lot of CPU time was spent spinning, which is a sign of contention. I didn't see contention with the CAS implementation.

Attached is a new version of the patch. I cleaned it up a lot, added a configure test, and added a fallback to the good old spinlock implementation if compare-and-swap is not available. Also, if the CAS loop needs to spin like a spinlock, it now sleeps after a while and updates "spins_per_delay", just like a regular spinlock.
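
To give an idea of the intended API, a caller of pg_compare_and_swap_64() with the required fallback would look roughly like this (a hypothetical SharedCounter example, not from the patch; in lwlock.c this pattern is hidden behind the LWLOCK_CAS_* macros):

#include "postgres.h"
#include "storage/compare_and_swap.h"
#include "storage/spin.h"

/* hypothetical example struct */
typedef struct
{
#ifndef HAS_COMPARE_AND_SWAP_64
	slock_t		mutex;			/* protects 'value' on the fallback path */
#endif
	int64		value;
} SharedCounter;

static void
counter_add(volatile SharedCounter *c, int64 delta)
{
#ifdef HAS_COMPARE_AND_SWAP_64
	int64		oldval;

	/* lock-free path: loop until the compare-and-swap succeeds */
	do
	{
		oldval = c->value;
	} while (pg_compare_and_swap_64(&c->value, oldval, oldval + delta) != oldval);
#else
	/* spinlock fallback, which callers must provide themselves */
	SpinLockAcquire(&c->mutex);
	c->value += delta;
	SpinLockRelease(&c->mutex);
#endif
}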

- Heikki
diff --git a/configure b/configure
index 98d889a..43faaf8 100755
--- a/configure
+++ b/configure
@@ -22742,71 +22742,6 @@ fi
 done
 
 
-{ $as_echo "$as_me:$LINENO: checking for builtin locking functions" >&5
-$as_echo_n "checking for builtin locking functions... " >&6; }
-if test "${pgac_cv_gcc_int_atomics+set}" = set; then
-  $as_echo_n "(cached) " >&6
-else
-  cat >conftest.$ac_ext <<_ACEOF
-/* confdefs.h.  */
-_ACEOF
-cat confdefs.h >>conftest.$ac_ext
-cat >>conftest.$ac_ext <<_ACEOF
-/* end confdefs.h.  */
-
-int
-main ()
-{
-int lock = 0;
-   __sync_lock_test_and_set(&lock, 1);
-   __sync_lock_release(&lock);
-  ;
-  return 0;
-}
-_ACEOF
-rm -f conftest.$ac_objext conftest$ac_exeext
-if { (ac_try="$ac_link"
-case "(($ac_try" in
-  *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
-  *) ac_try_echo=$ac_try;;
-esac
-eval ac_try_echo="\"\$as_me:$LINENO: $ac_try_echo\""
-$as_echo "$ac_try_echo") >&5
-  (eval "$ac_link") 2>conftest.er1
-  ac_status=$?
-  grep -v '^ *+' conftest.er1 >conftest.err
-  rm -f conftest.er1
-  cat conftest.err >&5
-  $as_echo "$as_me:$LINENO: \$? = $ac_status" >&5
-  (exit $ac_status); } && {
-	 test -z "$ac_c_werror_flag" ||
-	 test ! -s conftest.err
-       } && test -s conftest$ac_exeext && {
-	 test "$cross_compiling" = yes ||
-	 $as_test_x conftest$ac_exeext
-       }; then
-  pgac_cv_gcc_int_atomics="yes"
-else
-  $as_echo "$as_me: failed program was:" >&5
-sed 's/^/| /' conftest.$ac_ext >&5
-
-	pgac_cv_gcc_int_atomics="no"
-fi
-
-rm -rf conftest.dSYM
-rm -f core conftest.err conftest.$ac_objext conftest_ipa8_conftest.oo \
-      conftest$ac_exeext conftest.$ac_ext
-fi
-{ $as_echo "$as_me:$LINENO: result: $pgac_cv_gcc_int_atomics" >&5
-$as_echo "$pgac_cv_gcc_int_atomics" >&6; }
-if test x"$pgac_cv_gcc_int_atomics" = x"yes"; then
-
-cat >>confdefs.h <<\_ACEOF
-#define HAVE_GCC_INT_ATOMICS 1
-_ACEOF
-
-fi
-
 # Lastly, restore full LIBS list and check for readline/libedit symbols
 LIBS="$LIBS_including_readline"
 
@@ -28573,6 +28508,136 @@ _ACEOF
 fi
 
 
+# Check for GCC built-in atomic functions
+{ $as_echo "$as_me:$LINENO: checking for builtin test-and-set function" >&5
+$as_echo_n "checking for builtin test-and-set function... " >&6; }
+if test "${pgac_cv_gcc_int_test_and_set+set}" = set; then
+  $as_echo_n "(cached) " >&6
+else
+  cat >conftest.$ac_ext <<_ACEOF
+/* confdefs.h.  */
+_ACEOF
+cat confdefs.h >>conftest.$ac_ext
+cat >>conftest.$ac_ext <<_ACEOF
+/* end confdefs.h.  */
+
+int
+main ()
+{
+int lock = 0;
+   __sync_lock_test_and_set(&lock, 1);
+   __sync_lock_release(&lock);
+  ;
+  return 0;
+}
+_ACEOF
+rm -f conftest.$ac_objext conftest$ac_exeext
+if { (ac_try="$ac_link"
+case "(($ac_try" in
+  *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
+  *) ac_try_echo=$ac_try;;
+esac
+eval ac_try_echo="\"\$as_me:$LINENO: $ac_try_echo\""
+$as_echo "$ac_try_echo") >&5
+  (eval "$ac_link") 2>conftest.er1
+  ac_status=$?
+  grep -v '^ *+' conftest.er1 >conftest.err
+  rm -f conftest.er1
+  cat conftest.err >&5
+  $as_echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); } && {
+	 test -z "$ac_c_werror_flag" ||
+	 test ! -s conftest.err
+       } && test -s conftest$ac_exeext && {
+	 test "$cross_compiling" = yes ||
+	 $as_test_x conftest$ac_exeext
+       }; then
+  pgac_cv_gcc_int_test_and_set="yes"
+else
+  $as_echo "$as_me: failed program was:" >&5
+sed 's/^/| /' conftest.$ac_ext >&5
+
+	pgac_cv_gcc_int_test_and_set="no"
+fi
+
+rm -rf conftest.dSYM
+rm -f core conftest.err conftest.$ac_objext conftest_ipa8_conftest.oo \
+      conftest$ac_exeext conftest.$ac_ext
+fi
+{ $as_echo "$as_me:$LINENO: result: $pgac_cv_gcc_int_test_and_set" >&5
+$as_echo "$pgac_cv_gcc_int_test_and_set" >&6; }
+if test x"$pgac_cv_gcc_int_test_and_set" = x"yes"; then
+
+cat >>confdefs.h <<\_ACEOF
+#define HAVE_GCC_INT_TEST_AND_SET 1
+_ACEOF
+
+fi
+
+{ $as_echo "$as_me:$LINENO: checking for builtin compare-and-swap function" >&5
+$as_echo_n "checking for builtin compare-and-swap function... " >&6; }
+if test "${pgac_cv_gcc_int64_compare_and_swap+set}" = set; then
+  $as_echo_n "(cached) " >&6
+else
+  cat >conftest.$ac_ext <<_ACEOF
+/* confdefs.h.  */
+_ACEOF
+cat confdefs.h >>conftest.$ac_ext
+cat >>conftest.$ac_ext <<_ACEOF
+/* end confdefs.h.  */
+
+int
+main ()
+{
+PG_INT64_TYPE lock = 0;
+   (void) __sync_val_compare_and_swap(&lock, 1, 1);
+  ;
+  return 0;
+}
+_ACEOF
+rm -f conftest.$ac_objext conftest$ac_exeext
+if { (ac_try="$ac_link"
+case "(($ac_try" in
+  *\"* | *\`* | *\\*) ac_try_echo=\$ac_try;;
+  *) ac_try_echo=$ac_try;;
+esac
+eval ac_try_echo="\"\$as_me:$LINENO: $ac_try_echo\""
+$as_echo "$ac_try_echo") >&5
+  (eval "$ac_link") 2>conftest.er1
+  ac_status=$?
+  grep -v '^ *+' conftest.er1 >conftest.err
+  rm -f conftest.er1
+  cat conftest.err >&5
+  $as_echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); } && {
+	 test -z "$ac_c_werror_flag" ||
+	 test ! -s conftest.err
+       } && test -s conftest$ac_exeext && {
+	 test "$cross_compiling" = yes ||
+	 $as_test_x conftest$ac_exeext
+       }; then
+  pgac_cv_gcc_int64_compare_and_swap="yes"
+else
+  $as_echo "$as_me: failed program was:" >&5
+sed 's/^/| /' conftest.$ac_ext >&5
+
+	pgac_cv_gcc_int64_compare_and_swap="no"
+fi
+
+rm -rf conftest.dSYM
+rm -f core conftest.err conftest.$ac_objext conftest_ipa8_conftest.oo \
+      conftest$ac_exeext conftest.$ac_ext
+fi
+{ $as_echo "$as_me:$LINENO: result: $pgac_cv_gcc_int64_compare_and_swap" >&5
+$as_echo "$pgac_cv_gcc_int64_compare_and_swap" >&6; }
+if test x"$pgac_cv_gcc_int64_compare_and_swap" = x"yes"; then
+
+cat >>confdefs.h <<\_ACEOF
+#define HAVE_GCC_INT64_COMPARE_AND_SWAP 1
+_ACEOF
+
+fi
+
 
 if test "$PORTNAME" != "win32"
 then
diff --git a/configure.in b/configure.in
index 4ea5699..ff8470e 100644
--- a/configure.in
+++ b/configure.in
@@ -1445,17 +1445,6 @@ fi
 AC_CHECK_FUNCS([strtoll strtoq], [break])
 AC_CHECK_FUNCS([strtoull strtouq], [break])
 
-AC_CACHE_CHECK([for builtin locking functions], pgac_cv_gcc_int_atomics,
-[AC_TRY_LINK([],
-  [int lock = 0;
-   __sync_lock_test_and_set(&lock, 1);
-   __sync_lock_release(&lock);],
-  [pgac_cv_gcc_int_atomics="yes"],
-  [pgac_cv_gcc_int_atomics="no"])])
-if test x"$pgac_cv_gcc_int_atomics" = x"yes"; then
-  AC_DEFINE(HAVE_GCC_INT_ATOMICS, 1, [Define to 1 if you have __sync_lock_test_and_set(int *) and friends.])
-fi
-
 # Lastly, restore full LIBS list and check for readline/libedit symbols
 LIBS="$LIBS_including_readline"
 
@@ -1730,6 +1719,28 @@ AC_CHECK_TYPES([int8, uint8, int64, uint64], [], [],
 # C, but is missing on some old platforms.
 AC_CHECK_TYPES(sig_atomic_t, [], [], [#include <signal.h>])
 
+# Check for GCC built-in atomic functions
+AC_CACHE_CHECK([for builtin test-and-set function], pgac_cv_gcc_int_test_and_set,
+[AC_TRY_LINK([],
+  [int lock = 0;
+   __sync_lock_test_and_set(&lock, 1);
+   __sync_lock_release(&lock);],
+  [pgac_cv_gcc_int_test_and_set="yes"],
+  [pgac_cv_gcc_int_test_and_set="no"])])
+if test x"$pgac_cv_gcc_int_test_and_set" = x"yes"; then
+  AC_DEFINE(HAVE_GCC_INT_TEST_AND_SET, 1, [Define to 1 if you have __sync_lock_test_and_set(int *) and __sync_lock_release(int *).])
+fi
+
+AC_CACHE_CHECK([for builtin compare-and-swap function], pgac_cv_gcc_int64_compare_and_swap,
+[AC_TRY_LINK([],
+  [PG_INT64_TYPE lock = 0;
+   (void) __sync_val_compare_and_swap(&lock, 1, 1);],
+  [pgac_cv_gcc_int64_compare_and_swap="yes"],
+  [pgac_cv_gcc_int64_compare_and_swap="no"])])
+if test x"$pgac_cv_gcc_int64_compare_and_swap" = x"yes"; then
+  AC_DEFINE(HAVE_GCC_INT64_COMPARE_AND_SWAP, 1, [Define to 1 if you have __sync_val_compare_and_swap(int64 *, int64, int64).])
+fi
+
 
 if test "$PORTNAME" != "win32"
 then
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 4f88d3f..adbeb77 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -27,6 +27,7 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/compare_and_swap.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -36,19 +37,215 @@
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
-
-typedef struct LWLock
+/*
+ * LWLock_atomic is the part of LWLock struct that needs to be swappable with
+ * an atomic compare-and-swap instruction. Assuming we have an atomic 64-bit
+ * CAS instruction, sizeof(LWLock_atomic) must be <= 8.
+ */
+typedef struct
 {
-	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
+#ifdef HAS_COMPARE_AND_SWAP_64
+	bool		mutex;			/* Protects LWLock and queue of PGPROCs */
+#endif
 	bool		releaseOK;		/* T if ok to release waiters */
+	bool		waiters;		/* T if queue is not empty (head != NULL) */
 	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
+	uint32		shared;			/* # of shared holders (0..MaxBackends) */
+} LWLock_atomic;
+
+/* this allows handling LWLock_atomic as an int64 */
+typedef union
+{
+	LWLock_atomic f;
+	int64_t		val;
+} LWLock_cas_union;
+
+typedef struct LWLock
+{
+	/*
+	 * If 64-bit compare-and-swap instruction is not available, all the fields
+	 * are protected by a spinlock. Otherwise, the 'atomic' part is manipulated
+	 * using compare-and-swap, and the head and tail pointers are protected
+	 * by atomic.f.mutex.
+	 */
+#ifndef HAS_COMPARE_AND_SWAP_64
+	slock_t		mutex;
+#endif
+	LWLock_cas_union atomic;
 	PGPROC	   *head;			/* head of list of waiting PGPROCs */
 	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
 	/* tail is undefined when head is NULL */
 } LWLock;
 
 /*
+ * There are two different implementations of the low-level locking that
+ * protects the LWLock struct: spinlock, and compare-and-swap. The
+ * compare-and-swap implementation scales better, but requires the CPU
+ * architecture to have an atomic 64-bit compare-and-swap instruction. All
+ * modern architectures have one, but some older ones do not. If a CAS
+ * instruction is not available, we fall back to the spinlock-based
+ * implementation.
+ *
+ * The spinlock based implementation uses a spinlock to protect the whole
+ * LWLock struct. When acquiring or releasing a lwlock, the spinlock is
+ * acquired first, and the exclusive/shared counter and wait queue are
+ * manipulated while holding the spinlock.
+ *
+ * In the compare-and-swap implementation, the CAS instruction is used to
+ * increment the shared/exclusive counter, when the lock can be acquired
+ * without waiting. Likewise, when releasing a lwlock, and there is no need to
+ * modify the wait queue to release waiting backends, the counter is
+ * decremented with the CAS instruction.  The wait queue is still protected by
+ * the 'mutex' field in LWLock_atomic, which amounts to a spinlock. When the
+ * mutex field is set, you must not modify any of the fields; you must spin
+ * until it is no longer set. This spinning is accomplished by looping with
+ * the CAS instruction, always using mutex = false in the old value to compare
+ * with.
+ *
+ * These two implementations are abstracted by the below macros. The intended
+ * usage is like this:
+ *
+ * {
+ *	LWLock *lock = ...;
+ *	LWLOCK_CAS_VARS;
+ *
+ *	LWLOCK_CAS_LOOP(lock);
+ *  {
+ *	     Check the old values in LWLock_atomic (oldval), and based on that,
+ *		 set the new values (newval). If you need to acquire the mutex that
+ *		 protects the wait queue, pass keepmutex = true to LWLOCK_CAS_END.
+ *		 This part might be executed many times, if the compare-and-swap
+ *		 operation fails because of concurrent activity.
+ *
+ *      e.g.:
+ *		if (oldval.f.shared > 0)
+ *			newval.f.shared++;
+ *  }
+ *  LWLOCK_CAS_END(lock, keepmutex);
+ *
+ *  At this point, the compare-and-swap succeeded. If you didn't acquire
+ *  the mutex by passing keepmutex = true, you're done. Otherwise,
+ *  proceed to manipulate the wait queue, and when you're done, call
+ *  LWLOCK_CAS_RELEASE_MUTEX:
+ *
+ *  if (acquired_mutex)
+ *  {
+ *     ... manipulate wait queue ...
+ *		LWLOCK_CAS_RELEASE_MUTEX(lock);
+ *  }
+ *
+ *  ... all done ...
+ * }
+ *
+ * To break out of the CAS loop prematurely, without changing the value, use:
+ *
+ *	LWLOCK_CAS_ABORT(lock);
+ *	break;
+ *
+ */
+#ifdef HAS_COMPARE_AND_SWAP_64
+
+#define LWLOCK_CAS_VARS													\
+	LWLock_cas_union oldval;											\
+	LWLock_cas_union newval;											\
+	int64_t		tmpval													\
+
+/*
+ * NB: The initial fetch of the old value isn't guaranteed to be atomic on
+ * all platforms. That's OK: if the value changes concurrently, the
+ * compare-and-swap operation will fail, and we will loop back here with the
+ * correct value. However, it means that all the CAS loops must tolerate a
+ * bogus, "torn", old value!
+ */
+#define LWLOCK_CAS_LOOP(lock)											\
+	do {																\
+		bool		spinned = false;									\
+																		\
+		/* initial fetch */												\
+		oldval.val = (lock)->atomic.val;								\
+		for (;;)														\
+		{																\
+			oldval.f.mutex = false;										\
+			newval = oldval												\
+
+			/* The user-code that modifies newval goes here */
+
+#define LWLOCK_CAS_END(lock, keepmutex)									\
+			newval.f.mutex = keepmutex;									\
+			tmpval = pg_compare_and_swap_64(&(lock)->atomic.val,		\
+											oldval.val, newval.val);	\
+			if (tmpval != oldval.val)									\
+			{															\
+				/* failed to acquire lock or mutex, retry */			\
+				if (oldval.f.mutex)										\
+				{														\
+					/*													\
+					 * When we fail because someone is holding the		\
+					 * mutex, this amounts to spinning on a spinlock,	\
+					 * so keep the spinlock delay accounting up-to-date	\
+					 */ 												\
+					spin_fail();										\
+					spinned = true;										\
+				}														\
+				oldval.val = tmpval;									\
+				continue;												\
+			}															\
+			else														\
+			{															\
+				/* CAS succeeded */										\
+				if (spinned)											\
+					spin_success();										\
+				oldval = newval;										\
+				break;													\
+			}															\
+		}																\
+	} while(0)
+
+#define LWLOCK_CAS_ABORT(lock) /* do nothing */
+
+#define LWLOCK_CAS_RELEASE_MUTEX(lock)									\
+	do {																\
+		Assert(oldval.f.mutex);											\
+		newval.f.mutex = false;											\
+		tmpval = pg_compare_and_swap_64(&(lock)->atomic.val,			\
+										oldval.val, newval.val);		\
+		/* should always succeed when we're holding the mutex */		\
+		Assert(tmpval == oldval.val);									\
+	} while(0)
+
+#else
+
+/* spinlock-based implementation */
+#define LWLOCK_CAS_VARS													\
+	LWLock_cas_union oldval;											\
+	LWLock_cas_union newval												\
+
+#define LWLOCK_CAS_LOOP(lock)											\
+	do {																\
+		SpinLockAcquire(&(lock)->mutex);								\
+		oldval = (lock)->atomic;										\
+		newval = oldval;												\
+
+#define LWLOCK_CAS_END(lock, keepmutex)									\
+		if (!keepmutex)													\
+		{																\
+			(lock)->atomic = newval;									\
+			SpinLockRelease(&(lock)->mutex);							\
+		}																\
+	} while(0)
+
+#define LWLOCK_CAS_ABORT(lock)											\
+		SpinLockRelease(&(lock)->mutex)
+
+#define LWLOCK_CAS_RELEASE_MUTEX(lock)									\
+	do {																\
+		(lock)->atomic = newval;										\
+		SpinLockRelease(&(lock)->mutex);								\
+	} while(0)
+
+#endif		/* HAS_COMPARE_AND_SWAP_64 */
+
+/*
  * All the LWLock structs are allocated as an array in shared memory.
  * (LWLockIds are indexes into the array.)	We force the array stride to
  * be a power of 2, which saves a few cycles in indexing, but more
@@ -267,6 +464,11 @@ CreateLWLocks(void)
 	char	   *ptr;
 	int			id;
 
+#ifdef HAS_COMPARE_AND_SWAP_64
+	StaticAssertStmt(sizeof(LWLock_cas_union) == sizeof(int64_t),
+					 "unexpected size of LWLock_cas_union");
+#endif
+
 	/* Allocate space */
 	ptr = (char *) ShmemAlloc(spaceLocks);
 
@@ -283,10 +485,16 @@ CreateLWLocks(void)
 	 */
 	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
 	{
+#ifndef HAS_COMPARE_AND_SWAP_64
 		SpinLockInit(&lock->lock.mutex);
-		lock->lock.releaseOK = true;
-		lock->lock.exclusive = 0;
-		lock->lock.shared = 0;
+#else
+		lock->lock.atomic.f.mutex = false;
+#endif
+		lock->lock.atomic.val = 0; /* clear possible padding */
+		lock->lock.atomic.f.releaseOK = true;
+		lock->lock.atomic.f.waiters = false;
+		lock->lock.atomic.f.exclusive = 0;
+		lock->lock.atomic.f.shared = 0;
 		lock->lock.head = NULL;
 		lock->lock.tail = NULL;
 	}
@@ -395,39 +603,42 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	for (;;)
 	{
 		bool		mustwait;
+		LWLOCK_CAS_VARS;
 
-		/* Acquire mutex.  Time spent holding mutex should be short! */
-#ifdef LWLOCK_STATS
-		spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
-#else
-		SpinLockAcquire(&lock->mutex);
-#endif
-
-		/* If retrying, allow LWLockRelease to release waiters again */
-		if (retry)
-			lock->releaseOK = true;
-
-		/* If I can get the lock, do so quickly. */
-		if (mode == LW_EXCLUSIVE)
+		/*
+		 * Increment the exclusive- or shared-hold counter, if we can acquire
+		 * the lock without waiting. Acquire mutex if we have to wait.  Time
+		 * spent holding mutex should be short!
+		 */
+		LWLOCK_CAS_LOOP(lock);
 		{
-			if (lock->exclusive == 0 && lock->shared == 0)
+			/* If retrying, allow LWLockRelease to release waiters again */
+			if (retry)
+				newval.f.releaseOK = true;
+
+			/* If I can get the lock, do so quickly. */
+			if (mode == LW_EXCLUSIVE)
 			{
-				lock->exclusive++;
-				mustwait = false;
+				if (oldval.f.exclusive == 0 && oldval.f.shared == 0)
+				{
+					newval.f.exclusive++;
+					mustwait = false;
+				}
+				else
+					mustwait = true;
 			}
 			else
-				mustwait = true;
-		}
-		else
-		{
-			if (lock->exclusive == 0)
 			{
-				lock->shared++;
-				mustwait = false;
+				if (oldval.f.exclusive == 0)
+				{
+					newval.f.shared++;
+					mustwait = false;
+				}
+				else
+					mustwait = true;
 			}
-			else
-				mustwait = true;
 		}
+		LWLOCK_CAS_END(lock, mustwait);
 
 		if (!mustwait)
 			break;				/* got the lock */
@@ -452,7 +663,8 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 		lock->tail = proc;
 
 		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
+		newval.f.waiters = true;
+		LWLOCK_CAS_RELEASE_MUTEX(lock);
 
 		/*
 		 * Wait until awakened.
@@ -492,7 +704,6 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	}
 
 	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
 
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
 
@@ -518,6 +729,7 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
 	bool		mustwait;
+	LWLOCK_CAS_VARS;
 
 	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
 
@@ -532,33 +744,41 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
+	/* Try to acquire the lock. */
+	LWLOCK_CAS_LOOP(lock);
 	{
-		if (lock->exclusive == 0 && lock->shared == 0)
+		/* If I can get the lock, do so quickly. */
+		if (mode == LW_EXCLUSIVE)
 		{
-			lock->exclusive++;
-			mustwait = false;
+			if (oldval.f.exclusive == 0 && oldval.f.shared == 0)
+			{
+				newval.f.exclusive++;
+				mustwait = false;
+			}
+			else
+				mustwait = true;
 		}
 		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
 		{
-			lock->shared++;
-			mustwait = false;
+			if (oldval.f.exclusive == 0)
+			{
+				newval.f.shared++;
+				mustwait = false;
+			}
+			else
+				mustwait = true;
+		}
+
+		/* if we would need to wait, give up now */
+		if (mustwait)
+		{
+			LWLOCK_CAS_ABORT(lock);
+			break;
 		}
-		else
-			mustwait = true;
 	}
+	LWLOCK_CAS_END(lock, false);
 
 	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
 
 	if (mustwait)
 	{
@@ -598,6 +818,7 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	PGPROC	   *proc = MyProc;
 	bool		mustwait;
 	int			extraWaits = 0;
+	LWLOCK_CAS_VARS;
 
 	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
 
@@ -618,30 +839,36 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
+	/*
+	 * Increment the exclusive- or shared-hold counter, if we can acquire the
+	 * lock without waiting. Acquire mutex if we have to wait.  Time spent
+	 * holding mutex should be short!
+	 */
+	LWLOCK_CAS_LOOP(lock);
 	{
-		if (lock->exclusive == 0 && lock->shared == 0)
+		/* If I can get the lock, do so quickly. */
+		if (mode == LW_EXCLUSIVE)
 		{
-			lock->exclusive++;
-			mustwait = false;
+			if (oldval.f.exclusive == 0 && oldval.f.shared == 0)
+			{
+				newval.f.exclusive++;
+				mustwait = false;
+			}
+			else
+				mustwait = true;
 		}
 		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
 		{
-			lock->shared++;
-			mustwait = false;
+			if (oldval.f.exclusive == 0)
+			{
+				newval.f.shared++;
+				mustwait = false;
+			}
+			else
+				mustwait = true;
 		}
-		else
-			mustwait = true;
 	}
+	LWLOCK_CAS_END(lock, mustwait);
 
 	if (mustwait)
 	{
@@ -665,7 +892,8 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 		lock->tail = proc;
 
 		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
+		newval.f.waiters = true;
+		LWLOCK_CAS_RELEASE_MUTEX(lock);
 
 		/*
 		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
@@ -694,8 +922,7 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	}
 	else
 	{
-		/* We are done updating shared state of the lock itself. */
-		SpinLockRelease(&lock->mutex);
+		/* No need to update the lock */
 	}
 
 	/*
@@ -731,6 +958,8 @@ LWLockRelease(LWLockId lockid)
 	PGPROC	   *head;
 	PGPROC	   *proc;
 	int			i;
+	LWLOCK_CAS_VARS;
+	bool		releaseWaiters;
 
 	PRINT_LWDEBUG("LWLockRelease", lockid, lock);
 
@@ -749,80 +978,86 @@ LWLockRelease(LWLockId lockid)
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* Release my hold on lock */
-	if (lock->exclusive > 0)
-		lock->exclusive--;
-	else
+	LWLOCK_CAS_LOOP(lock);
 	{
-		Assert(lock->shared > 0);
-		lock->shared--;
+		/* Release my hold on lock */
+		if (oldval.f.exclusive > 0)
+			newval.f.exclusive--;
+		else
+		{
+			Assert(oldval.f.shared > 0);
+			newval.f.shared--;
+		}
+
+		/*
+		 * See if I need to awaken any waiters.  If I released a non-last
+		 * shared hold, there cannot be anything to do.  Also, do not awaken
+		 * any waiters if someone has already awakened waiters that haven't
+		 * yet acquired the lock.
+		 *
+		 * If we need to awaken anyone, need to acquire the mutex.
+		 */
+		releaseWaiters = (newval.f.waiters > 0 && newval.f.releaseOK &&
+						  newval.f.exclusive == 0 && newval.f.shared == 0);
 	}
+	LWLOCK_CAS_END(lock, releaseWaiters);
 
-	/*
-	 * See if I need to awaken any waiters.  If I released a non-last shared
-	 * hold, there cannot be anything to do.  Also, do not awaken any waiters
-	 * if someone has already awakened waiters that haven't yet acquired the
-	 * lock.
-	 */
-	head = lock->head;
-	if (head != NULL)
+	if (releaseWaiters)
 	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
-		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
-
-			proc = head;
-
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
+		/*
+		 * Remove the to-be-awakened PGPROCs from the queue.
+		 */
+		bool		releaseOK = true;
+
+		head = lock->head;
+		Assert(head != NULL);
+
+		proc = head;
+
+		/*
+		 * First wake up any backends that want to be woken up without
+		 * acquiring the lock.
+		 */
+		while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
+			proc = proc->lwWaitLink;
 
-			/*
-			 * If the front waiter wants exclusive lock, awaken him only.
-			 * Otherwise awaken as many waiters as want shared access.
-			 */
-			if (proc->lwWaitMode != LW_EXCLUSIVE)
+		/*
+		 * If the front waiter wants exclusive lock, awaken him only.
+		 * Otherwise awaken as many waiters as want shared access.
+		 */
+		if (proc->lwWaitMode != LW_EXCLUSIVE)
+		{
+			while (proc->lwWaitLink != NULL &&
+				   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
 			{
-				while (proc->lwWaitLink != NULL &&
-					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-				{
-					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-						releaseOK = false;
-					proc = proc->lwWaitLink;
-				}
+				if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+					releaseOK = false;
+				proc = proc->lwWaitLink;
 			}
-			/* proc is now the last PGPROC to be released */
-			lock->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
-
-			/*
-			 * Prevent additional wakeups until retryer gets to run. Backends
-			 * that are just waiting for the lock to become free don't retry
-			 * automatically.
-			 */
-			if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-				releaseOK = false;
-
-			lock->releaseOK = releaseOK;
-		}
-		else
-		{
-			/* lock is still held, can't awaken anything */
-			head = NULL;
 		}
+		/* proc is now the last PGPROC to be released */
+		lock->head = proc->lwWaitLink;
+		proc->lwWaitLink = NULL;
+
+		/*
+		 * Prevent additional wakeups until retryer gets to run. Backends
+		 * that are just waiting for the lock to become free don't retry
+		 * automatically.
+		 */
+		if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+			releaseOK = false;
+
+		newval.f.releaseOK = releaseOK;
+
+		/* release mutex */
+		if (lock->head == NULL)
+			newval.f.waiters = false;
+		LWLOCK_CAS_RELEASE_MUTEX(lock);
 	}
+	else
+		head = NULL;
 
 	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
 
 	TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
 
diff --git a/src/backend/storage/lmgr/s_lock.c b/src/backend/storage/lmgr/s_lock.c
index ed1f56a..bb74545 100644
--- a/src/backend/storage/lmgr/s_lock.c
+++ b/src/backend/storage/lmgr/s_lock.c
@@ -42,6 +42,9 @@ s_lock_stuck(volatile slock_t *lock, const char *file, int line)
 #endif
 }
 
+/*
+ * Spin/delay state, previously local to s_lock(). It is file-static now so
+ * that s_lock_success() and s_lock_fail(), called from the lwlock CAS loops
+ * via spin_success()/spin_fail(), share the same delay accounting.
+ */
+static int	spins = 0;
+static int	delays = 0;
+static int	cur_delay = 0;
 
 /*
  * s_lock(lock) - platform-independent portion of waiting for a spinlock.
@@ -93,42 +96,25 @@ s_lock(volatile slock_t *lock, const char *file, int line)
 #define MIN_DELAY_MSEC		1
 #define MAX_DELAY_MSEC		1000
 
-	int			spins = 0;
-	int			delays = 0;
-	int			cur_delay = 0;
+	spins = 0;
+	delays = 0;
+	cur_delay = 0;
 
 	while (TAS_SPIN(lock))
 	{
 		/* CPU-specific delay each time through the loop */
 		SPIN_DELAY();
 
-		/* Block the process every spins_per_delay tries */
 		if (++spins >= spins_per_delay)
-		{
-			if (++delays > NUM_DELAYS)
-				s_lock_stuck(lock, file, line);
-
-			if (cur_delay == 0) /* first time to delay? */
-				cur_delay = MIN_DELAY_MSEC;
-
-			pg_usleep(cur_delay * 1000L);
-
-#if defined(S_LOCK_TEST)
-			fprintf(stdout, "*");
-			fflush(stdout);
-#endif
-
-			/* increase delay by a random fraction between 1X and 2X */
-			cur_delay += (int) (cur_delay *
-					  ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5);
-			/* wrap back to minimum delay when max is exceeded */
-			if (cur_delay > MAX_DELAY_MSEC)
-				cur_delay = MIN_DELAY_MSEC;
-
-			spins = 0;
-		}
+			s_lock_fail(file, line);
 	}
 
+	return s_lock_success(file, line);
+}
+
+int
+s_lock_success(const char *file, int line)
+{
 	/*
 	 * If we were able to acquire the lock without delaying, it's a good
 	 * indication we are in a multiprocessor.  If we had to delay, it's a sign
@@ -155,9 +141,46 @@ s_lock(volatile slock_t *lock, const char *file, int line)
 		if (spins_per_delay > MIN_SPINS_PER_DELAY)
 			spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY);
 	}
+
+	spins = 0;
+	delays = 0;
+	cur_delay = 0;
+
 	return delays;
 }
 
+void
+s_lock_fail(const char *file, int line)
+{
+	/* CPU-specific delay each time through the loop */
+	SPIN_DELAY();
+
+	/* Block the process every spins_per_delay tries */
+	if (++spins >= spins_per_delay)
+	{
+		if (++delays > NUM_DELAYS)
+			s_lock_stuck(NULL, file, line); /* XXX: passing NULL here.. */
+
+		if (cur_delay == 0) /* first time to delay? */
+			cur_delay = MIN_DELAY_MSEC;
+
+		pg_usleep(cur_delay * 1000L);
+
+#if defined(S_LOCK_TEST)
+		fprintf(stdout, "*");
+		fflush(stdout);
+#endif
+
+		/* increase delay by a random fraction between 1X and 2X */
+		cur_delay += (int) (cur_delay *
+			  ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5);
+		/* wrap back to minimum delay when max is exceeded */
+		if (cur_delay > MAX_DELAY_MSEC)
+			cur_delay = MIN_DELAY_MSEC;
+
+		spins = 0;
+	}
+}
 
 /*
  * Set local copy of spins_per_delay during backend startup.
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 8aabf3c..e31f652 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -173,8 +173,13 @@
 /* Define to 1 if your compiler understands __FUNCTION__. */
 #undef HAVE_FUNCNAME__FUNCTION
 
-/* Define to 1 if you have __sync_lock_test_and_set(int *) and friends. */
-#undef HAVE_GCC_INT_ATOMICS
+/* Define to 1 if you have __sync_val_compare_and_swap(int64 *, int64, int64).
+   */
+#undef HAVE_GCC_INT64_COMPARE_AND_SWAP
+
+/* Define to 1 if you have __sync_lock_test_and_set(int *) and
+   __sync_lock_release(int *). */
+#undef HAVE_GCC_INT_TEST_AND_SET
 
 /* Define to 1 if you have the `getaddrinfo' function. */
 #undef HAVE_GETADDRINFO
diff --git a/src/include/storage/compare_and_swap.h b/src/include/storage/compare_and_swap.h
new file mode 100644
index 0000000..7ff5c0f
--- /dev/null
+++ b/src/include/storage/compare_and_swap.h
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * compare_and_swap.h
+ *	  Atomic compare-and-swap operation.
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/compare_and_swap.h
+ *
+ * The API is:
+ *
+ * int64 pg_compare_and_swap_64(int64 *p, int64 oldval, int64 newval)
+ *		Compares *p with oldval, and if the values are equal, replaces *p
+ *		with newval. In either case, the old value in *p is returned.
+ *
+ *
+ * Not all platforms have an atomic compare-and-swap operation. Callers
+ * must use #ifdef HAS_COMPARE_AND_SWAP_64, and provide a fallback
+ * implementation, e.g. using a spinlock, if it is not defined.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COMPARE_AND_SWAP_H
+#define COMPARE_AND_SWAP_H
+
+/*
+ * On Windows, use InterlockedCompareExchange64. (It compiles into a
+ * "lock cmpxchg8b" instruction.)
+ */
+#if !defined(HAS_COMPARE_AND_SWAP_64) && defined(WIN32_ONLY_COMPILER)
+
+#define HAS_COMPARE_AND_SWAP_64
+
+#define pg_compare_and_swap_64(p, oldval, newval) InterlockedCompareExchange64(p, newval, oldval)
+
+#endif
+
+/*
+ * Use gcc builtins, if available.
+ *
+ * We trust that if the compiler provides a built-in compare-and-swap, it
+ * actually works and performs well.
+ */
+#if !defined(HAS_COMPARE_AND_SWAP_64) && defined(HAVE_GCC_INT64_COMPARE_AND_SWAP)
+
+#define HAS_COMPARE_AND_SWAP_64
+
+#define pg_compare_and_swap_64(p, oldval, newval) __sync_val_compare_and_swap(p, oldval, newval)
+
+#endif
+
+#endif   /* COMPARE_AND_SWAP_H */
diff --git a/src/include/storage/s_lock.h b/src/include/storage/s_lock.h
index ce45ffe..f29b757 100644
--- a/src/include/storage/s_lock.h
+++ b/src/include/storage/s_lock.h
@@ -1014,6 +1014,10 @@ extern int	tas(volatile slock_t *lock);		/* in port/.../tas.s, or
  * Platform-independent out-of-line support routines
  */
 extern int s_lock(volatile slock_t *lock, const char *file, int line);
+extern int s_lock_success(const char *file, int line);
+extern void s_lock_fail(const char *file, int line);
+#define spin_success() s_lock_success(__FILE__, __LINE__)
+#define spin_fail() s_lock_fail(__FILE__, __LINE__)
 
 /* Support for dynamic adjustment of spins_per_delay */
 #define DEFAULT_SPINS_PER_DELAY  100