On 12/10/2014 08:35 PM, Robert Haas wrote:
On Wed, Dec 10, 2014 at 12:57 PM, Heikki Linnakangas
<hlinnakan...@vmware.com> wrote:
Clever. Could we use that method in ResourceOwnerReleaseInternal and
ResourceOwnerDelete, too? Might be best to have a
ResourceOwnerWalk(resowner, callback) function for walking all resource
owners in a tree, instead of one for walking the snapshots in them.

Sure.  It would be a little more complicated there since you want to
stop when you get back to the starting point, but not too bad.  But is
that solving any actual problem?

I thought that a transaction commit or abort in some special circumstances might call ResourceOwnerReleaseInternal on the top level, but I can't make it happen. The machinery in xact.c is too clever, and always releases the resource owners from the bottom up. And I can't find a way to create a deep resource owner tree in any other way. So I guess it's fine as it is.

MemoryContextCheck and MemoryContextPrint also recurse, however. MemoryContextCheck is only enabled with --enable-cassert, but MemoryContextPrint is called when you run out of memory. That could turn a plain "out of memory" error into a stack overrun, triggering a server crash and restart.

It occurs to me that the pairing heap I just posted in another thread
(http://www.postgresql.org/message-id/54886bb8.9040...@vmware.com) would be
a good fit for this. It's extremely cheap to insert into it and to find the
minimum item (O(1)), and the delete operation is O(log N), amortized. I
didn't implement a delete operation for removing a particular node; I only
did delete-min, but it's basically the same code. Using a pairing heap for
this might be overkill, but if we have that implementation anyway, the code
in snapmgr.c to use it would be very simple, so I see little reason not to.
It might even be simpler than your patch, because you wouldn't need to have
the heuristics on whether to attempt updating the xmin; it would be cheap
enough to always try it.

Care to code it up?

Here you are.

- Heikki
diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 327a1bc..b24ece6 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/lib
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = ilist.o binaryheap.o stringinfo.o
+OBJS = ilist.o binaryheap.o pairingheap.o stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/pairingheap.c b/src/backend/lib/pairingheap.c
new file mode 100644
index 0000000..06cd117
--- /dev/null
+++ b/src/backend/lib/pairingheap.c
@@ -0,0 +1,301 @@
+/*-------------------------------------------------------------------------
+ *
+ * pairingheap.c
+ *	  A Pairing Heap implementation
+ *
+ * A pairing heap is a heap-ordered multiway tree.  Inserting a node and
+ * finding the first node are O(1); removing the first node, or an
+ * arbitrary node, is O(log n) amortized.
+ *
+ * The order is defined entirely by the heap's comparator: the node that
+ * the comparator ranks highest is always at the root.  To get a min-heap,
+ * supply a comparator with the reversed sense.
+ *
+ * Each node links to its first child, its next sibling, and either its
+ * previous sibling or, if it is a first child, its parent.  That back-link
+ * is what allows pairingheap_remove() to unlink an arbitrary node.  The
+ * root's next_sibling and prev_or_parent links are kept NULL.
+ *
+ * The heap does not own its nodes: callers embed a pairingheap_node in
+ * their own structs, and are responsible for that memory.
+ *
+ * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/lib/pairingheap.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "lib/pairingheap.h"
+
+static pairingheap_node *merge(pairingheap *heap, pairingheap_node *a,
+	  pairingheap_node *b);
+static pairingheap_node *merge_children(pairingheap *heap,
+			   pairingheap_node *children);
+
+/*
+ * pairingheap_allocate
+ *
+ * Returns a pointer to a newly-allocated heap, with the heap property
+ * defined by the given comparator function, which will be invoked with the
+ * additional argument specified by 'arg'.
+ *
+ * A pairingheap can also be embedded or statically allocated instead, as
+ * long as all of its fields are initialized, with ph_root set to NULL.
+ */
+pairingheap *
+pairingheap_allocate(pairingheap_comparator compare, void *arg)
+{
+	pairingheap *heap;
+
+	heap = (pairingheap *) palloc(sizeof(pairingheap));
+	heap->ph_compare = compare;
+	heap->ph_arg = arg;
+
+	heap->ph_root = NULL;
+
+	return heap;
+}
+
+/*
+ * pairingheap_free
+ *
+ * Releases memory used by the given pairingheap.
+ *
+ * Note: The nodes in the heap are not released!  They belong to the caller.
+ */
+void
+pairingheap_free(pairingheap *heap)
+{
+	pfree(heap);
+}
+
+/*
+ * merge
+ *
+ * Merge two subheaps into one, and return the root of the result.
+ *
+ * Either argument may be NULL, in which case the other is returned as is.
+ * Otherwise the root that the comparator ranks lower becomes the first
+ * child of the other one.  The winning root's own next_sibling and
+ * prev_or_parent are not touched; setting them is up to the caller.
+ */
+static pairingheap_node *
+merge(pairingheap *heap, pairingheap_node *a, pairingheap_node *b)
+{
+	if (a == NULL)
+		return b;
+	if (b == NULL)
+		return a;
+
+	/* swap 'a' and 'b' so that 'a' is the one the comparator ranks higher */
+	if (heap->ph_compare(a, b, heap->ph_arg) < 0)
+	{
+		pairingheap_node *tmp;
+
+		tmp = a;
+		a = b;
+		b = tmp;
+	}
+
+	/* and push 'b' onto the front of a's child list */
+	if (a->first_child)
+		a->first_child->prev_or_parent = b;
+	b->prev_or_parent = a;
+	b->next_sibling = a->first_child;
+	a->first_child = b;
+
+	return a;
+}
+
+/*
+ * pairingheap_add
+ *
+ * Adds the given node to the heap in O(1) time.
+ */
+void
+pairingheap_add(pairingheap *heap, pairingheap_node *d)
+{
+	d->first_child = NULL;
+
+	/* Link the new node in as a one-node tree */
+	heap->ph_root = merge(heap, heap->ph_root, d);
+
+	/*
+	 * merge() leaves the winner's links alone, and 'd' may still carry links
+	 * from an earlier use, so clear the root's links here.
+	 */
+	heap->ph_root->prev_or_parent = NULL;
+	heap->ph_root->next_sibling = NULL;
+}
+
+/*
+ * pairingheap_first
+ *
+ * Returns a pointer to the first (root, topmost) node in the heap
+ * without modifying the heap. The caller must ensure that this
+ * routine is not used on an empty heap. Always O(1).
+ */
+pairingheap_node *
+pairingheap_first(pairingheap *heap)
+{
+	Assert(!pairingheap_is_empty(heap));
+
+	return heap->ph_root;
+}
+
+/*
+ * pairingheap_remove_first
+ *
+ * Removes the first (root, topmost) node in the heap and returns a
+ * pointer to it after rebalancing the heap. The caller must ensure
+ * that this routine is not used on an empty heap. O(log n) amortized.
+ */
+pairingheap_node *
+pairingheap_remove_first(pairingheap *heap)
+{
+	pairingheap_node *result;
+	pairingheap_node *children;
+
+	Assert(!pairingheap_is_empty(heap));
+
+	/* Remove the root, and form a new heap of its children. */
+	result = heap->ph_root;
+	children = result->first_child;
+
+	heap->ph_root = merge_children(heap, children);
+
+	/* The new root, if any, used to be a child; clear its old links. */
+	if (heap->ph_root)
+	{
+		heap->ph_root->prev_or_parent = NULL;
+		heap->ph_root->next_sibling = NULL;
+	}
+
+	return result;
+}
+
+/*
+ * merge_children
+ *
+ * Merge a list of sibling subheaps, linked through next_sibling, into a
+ * single heap, and return its root (NULL if the list is empty).
+ *
+ * This implements the basic two-pass merging strategy: first merge the
+ * subheaps in pairs from left to right, then merge the pairs together.
+ * The returned root's next_sibling and prev_or_parent are left for the
+ * caller to set.
+ */
+static pairingheap_node *
+merge_children(pairingheap *heap, pairingheap_node *children)
+{
+	pairingheap_node *curr,
+			   *next;
+	pairingheap_node *pairs;
+	pairingheap_node *newroot;
+
+	if (children == NULL || children->next_sibling == NULL)
+		return children;
+
+	/* Walk the subheaps from left to right, merging in pairs */
+	next = children;
+	pairs = NULL;
+	for (;;)
+	{
+		curr = next;
+
+		if (curr == NULL)
+			break;
+
+		if (curr->next_sibling == NULL)
+		{
+			/* last odd node at the end of list */
+			curr->next_sibling = pairs;
+			pairs = curr;
+			break;
+		}
+
+		/* Save the rest of the list before merge() overwrites the links */
+		next = curr->next_sibling->next_sibling;
+
+		/* merge this and the next subheap, and push onto the 'pairs' list */
+		curr = merge(heap, curr, curr->next_sibling);
+		curr->next_sibling = pairs;
+		pairs = curr;
+	}
+
+	/*
+	 * Merge all the pairs together to form a single heap.  The 'pairs' list
+	 * was built by pushing onto its front, so this pass runs right to left.
+	 */
+	newroot = pairs;
+	next = pairs->next_sibling;
+	while (next)
+	{
+		curr = next;
+		next = curr->next_sibling;
+
+		newroot = merge(heap, newroot, curr);
+	}
+
+	return newroot;
+}
+
+/*
+ * pairingheap_remove
+ *
+ * Removes 'item', which must currently be in the heap. O(log n) amortized.
+ */
+void
+pairingheap_remove(pairingheap *heap, pairingheap_node *item)
+{
+	pairingheap_node *children;
+	pairingheap_node *replacement;
+	pairingheap_node *next_sibling;
+	pairingheap_node **prev_ptr;
+
+	/* Removing the root is exactly pairingheap_remove_first(). */
+	if (item == heap->ph_root)
+	{
+		(void) pairingheap_remove_first(heap);
+		return;
+	}
+
+	/* Remember the links we need before anything gets modified. */
+	children = item->first_child;
+	next_sibling = item->next_sibling;
+
+	/*
+	 * Find the pointer that points at 'item': its parent's first_child if
+	 * 'item' is a first child, otherwise its previous sibling's next_sibling.
+	 */
+	if (item->prev_or_parent->first_child == item)
+		prev_ptr = &item->prev_or_parent->first_child;
+	else
+		prev_ptr = &item->prev_or_parent->next_sibling;
+	Assert(*prev_ptr == item);
+
+	/*
+	 * If 'item' has children, merge them into one subheap and link that in
+	 * where 'item' was.  Otherwise just unlink 'item'.
+	 */
+	if (children)
+	{
+		replacement = merge_children(heap, children);
+
+		replacement->prev_or_parent = item->prev_or_parent;
+		replacement->next_sibling = next_sibling;
+		*prev_ptr = replacement;
+		if (next_sibling)
+			next_sibling->prev_or_parent = replacement;
+	}
+	else
+	{
+		*prev_ptr = next_sibling;
+		if (next_sibling)
+			next_sibling->prev_or_parent = item->prev_or_parent;
+	}
+}
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index d601efe..ebe9013 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -46,6 +46,7 @@
 
 #include "access/transam.h"
 #include "access/xact.h"
+#include "lib/pairingheap.h"
 #include "miscadmin.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -58,6 +59,12 @@
 #include "utils/syscache.h"
 #include "utils/tqual.h"
 
+/* Prototypes for local functions */
+static Snapshot CopySnapshot(Snapshot snapshot);
+static void FreeSnapshot(Snapshot snapshot);
+static void SnapshotResetXmin(void);
+static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg);
+
 
 /*
  * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
@@ -122,14 +129,8 @@ typedef struct ActiveSnapshotElt
 /* Top of the stack of active snapshots */
 static ActiveSnapshotElt *ActiveSnapshot = NULL;
 
-/*
- * How many snapshots is resowner.c tracking for us?
- *
- * Note: for now, a simple counter is enough.  However, if we ever want to be
- * smarter about advancing our MyPgXact->xmin we will need to be more
- * sophisticated about this, perhaps keeping our own list of snapshots.
- */
-static int	RegisteredSnapshots = 0;
+/* Snapshots registered with resowners. Ordered in a heap by xmin. */
+static pairingheap RegisteredSnapshots = { &xmin_cmp, NULL, NULL };
 
 /* first GetTransactionSnapshot call in a transaction? */
 bool		FirstSnapshotSet = false;
@@ -151,11 +152,6 @@ static Snapshot FirstXactSnapshot = NULL;
 static List *exportedSnapshots = NIL;
 
 
-static Snapshot CopySnapshot(Snapshot snapshot);
-static void FreeSnapshot(Snapshot snapshot);
-static void SnapshotResetXmin(void);
-
-
 /*
  * GetTransactionSnapshot
  *		Get the appropriate snapshot for a new query in a transaction.
@@ -183,7 +179,7 @@ GetTransactionSnapshot(void)
 	/* First call in transaction? */
 	if (!FirstSnapshotSet)
 	{
-		Assert(RegisteredSnapshots == 0);
+		Assert(pairingheap_is_empty(&RegisteredSnapshots));
 		Assert(FirstXactSnapshot == NULL);
 
 		/*
@@ -205,7 +201,7 @@ GetTransactionSnapshot(void)
 			FirstXactSnapshot = CurrentSnapshot;
 			/* Mark it as "registered" in FirstXactSnapshot */
 			FirstXactSnapshot->regd_count++;
-			RegisteredSnapshots++;
+			pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
 		}
 		else
 			CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
@@ -350,7 +346,7 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
 	/* Caller should have checked this already */
 	Assert(!FirstSnapshotSet);
 
-	Assert(RegisteredSnapshots == 0);
+	Assert(pairingheap_is_empty(&RegisteredSnapshots));
 	Assert(FirstXactSnapshot == NULL);
 	Assert(!HistoricSnapshotActive());
 
@@ -413,7 +409,7 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
 		FirstXactSnapshot = CurrentSnapshot;
 		/* Mark it as "registered" in FirstXactSnapshot */
 		FirstXactSnapshot->regd_count++;
-		RegisteredSnapshots++;
+		pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
 	}
 
 	FirstSnapshotSet = true;
@@ -639,7 +635,8 @@ RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
 	snap->regd_count++;
 	ResourceOwnerRememberSnapshot(owner, snap);
 
-	RegisteredSnapshots++;
+	if (snap->regd_count == 1)
+		pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
 
 	return snap;
 }
@@ -671,11 +668,16 @@ UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
 		return;
 
 	Assert(snapshot->regd_count > 0);
-	Assert(RegisteredSnapshots > 0);
+	Assert(!pairingheap_is_empty(&RegisteredSnapshots));
 
 	ResourceOwnerForgetSnapshot(owner, snapshot);
-	RegisteredSnapshots--;
-	if (--snapshot->regd_count == 0 && snapshot->active_count == 0)
+
+	snapshot->regd_count--;
+
+	if (snapshot->regd_count == 0)
+		pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
+
+	if (snapshot->regd_count == 0 && snapshot->active_count == 0)
 	{
 		FreeSnapshot(snapshot);
 		SnapshotResetXmin();
@@ -683,17 +685,54 @@ UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
 }
 
 /*
+ * Comparison function for RegisteredSnapshots heap. Snapshots are ordered
+ * by xmin, so that the snapshot with smallest xmin is at the top.
+ */
+static int
+xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+	Snapshot asnap = pairingheap_container(SnapshotData, ph_node, (pairingheap_node *) a);
+	Snapshot bsnap = pairingheap_container(SnapshotData, ph_node, (pairingheap_node *) b);
+
+	if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
+		return 1;
+	else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
+		return -1;
+	else
+		return 0;
+}
+
+/*
  * SnapshotResetXmin
  *
  * If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
  * Note we can do this without locking because we assume that storing an Xid
  * is atomic.
+ *
+ * Even if some registered snapshots remain, we may be able to advance (never
+ * retreat) our PGXACT->xmin to the oldest remaining xmin, which is found at
+ * the root of the RegisteredSnapshots heap.  This typically happens when a
+ * portal is dropped.  For efficiency, we only try when no snapshot is active.
  */
 static void
 SnapshotResetXmin(void)
 {
-	if (RegisteredSnapshots == 0 && ActiveSnapshot == NULL)
+	Snapshot minSnapshot;
+
+	if (ActiveSnapshot != NULL)
+		return;
+
+	if (pairingheap_is_empty(&RegisteredSnapshots))
+	{
 		MyPgXact->xmin = InvalidTransactionId;
+		return;
+	}
+
+	minSnapshot = pairingheap_container(SnapshotData, ph_node,
+									pairingheap_first(&RegisteredSnapshots));
+
+	if (TransactionIdPrecedes(MyPgXact->xmin, minSnapshot->xmin))
+		MyPgXact->xmin = minSnapshot->xmin;
 }
 
 /*
@@ -770,7 +809,7 @@ AtEOXact_Snapshot(bool isCommit)
 	{
 		Assert(FirstXactSnapshot->regd_count > 0);
 		Assert(RegisteredSnapshots > 0);
-		RegisteredSnapshots--;
+		pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
 	}
 	FirstXactSnapshot = NULL;
 
@@ -782,6 +821,7 @@ AtEOXact_Snapshot(bool isCommit)
 		TransactionId myxid = GetTopTransactionId();
 		int			i;
 		char		buf[MAXPGPATH];
+		ListCell   *lc;
 
 		/*
 		 * Get rid of the files.  Unlink failure is only a WARNING because (1)
@@ -798,14 +838,13 @@ AtEOXact_Snapshot(bool isCommit)
 		/*
 		 * As with the FirstXactSnapshot, we needn't spend any effort on
 		 * cleaning up the per-snapshot data structures, but we do need to
-		 * adjust the RegisteredSnapshots count to prevent a warning below.
-		 *
-		 * Note: you might be thinking "why do we have the exportedSnapshots
-		 * list at all?  All we need is a counter!".  You're right, but we do
-		 * it this way in case we ever feel like improving xmin management.
+		 * unlink them from RegisteredSnapshots to prevent a warning below.
 		 */
-		Assert(RegisteredSnapshots >= list_length(exportedSnapshots));
-		RegisteredSnapshots -= list_length(exportedSnapshots);
+		foreach(lc, exportedSnapshots)
+		{
+			Snapshot snap = (Snapshot) lfirst(lc);
+			pairingheap_remove(&RegisteredSnapshots, &snap->ph_node);
+		}
 
 		exportedSnapshots = NIL;
 	}
@@ -815,9 +854,8 @@ AtEOXact_Snapshot(bool isCommit)
 	{
 		ActiveSnapshotElt *active;
 
-		if (RegisteredSnapshots != 0)
-			elog(WARNING, "%d registered snapshots seem to remain after cleanup",
-				 RegisteredSnapshots);
+		if (!pairingheap_is_empty(&RegisteredSnapshots))
+			elog(WARNING, "registered snapshots seem to remain after cleanup");
 
 		/* complain about unpopped active snapshots */
 		for (active = ActiveSnapshot; active != NULL; active = active->as_next)
@@ -829,7 +867,7 @@ AtEOXact_Snapshot(bool isCommit)
 	 * it'll go away with TopTransactionContext.
 	 */
 	ActiveSnapshot = NULL;
-	RegisteredSnapshots = 0;
+	pairingheap_reset(&RegisteredSnapshots);
 
 	CurrentSnapshot = NULL;
 	SecondarySnapshot = NULL;
@@ -900,8 +938,7 @@ ExportSnapshot(Snapshot snapshot)
 	 * Copy the snapshot into TopTransactionContext, add it to the
 	 * exportedSnapshots list, and mark it pseudo-registered.  We do this to
 	 * ensure that the snapshot's xmin is honored for the rest of the
-	 * transaction.  (Right now, because SnapshotResetXmin is so stupid, this
-	 * is overkill; but later we might make that routine smarter.)
+	 * transaction.
 	 */
 	snapshot = CopySnapshot(snapshot);
 
@@ -910,7 +947,7 @@ ExportSnapshot(Snapshot snapshot)
 	MemoryContextSwitchTo(oldcxt);
 
 	snapshot->regd_count++;
-	RegisteredSnapshots++;
+	pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
 
 	/*
 	 * Fill buf with a text serialization of the snapshot, plus identification
@@ -1303,7 +1340,8 @@ DeleteAllExportedSnapshotFiles(void)
 bool
 ThereAreNoPriorRegisteredSnapshots(void)
 {
-	if (RegisteredSnapshots <= 1)
+	if (pairingheap_is_empty(&RegisteredSnapshots) ||
+		pairingheap_is_singular(&RegisteredSnapshots))
 		return true;
 
 	return false;
diff --git a/src/include/lib/pairingheap.h b/src/include/lib/pairingheap.h
new file mode 100644
index 0000000..e78196d
--- /dev/null
+++ b/src/include/lib/pairingheap.h
@@ -0,0 +1,85 @@
+/*-------------------------------------------------------------------------
+ *
+ * pairingheap.h
+ *	  A Pairing Heap implementation
+ *
+ * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group
+ *
+ * src/include/lib/pairingheap.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PAIRINGHEAP_H
+#define PAIRINGHEAP_H
+
+/*
+ * This represents an element stored in the heap. Embed this in a larger
+ * struct containing the actual data you're storing.
+ *
+ * A node links to its first child, its next sibling, and either its
+ * previous sibling or, if it is a first child, its parent.  That back-link
+ * is what lets pairingheap_remove() unlink a node from the middle of the
+ * heap.  The heap routines maintain these links.
+ */
+typedef struct pairingheap_node
+{
+	struct pairingheap_node *first_child;
+	struct pairingheap_node *next_sibling;
+	struct pairingheap_node *prev_or_parent;
+} pairingheap_node;
+
+/*
+ * Return the containing struct of 'type' where 'membername' is the
+ * pairingheap_node pointed at by 'ptr'.
+ *
+ * This is used to convert a pairingheap_node * back to its containing struct.
+ */
+#define pairingheap_container(type, membername, ptr)								\
+	(AssertVariableIsOfTypeMacro(ptr, pairingheap_node *),					\
+	 AssertVariableIsOfTypeMacro(((type *) NULL)->membername, pairingheap_node),	\
+	 ((type *) ((char *) (ptr) - offsetof(type, membername))))
+
+/*
+ * The heap always keeps the node that the comparator ranks highest at its
+ * root.  For a max-heap, the comparator must return <0 iff a < b, 0 iff
+ * a == b, and >0 iff a > b.  For a min-heap, the conditions are reversed.
+ * 'arg' is the opaque ph_arg pointer stored in the heap.
+ */
+typedef int (*pairingheap_comparator) (const pairingheap_node *a,
+									   const pairingheap_node *b,
+									   void *arg);
+
+/*
+ * A pairing heap.
+ *
+ * Besides pairingheap_allocate(), a heap may be set up by initializing the
+ * struct directly (e.g. as a static variable), with ph_root set to NULL.
+ */
+typedef struct pairingheap
+{
+	pairingheap_comparator ph_compare;	/* comparison function */
+	void	   *ph_arg;					/* opaque argument to ph_compare */
+	pairingheap_node *ph_root;			/* current root of the heap */
+} pairingheap;
+
+extern pairingheap *pairingheap_allocate(pairingheap_comparator compare,
+					void *arg);
+extern void pairingheap_free(pairingheap *heap);
+extern void pairingheap_add(pairingheap *heap, pairingheap_node *d);
+extern void pairingheap_remove(pairingheap *heap, pairingheap_node *item);
+
+/* These two must not be called on an empty heap */
+extern pairingheap_node *pairingheap_first(pairingheap *heap);
+extern pairingheap_node *pairingheap_remove_first(pairingheap *heap);
+
+/* Empties the heap, without touching the nodes */
+#define pairingheap_reset(h)			((h)->ph_root = NULL)
+
+/* Returns true if the heap contains no items */
+#define pairingheap_is_empty(h)			((h)->ph_root == NULL)
+
+/* Returns true if the heap contains exactly one item */
+#define pairingheap_is_singular(h)		((h)->ph_root && (h)->ph_root->first_child == NULL)
+
+#endif   /* PAIRINGHEAP_H */
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 53e474f..8b78a3a 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -14,6 +14,7 @@
 #define SNAPSHOT_H
 
 #include "access/htup.h"
+#include "lib/pairingheap.h"
 #include "storage/buf.h"
 
 
@@ -91,7 +92,9 @@ typedef struct SnapshotData
 	 */
 	CommandId	curcid;			/* in my xact, CID < curcid are visible */
 	uint32		active_count;	/* refcount on ActiveSnapshot stack */
-	uint32		regd_count;		/* refcount on RegisteredSnapshotList */
+	uint32		regd_count;		/* refcount on RegisteredSnapshots */
+
+	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
 } SnapshotData;
 
 /*
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to