Hello hackers,

I'd like to present a series of patches which is a continuation of
Vlad Sterzhanov's work on kNN for SP-GiST that was done for GSOC'14.

Original thread: "KNN searches support for SP-GiST [GSOC'14]"
https://www.postgresql.org/message-id/cak2ajc0-jhrb3ujozl+arbchotvwbejvprn-jofenn0va6+...@mail.gmail.com


I've done the following:
  * rebased onto HEAD
  * fixed several bugs
  * fixed indentation and unnecessary whitespace changes
  * implemented logic for order-by in spgvalidate() and spgproperty()
  * used pairing_heap instead of RBTree for the priority queue
    (as it was done earlier in GiST)
  * used existing traversalValue* fields instead of the newly added suppValue* 
fields
  * added caching of support function info
  * added recheck support for distances
  * refactored code
     - simplified some places
     - some functions were moved from spgproc.c to spgscan.c
       (now spgproc.c contains only one public function spg_point_distance()
        that can be moved somewhere and this file can be removed then)
     - extracted functions spgTestLeafTuple(), spgMakeInnerItem()
  * added new opclasses for circles and polygons with order-by-distance support
   (it was originally intended for kNN recheck testing)
  * improved tests for point_ops


Below is a very brief description of the patches:
1. Extracted two subroutines from GiST code (patch is the same as in "kNN for 
btree").
2. Exported two box functions that are used in patch 5.
3. Extracted subroutine from GiST code that is used in patch 4.
4. Main patch: added kNN support to SP-GiST.
5. Added ordering operators to kd_point_ops and quad_point_ops.
6. Added new SP-GiST support function spg_compress (gist_compress analogue)
    for compressed (lossy) representation of types in SP-GiST, used in patch 7.
7. Added new SP-GiST quad-tree opclasses for circles and polygons
    (reused existing quad-tree box_ops).

If you want to see the step-by-step sequence of changes applied to the original 
patch,
you can see it on my github: 
https://github.com/glukhovn/postgres/commits/knn_spgist

Please note that the tests for circle_ops and poly_ops will not work until
the bug found in SP-GiST box_ops is fixed
(see 
https://www.postgresql.org/message-id/9ea5b157-478c-8874-bc9b-050076b7d042%40postgrespro.ru).

--
Nikita Glukhov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company


diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index f92baed..1f6b671 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -23,6 +23,7 @@
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/syscache.h"
+#include "utils/lsyscache.h"
 
 
 /*
@@ -851,12 +852,6 @@ gistproperty(Oid index_oid, int attno,
 			 IndexAMProperty prop, const char *propname,
 			 bool *res, bool *isnull)
 {
-	HeapTuple	tuple;
-	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
-	Form_pg_opclass rd_opclass;
-	Datum		datum;
-	bool		disnull;
-	oidvector  *indclass;
 	Oid			opclass,
 				opfamily,
 				opcintype;
@@ -890,49 +885,28 @@ gistproperty(Oid index_oid, int attno,
 	}
 
 	/* First we need to know the column's opclass. */
-
-	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
-	if (!HeapTupleIsValid(tuple))
+	opclass = get_index_column_opclass(index_oid, attno);
+	if (!OidIsValid(opclass))
 	{
 		*isnull = true;
 		return true;
 	}
-	rd_index = (Form_pg_index) GETSTRUCT(tuple);
-
-	/* caller is supposed to guarantee this */
-	Assert(attno > 0 && attno <= rd_index->indnatts);
-
-	datum = SysCacheGetAttr(INDEXRELID, tuple,
-							Anum_pg_index_indclass, &disnull);
-	Assert(!disnull);
-
-	indclass = ((oidvector *) DatumGetPointer(datum));
-	opclass = indclass->values[attno - 1];
-
-	ReleaseSysCache(tuple);
 
 	/* Now look up the opclass family and input datatype. */
-
-	tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
-	if (!HeapTupleIsValid(tuple))
+	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
 	{
 		*isnull = true;
 		return true;
 	}
-	rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);
-
-	opfamily = rd_opclass->opcfamily;
-	opcintype = rd_opclass->opcintype;
-
-	ReleaseSysCache(tuple);
 
 	/* And now we can check whether the function is provided. */
-
 	*res = SearchSysCacheExists4(AMPROCNUM,
 								 ObjectIdGetDatum(opfamily),
 								 ObjectIdGetDatum(opcintype),
 								 ObjectIdGetDatum(opcintype),
 								 Int16GetDatum(procno));
+	*isnull = false;
+
 	return true;
 }
 
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index aff88a5..26c81c9 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -1050,6 +1050,32 @@ get_opclass_input_type(Oid opclass)
 	return result;
 }
 
+/*
+ * get_opclass_opfamily_and_input_type
+ *
+ *		Returns the OID of the operator family the opclass belongs to,
+ *				the OID of the datatype the opclass indexes
+ */
+bool
+get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype)
+{
+	HeapTuple	tp;
+	Form_pg_opclass cla_tup;
+
+	tp = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
+	if (!HeapTupleIsValid(tp))
+		return false;
+
+	cla_tup = (Form_pg_opclass) GETSTRUCT(tp);
+
+	*opfamily = cla_tup->opcfamily;
+	*opcintype = cla_tup->opcintype;
+
+	ReleaseSysCache(tp);
+
+	return true;
+}
+
 /*				---------- OPERATOR CACHE ----------					 */
 
 /*
@@ -3061,3 +3087,45 @@ get_range_subtype(Oid rangeOid)
 	else
 		return InvalidOid;
 }
+
+/*				---------- PG_INDEX CACHE ----------				 */
+
+/*
+ * get_index_column_opclass
+ *
+ *		Given the index OID and column number,
+ *		return opclass of the index column
+ *			or InvalidOid if the index was not found.
+ */
+Oid
+get_index_column_opclass(Oid index_oid, int attno)
+{
+	HeapTuple	tuple;
+	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
+	Datum		datum;
+	bool		isnull;
+	oidvector  *indclass;
+	Oid			opclass;
+
+	/* First we need to know the column's opclass. */
+
+	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
+	if (!HeapTupleIsValid(tuple))
+		return InvalidOid;
+
+	rd_index = (Form_pg_index) GETSTRUCT(tuple);
+
+	/* caller is supposed to guarantee this */
+	Assert(attno > 0 && attno <= rd_index->indnatts);
+
+	datum = SysCacheGetAttr(INDEXRELID, tuple,
+							Anum_pg_index_indclass, &isnull);
+	Assert(!isnull);
+
+	indclass = ((oidvector *) DatumGetPointer(datum));
+	opclass = indclass->values[attno - 1];
+
+	ReleaseSysCache(tuple);
+
+	return opclass;
+}
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index b6d1fca..618c4e8 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -73,6 +73,8 @@ extern char *get_constraint_name(Oid conoid);
 extern char *get_language_name(Oid langoid, bool missing_ok);
 extern Oid	get_opclass_family(Oid opclass);
 extern Oid	get_opclass_input_type(Oid opclass);
+extern bool get_opclass_opfamily_and_input_type(Oid opclass,
+									Oid *opfamily, Oid *opcintype);
 extern RegProcedure get_opcode(Oid opno);
 extern char *get_opname(Oid opno);
 extern Oid	get_op_rettype(Oid opno);
@@ -159,6 +161,7 @@ extern void free_attstatsslot(Oid atttype,
 extern char *get_namespace_name(Oid nspid);
 extern char *get_namespace_name_or_temp(Oid nspid);
 extern Oid	get_range_subtype(Oid rangeOid);
+extern Oid	get_index_column_opclass(Oid index_oid, int attno);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
diff --git a/src/backend/utils/adt/geo_ops.c b/src/backend/utils/adt/geo_ops.c
index 655b81c..e1814ab 100644
--- a/src/backend/utils/adt/geo_ops.c
+++ b/src/backend/utils/adt/geo_ops.c
@@ -41,8 +41,6 @@ enum path_delim
 static int	point_inside(Point *p, int npts, Point *plist);
 static int	lseg_crossing(double x, double y, double px, double py);
 static BOX *box_construct(double x1, double x2, double y1, double y2);
-static BOX *box_copy(BOX *box);
-static BOX *box_fill(BOX *result, double x1, double x2, double y1, double y2);
 static bool box_ov(BOX *box1, BOX *box2);
 static double box_ht(BOX *box);
 static double box_wd(BOX *box);
@@ -452,7 +450,7 @@ box_construct(double x1, double x2, double y1, double y2)
 
 /*		box_fill		-		fill in a given box struct
  */
-static BOX *
+BOX *
 box_fill(BOX *result, double x1, double x2, double y1, double y2)
 {
 	if (x1 > x2)
@@ -482,8 +480,8 @@ box_fill(BOX *result, double x1, double x2, double y1, double y2)
 
 /*		box_copy		-		copy a box
  */
-static BOX *
-box_copy(BOX *box)
+BOX *
+box_copy(const BOX *box)
 {
 	BOX		   *result = (BOX *) palloc(sizeof(BOX));
 
diff --git a/src/include/utils/geo_decls.h b/src/include/utils/geo_decls.h
index 9b530db..a514af2 100644
--- a/src/include/utils/geo_decls.h
+++ b/src/include/utils/geo_decls.h
@@ -183,4 +183,8 @@ extern double point_dt(Point *pt1, Point *pt2);
 extern double point_sl(Point *pt1, Point *pt2);
 extern double pg_hypot(double x, double y);
 
+/* private box routines */
+extern BOX *box_fill(BOX *box, double xlo, double xhi, double ylo, double yhi);
+extern BOX *box_copy(const BOX *box);
+
 #endif   /* GEO_DECLS_H */
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index eea366b..d1ba2b6 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -14,9 +14,9 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/relscan.h"
-#include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "lib/pairingheap.h"
@@ -538,7 +538,6 @@ getNextNearest(IndexScanDesc scan)
 {
 	GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
 	bool		res = false;
-	int			i;
 
 	if (scan->xs_itup)
 	{
@@ -559,45 +558,10 @@ getNextNearest(IndexScanDesc scan)
 			/* found a heap item at currently minimal distance */
 			scan->xs_ctup.t_self = item->data.heap.heapPtr;
 			scan->xs_recheck = item->data.heap.recheck;
-			scan->xs_recheckorderby = item->data.heap.recheckDistances;
-			for (i = 0; i < scan->numberOfOrderBys; i++)
-			{
-				if (so->orderByTypes[i] == FLOAT8OID)
-				{
-#ifndef USE_FLOAT8_BYVAL
-					/* must free any old value to avoid memory leakage */
-					if (!scan->xs_orderbynulls[i])
-						pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-					scan->xs_orderbyvals[i] = Float8GetDatum(item->distances[i]);
-					scan->xs_orderbynulls[i] = false;
-				}
-				else if (so->orderByTypes[i] == FLOAT4OID)
-				{
-					/* convert distance function's result to ORDER BY type */
-#ifndef USE_FLOAT4_BYVAL
-					/* must free any old value to avoid memory leakage */
-					if (!scan->xs_orderbynulls[i])
-						pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-					scan->xs_orderbyvals[i] = Float4GetDatum((float4) item->distances[i]);
-					scan->xs_orderbynulls[i] = false;
-				}
-				else
-				{
-					/*
-					 * If the ordering operator's return value is anything
-					 * else, we don't know how to convert the float8 bound
-					 * calculated by the distance function to that.  The
-					 * executor won't actually need the order by values we
-					 * return here, if there are no lossy results, so only
-					 * insist on converting if the *recheck flag is set.
-					 */
-					if (scan->xs_recheckorderby)
-						elog(ERROR, "GiST operator family's FOR ORDER BY operator must return float8 or float4 if the distance function is lossy");
-					scan->xs_orderbynulls[i] = true;
-				}
-			}
+
+			index_store_orderby_distances(scan, so->orderByTypes,
+										  item->distances,
+										  item->data.heap.recheckDistances);
 
 			/* in an index-only scan, also return the reconstructed tuple. */
 			if (scan->xs_want_itup)
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 4822af9..9cfe841 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -71,6 +71,7 @@
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/index.h"
+#include "catalog/pg_type.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -771,3 +772,72 @@ index_getprocinfo(Relation irel,
 
 	return locinfo;
 }
+
+/* ----------------
+ *		index_store_orderby_distances
+ *
+ *		Convert AM distance function's results (that can be inexact)
+ *		to ORDER BY types and save them into xs_orderbyvals/xs_orderbynulls
+ *		for a possible recheck.
+ * ----------------
+ */
+void
+index_store_orderby_distances(IndexScanDesc scan, Oid *orderByTypes,
+							  double *distances, bool recheckOrderBy)
+{
+	int			i;
+
+	scan->xs_recheckorderby = recheckOrderBy;
+
+	if (!distances)
+	{
+		Assert(!scan->xs_recheckorderby);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			scan->xs_orderbyvals[i] = (Datum) 0;
+			scan->xs_orderbynulls[i] = true;
+		}
+
+		return;
+	}
+
+	for (i = 0; i < scan->numberOfOrderBys; i++)
+	{
+		if (orderByTypes[i] == FLOAT8OID)
+		{
+#ifndef USE_FLOAT8_BYVAL
+			/* must free any old value to avoid memory leakage */
+			if (!scan->xs_orderbynulls[i])
+				pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+			scan->xs_orderbyvals[i] = Float8GetDatum(distances[i]);
+			scan->xs_orderbynulls[i] = false;
+		}
+		else if (orderByTypes[i] == FLOAT4OID)
+		{
+			/* convert distance function's result to ORDER BY type */
+#ifndef USE_FLOAT4_BYVAL
+			/* must free any old value to avoid memory leakage */
+			if (!scan->xs_orderbynulls[i])
+				pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+			scan->xs_orderbyvals[i] = Float4GetDatum((float4) distances[i]);
+			scan->xs_orderbynulls[i] = false;
+		}
+		else
+		{
+			/*
+			 * If the ordering operator's return value is anything
+			 * else, we don't know how to convert the float8 bound
+			 * calculated by the distance function to that.  The
+			 * executor won't actually need the order by values we
+			 * return here, if there are no lossy results, so only
+			 * insist on converting if the *recheck flag is set.
+			 */
+			if (scan->xs_recheckorderby)
+				elog(ERROR, "GiST operator family's FOR ORDER BY operator must return float8 or float4 if the distance function is lossy");
+			scan->xs_orderbynulls[i] = true;
+		}
+	}
+}
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index b2e078a..5b75ca6 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -161,6 +161,8 @@ extern RegProcedure index_getprocid(Relation irel, AttrNumber attnum,
 				uint16 procnum);
 extern FmgrInfo *index_getprocinfo(Relation irel, AttrNumber attnum,
 				  uint16 procnum);
+extern void index_store_orderby_distances(IndexScanDesc scan, Oid *orderByTypes,
+										double *distances, bool recheckOrderBy);
 
 /*
  * index access method support routines (in genam.c)
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index cd4a8d0..53ca8bf 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -584,7 +584,9 @@ CREATE FUNCTION my_inner_consistent(internal, internal) RETURNS void ...
 typedef struct spgInnerConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
+    ScanKey     orderbyKeys;    /* array of ordering operators and comparison values */
     int         nkeys;          /* length of array */
+    int         norderbys;      /* length of orderbyKeys array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -607,6 +609,7 @@ typedef struct spgInnerConsistentOut
     int        *levelAdds;      /* increment level by this much for each */
     Datum      *reconstructedValues;    /* associated reconstructed values */
     void      **traversalValues;        /* opclass-specific traverse values */
+    double    **distances;              /* associated distances */
 } spgInnerConsistentOut;
 </programlisting>
 
@@ -621,6 +624,8 @@ typedef struct spgInnerConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       <structfield>orderbyKeys</>, of length <structfield>norderbys</>,
+       describes ordering operators (if any) in the same fashion.
        <structfield>reconstructedValue</> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</> at the root level or if the
        <function>inner_consistent</> function did not provide a value at the
@@ -661,6 +666,11 @@ typedef struct spgInnerConsistentOut
        <structfield>reconstructedValues</> to an array of the values
        reconstructed for each child node to be visited; otherwise, leave
        <structfield>reconstructedValues</> as NULL.
+       If an ordered search is being performed, fill in
+       <structfield>distances</> with the distances of the child nodes to
+       be visited, computed according to the ordering operators provided in
+       <structfield>orderbyKeys</> (nodes with the lowest distances will be
+       processed first).  Leave it NULL otherwise.
        If it is desired to pass down additional out-of-band information
        (<quote>traverse values</>) to lower levels of the tree search,
        set <structfield>traversalValues</> to an array of the appropriate
@@ -669,7 +679,7 @@ typedef struct spgInnerConsistentOut
        Note that the <function>inner_consistent</> function is
        responsible for palloc'ing the
        <structfield>nodeNumbers</>, <structfield>levelAdds</>,
-       <structfield>reconstructedValues</>, and
+       <structfield>distances</>, <structfield>reconstructedValues</> and
        <structfield>traversalValues</> arrays in the current memory context.
        However, any output traverse values pointed to by
        the <structfield>traversalValues</> array should be allocated
@@ -699,7 +709,9 @@ CREATE FUNCTION my_leaf_consistent(internal, internal) RETURNS bool ...
 typedef struct spgLeafConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
-    int         nkeys;          /* length of array */
+    ScanKey     orderbykeys;    /* array of ordering operators and comparison values */
+    int         nkeys;          /* length of scankeys array */
+    int         norderbys;      /* length of orderbykeys array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -711,8 +723,10 @@ typedef struct spgLeafConsistentIn
 
 typedef struct spgLeafConsistentOut
 {
-    Datum       leafValue;      /* reconstructed original data, if any */
-    bool        recheck;        /* set true if operator must be rechecked */
+    Datum       leafValue;        /* reconstructed original data, if any */
+    bool        recheck;          /* set true if operator must be rechecked */
+    bool        recheckDistances; /* set true if distances must be rechecked */
+    double     *distances;        /* associated distances */
 } spgLeafConsistentOut;
 </programlisting>
 
@@ -727,6 +741,8 @@ typedef struct spgLeafConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       The array <structfield>orderbykeys</>, of length <structfield>norderbys</>,
+       describes the ordering operators in the same fashion.
        <structfield>reconstructedValue</> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</> at the root level or if the
        <function>inner_consistent</> function did not provide a value at the
@@ -752,6 +768,13 @@ typedef struct spgLeafConsistentOut
        <structfield>recheck</> may be set to <literal>true</> if the match
        is uncertain and so the operator(s) must be re-applied to the actual
        heap tuple to verify the match.
+       When an ordered search is performed, <structfield>distances</>
+       should be set in accordance with the ordering operators provided;
+       otherwise the implementation is supposed to set it to NULL.
+       If at least one of the returned distances is not exact, the function must
+       set <structfield>recheckDistances</> to true.  In this case the executor
+       will calculate the accurate distance after fetching the tuple from the
+       heap, and reorder the tuples if necessary.
       </para>
      </listitem>
     </varlistentry>
diff --git a/src/backend/access/spgist/Makefile b/src/backend/access/spgist/Makefile
index 14948a5..5be3df5 100644
--- a/src/backend/access/spgist/Makefile
+++ b/src/backend/access/spgist/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = spgutils.o spginsert.o spgscan.o spgvacuum.o spgvalidate.o \
 	spgdoinsert.o spgxlog.o \
-	spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o
+	spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o \
+	spgproc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/spgist/README b/src/backend/access/spgist/README
index 09ab21a..1420a18 100644
--- a/src/backend/access/spgist/README
+++ b/src/backend/access/spgist/README
@@ -41,7 +41,12 @@ contain exactly one inner tuple.
 
 When the search traversal algorithm reaches an inner tuple, it chooses a set
 of nodes to continue tree traverse in depth.  If it reaches a leaf page it
-scans a list of leaf tuples to find the ones that match the query.
+scans a list of leaf tuples to find the ones that match the query. SP-GiST
+also supports ordered searches: during the scan, the implementation can
+assign floating-point distances to inner and leaf tuples, and the traversal
+is then performed in closest-first order.  This is useful, e.g., for
+performing nearest-neighbour searches on the dataset.
+
 
 The insertion algorithm descends the tree similarly, except it must choose
 just one node to descend to from each inner tuple.  Insertion might also have
@@ -371,3 +376,4 @@ AUTHORS
 
     Teodor Sigaev <teo...@sigaev.ru>
     Oleg Bartunov <o...@sai.msu.su>
+    Vlad Sterzhanov <glide...@gmail.com>
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 139d998..f585b62 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -15,79 +15,118 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/relscan.h"
 #include "access/spgist_private.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "utils/builtins.h"
 #include "utils/datum.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
 
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
-								 Datum leafValue, bool isnull, bool recheck);
+							   Datum leafValue, bool isnull, bool recheck,
+							   bool recheckDist, double *distances);
 
-typedef struct ScanStackEntry
+/*
+ * Pairing heap comparison function for the SpGistSearchItem queue
+ */
+static int
+pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
+								 const pairingheap_node *b, void *arg)
 {
-	Datum		reconstructedValue;		/* value reconstructed from parent */
-	void	   *traversalValue; /* opclass-specific traverse value */
-	int			level;			/* level of items on this page */
-	ItemPointerData ptr;		/* block and offset to scan from */
-} ScanStackEntry;
+	const SpGistSearchItem *sa = (const SpGistSearchItem *) a;
+	const SpGistSearchItem *sb = (const SpGistSearchItem *) b;
+	IndexScanDesc scan = (IndexScanDesc) arg;
+	int			i;
+
+	/* Order according to distance comparison */
+	for (i = 0; i < scan->numberOfOrderBys; i++)
+	{
+		if (sa->distances[i] != sb->distances[i])
+			return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+	}
 
+	/* Leaf items go before inner pages, to ensure a depth-first search */
+	if (sa->isLeaf && !sb->isLeaf)
+		return 1;
+	if (!sa->isLeaf && sb->isLeaf)
+		return -1;
+
+	return 0;
+}
 
-/* Free a ScanStackEntry */
 static void
-freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry)
+spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
 {
 	if (!so->state.attType.attbyval &&
-		DatumGetPointer(stackEntry->reconstructedValue) != NULL)
-		pfree(DatumGetPointer(stackEntry->reconstructedValue));
-	if (stackEntry->traversalValue)
-		pfree(stackEntry->traversalValue);
+		DatumGetPointer(item->value) != NULL)
+		pfree(DatumGetPointer(item->value));
+
+	if (item->traversalValue)
+		pfree(item->traversalValue);
 
-	pfree(stackEntry);
+	pfree(item);
+}
+
+/*
+ * Add SpGistSearchItem to queue
+ *
+ * Called in queue context
+ */
+static void
+spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item,
+						double *distances)
+{
+	memcpy(item->distances, distances, so->numberOfOrderBys * sizeof(double));
+	pairingheap_add(so->queue, &item->phNode);
 }
 
-/* Free the entire stack */
 static void
-freeScanStack(SpGistScanOpaque so)
+spgAddStartItem(SpGistScanOpaque so, bool isnull)
 {
-	ListCell   *lc;
+	SpGistSearchItem *startEntry = (SpGistSearchItem *) palloc0(
+								SizeOfSpGistSearchItem(so->numberOfOrderBys));
 
-	foreach(lc, so->scanStack)
-	{
-		freeScanStackEntry(so, (ScanStackEntry *) lfirst(lc));
-	}
-	list_free(so->scanStack);
-	so->scanStack = NIL;
+	ItemPointerSet(&startEntry->heap,
+				   isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
+				   FirstOffsetNumber);
+	startEntry->isLeaf = false;
+	startEntry->level = 0;
+	startEntry->isnull = isnull;
+
+	spgAddSearchItemToQueue(so, startEntry, so->zeroDistances);
 }
 
 /*
- * Initialize scanStack to search the root page, resetting
+ * Initialize queue to search the root page, resetting
  * any previously active scan
  */
 static void
 resetSpGistScanOpaque(SpGistScanOpaque so)
 {
-	ScanStackEntry *startEntry;
-
-	freeScanStack(so);
+	MemoryContext oldCtx = MemoryContextSwitchTo(so->queueCxt);
 
 	if (so->searchNulls)
-	{
-		/* Stack a work item to scan the null index entries */
-		startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-		ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
-		so->scanStack = lappend(so->scanStack, startEntry);
-	}
+		/* Add a work item to scan the null index entries */
+		spgAddStartItem(so, true);
 
 	if (so->searchNonNulls)
+		/* Add a work item to scan the non-null index entries */
+		spgAddStartItem(so, false);
+
+	MemoryContextSwitchTo(oldCtx);
+
+	if (so->numberOfOrderBys > 0)
 	{
-		/* Stack a work item to scan the non-null index entries */
-		startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-		ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
-		so->scanStack = lappend(so->scanStack, startEntry);
+		/* Must pfree distances to avoid memory leak */
+		int			i;
+
+		for (i = 0; i < so->nPtrs; i++)
+			pfree(so->distances[i]);
 	}
 
 	if (so->want_itup)
@@ -122,6 +161,9 @@ spgPrepareScanKeys(IndexScanDesc scan)
 	int			nkeys;
 	int			i;
 
+	so->numberOfOrderBys = scan->numberOfOrderBys;
+	so->orderByData = scan->orderByData;
+
 	if (scan->numberOfKeys <= 0)
 	{
 		/* If no quals, whole-index scan is required */
@@ -182,8 +224,9 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 {
 	IndexScanDesc scan;
 	SpGistScanOpaque so;
+	int			i;
 
-	scan = RelationGetIndexScan(rel, keysz, 0);
+	scan = RelationGetIndexScan(rel, keysz, orderbysz);
 
 	so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
 	if (keysz > 0)
@@ -191,6 +234,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 	else
 		so->keyData = NULL;
 	initSpGistState(&so->state, scan->indexRelation);
+
 	so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
 										"SP-GiST search temporary context",
 										ALLOCSET_DEFAULT_SIZES);
@@ -198,6 +242,36 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 	/* Set up indexTupDesc and xs_itupdesc in case it's an index-only scan */
 	so->indexTupDesc = scan->xs_itupdesc = RelationGetDescr(rel);
 
+	if (scan->numberOfOrderBys > 0)
+	{
+		so->zeroDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+		so->infDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			so->zeroDistances[i] = 0.0;
+			so->infDistances[i] = get_float8_infinity();
+		}
+
+		scan->xs_orderbyvals = palloc0(sizeof(Datum) * scan->numberOfOrderBys);
+		scan->xs_orderbynulls = palloc(sizeof(bool) * scan->numberOfOrderBys);
+		memset(scan->xs_orderbynulls, true, sizeof(bool) * scan->numberOfOrderBys);
+	}
+
+	so->queueCxt = AllocSetContextCreate(CurrentMemoryContext,
+										 "SP-GiST queue context",
+										 ALLOCSET_DEFAULT_SIZES);
+
+	fmgr_info_copy(&so->innerConsistentFn,
+				   index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
+				   CurrentMemoryContext);
+
+	fmgr_info_copy(&so->leafConsistentFn,
+				   index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
+				   CurrentMemoryContext);
+
+	so->indexCollation = rel->rd_indcollation[0];
+
 	scan->opaque = so;
 
 	return scan;
@@ -208,18 +282,51 @@ spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 		  ScanKey orderbys, int norderbys)
 {
 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
+	MemoryContext oldCxt;
 
 	/* copy scankeys into local storage */
 	if (scankey && scan->numberOfKeys > 0)
-	{
 		memmove(scan->keyData, scankey,
 				scan->numberOfKeys * sizeof(ScanKeyData));
+
+	if (orderbys && scan->numberOfOrderBys > 0)
+	{
+		int			i;
+
+		memmove(scan->orderByData, orderbys,
+				scan->numberOfOrderBys * sizeof(ScanKeyData));
+
+		so->orderByTypes = (Oid *) palloc(sizeof(Oid) * scan->numberOfOrderBys);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			ScanKey		skey = &scan->orderByData[i];
+
+			/*
+			 * Look up the datatype returned by the original ordering
+			 * operator. SP-GiST always uses a float8 for the distance function,
+			 * but the ordering operator could be anything else.
+			 *
+			 * XXX: The distance function is only allowed to be lossy if the
+			 * ordering operator's result type is float4 or float8.  Otherwise
+			 * we don't know how to return the distance to the executor.  But
+			 * we cannot check that here, as we won't know if the distance
+			 * function is lossy until it returns *recheck = true for the
+			 * first time.
+			 */
+			so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
+		}
 	}
 
 	/* preprocess scankeys, set up the representation in *so */
 	spgPrepareScanKeys(scan);
 
-	/* set up starting stack entries */
+	MemoryContextReset(so->queueCxt);
+	oldCxt = MemoryContextSwitchTo(so->queueCxt);
+	so->queue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, scan);
+	MemoryContextSwitchTo(oldCxt);
+
+	/* set up starting queue entries */
 	resetSpGistScanOpaque(so);
 }
 
@@ -229,65 +336,336 @@ spgendscan(IndexScanDesc scan)
 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
 
 	MemoryContextDelete(so->tempCxt);
+	MemoryContextDelete(so->queueCxt);
+
+	if (scan->numberOfOrderBys > 0)
+	{
+		pfree(so->zeroDistances);
+		pfree(so->infDistances);
+	}
+}
+
+/*
+ * Leaf SpGistSearchItem constructor, called in queue context
+ */
+static SpGistSearchItem *
+spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointerData heapPtr,
+			   Datum leafValue, bool recheckQual, bool recheckDist, bool isnull)
+{
+	SpGistSearchItem *item = (SpGistSearchItem *) palloc(
+								SizeOfSpGistSearchItem(so->numberOfOrderBys));
+
+	item->level = level;
+	item->heap = heapPtr;
+	/* copy value to queue cxt out of tmp cxt */
+	item->value = isnull ? (Datum) 0 :
+		datumCopy(leafValue, so->state.attType.attbyval,
+				  so->state.attType.attlen);
+	item->traversalValue = NULL;
+	item->isLeaf = true;
+	item->recheckQual = recheckQual;
+	item->recheckDist = recheckDist;
+	item->isnull = isnull;
+
+	return item;
 }
 
 /*
  * Test whether a leaf tuple satisfies all the scan keys
  *
- * *leafValue is set to the reconstructed datum, if provided
- * *recheck is set true if any of the operators are lossy
+ * *reportedSome is set to true if:
+ *		the scan is not ordered AND the item satisfies the scankeys
  */
 static bool
-spgLeafTest(Relation index, SpGistScanOpaque so,
+spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			SpGistLeafTuple leafTuple, bool isnull,
-			int level, Datum reconstructedValue,
-			void *traversalValue,
-			Datum *leafValue, bool *recheck)
+			bool *reportedSome, storeRes_func storeRes)
 {
+	Datum		leafValue;
+	double	   *distances;
 	bool		result;
-	Datum		leafDatum;
-	spgLeafConsistentIn in;
-	spgLeafConsistentOut out;
-	FmgrInfo   *procinfo;
-	MemoryContext oldCtx;
+	bool		recheckQual;
+	bool		recheckDist;
 
 	if (isnull)
 	{
 		/* Should not have arrived on a nulls page unless nulls are wanted */
 		Assert(so->searchNulls);
-		*leafValue = (Datum) 0;
-		*recheck = false;
-		return true;
+		leafValue = (Datum) 0;
+		/* Assume that all distances for null entries are infinities */
+		distances = so->infDistances;
+		recheckQual = false;
+		recheckDist = false;
+		result = true;
+	}
+	else
+	{
+		spgLeafConsistentIn in;
+		spgLeafConsistentOut out;
+
+		/* use temp context for calling leaf_consistent */
+		MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
+
+		in.scankeys = so->keyData;
+		in.nkeys = so->numberOfKeys;
+		in.orderbykeys = so->orderByData;
+		in.norderbys = so->numberOfOrderBys;
+		in.reconstructedValue = item->value;
+		in.traversalValue = item->traversalValue;
+		in.level = item->level;
+		in.returnData = so->want_itup;
+		in.leafDatum = SGLTDATUM(leafTuple, &so->state);
+
+		out.leafValue = (Datum) 0;
+		out.recheck = false;
+		out.distances = NULL;
+		out.recheckDistances = false;
+
+		result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
+												so->indexCollation,
+												PointerGetDatum(&in),
+												PointerGetDatum(&out)));
+		recheckQual = out.recheck;
+		recheckDist = out.recheckDistances;
+		leafValue = out.leafValue;
+		distances = out.distances;
+
+		MemoryContextSwitchTo(oldCxt);
 	}
 
-	leafDatum = SGLTDATUM(leafTuple, &so->state);
+	if (result)
+	{
+		/* item passes the scankeys */
+		if (so->numberOfOrderBys > 0)
+		{
+			/* the scan is ordered -> add the item to the queue */
+			MemoryContext oldCxt = MemoryContextSwitchTo(so->queueCxt);
+			SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
+														leafTuple->heapPtr,
+														leafValue,
+														recheckQual,
+														recheckDist,
+														isnull);
+
+			spgAddSearchItemToQueue(so, heapItem, distances);
+
+			MemoryContextSwitchTo(oldCxt);
+		}
+		else
+		{
+			/* non-ordered scan, so report the item right away */
+			Assert(!recheckDist);
+			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
+					 recheckQual, false, NULL);
+			*reportedSome = true;
+		}
+	}
 
-	/* use temp context for calling leaf_consistent */
-	oldCtx = MemoryContextSwitchTo(so->tempCxt);
+	return result;
+}
 
-	in.scankeys = so->keyData;
-	in.nkeys = so->numberOfKeys;
-	in.reconstructedValue = reconstructedValue;
-	in.traversalValue = traversalValue;
-	in.level = level;
-	in.returnData = so->want_itup;
-	in.leafDatum = leafDatum;
+/* A bundle initializer for inner_consistent methods */
+static void
+spgInitInnerConsistentIn(spgInnerConsistentIn *in,
+						 SpGistScanOpaque so,
+						 SpGistSearchItem *item,
+						 SpGistInnerTuple innerTuple,
+						 MemoryContext traversalMemoryContext)
+{
+	in->scankeys = so->keyData;
+	in->nkeys = so->numberOfKeys;
+	in->norderbys = so->numberOfOrderBys;
+	in->orderbyKeys = so->orderByData;
+	in->reconstructedValue = item->value;
+	in->traversalMemoryContext = traversalMemoryContext;
+	in->traversalValue = item->traversalValue;
+	in->level = item->level;
+	in->returnData = so->want_itup;
+	in->allTheSame = innerTuple->allTheSame;
+	in->hasPrefix = (innerTuple->prefixSize > 0);
+	in->prefixDatum = SGITDATUM(innerTuple, &so->state);
+	in->nNodes = innerTuple->nNodes;
+	in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
+}
+
+static SpGistSearchItem *
+spgMakeInnerItem(SpGistScanOpaque so,
+				 SpGistSearchItem *parentItem,
+				 SpGistNodeTuple tuple,
+				 spgInnerConsistentOut *out, int i, bool isnull)
+{
+	SpGistSearchItem *item = palloc(SizeOfSpGistSearchItem(so->numberOfOrderBys));
+
+	item->heap = tuple->t_tid;
+	item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
+								 : parentItem->level;
+
+	/* Must copy value out of temp context */
+	item->value = out->reconstructedValues
+					? datumCopy(out->reconstructedValues[i],
+								so->state.attType.attbyval,
+								so->state.attType.attlen)
+					: (Datum) 0;
+
+	/*
+	 * Elements of out.traversalValues should be allocated in
+	 * in.traversalMemoryContext, which is actually a long
+	 * lived context of index scan.
+	 */
+	item->traversalValue =
+			out->traversalValues ? out->traversalValues[i] : NULL;
+
+	item->isLeaf = false;
+	item->recheckQual = false;
+	item->recheckDist = false;
+	item->isnull = isnull;
+
+	return item;
+}
 
-	out.leafValue = (Datum) 0;
-	out.recheck = false;
+static void
+spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item,
+			 SpGistInnerTuple innerTuple, bool isnull)
+{
+	MemoryContext			oldCxt = MemoryContextSwitchTo(so->tempCxt);
+	spgInnerConsistentOut	out;
+	int						nNodes = innerTuple->nNodes;
+	int						i;
 
-	procinfo = index_getprocinfo(index, 1, SPGIST_LEAF_CONSISTENT_PROC);
-	result = DatumGetBool(FunctionCall2Coll(procinfo,
-											index->rd_indcollation[0],
-											PointerGetDatum(&in),
-											PointerGetDatum(&out)));
+	memset(&out, 0, sizeof(out));
 
-	*leafValue = out.leafValue;
-	*recheck = out.recheck;
+	if (!isnull)
+	{
+		spgInnerConsistentIn in;
 
-	MemoryContextSwitchTo(oldCtx);
+		spgInitInnerConsistentIn(&in, so, item, innerTuple, oldCxt);
 
-	return result;
+		/* use user-defined inner consistent method */
+		FunctionCall2Coll(&so->innerConsistentFn,
+						  so->indexCollation,
+						  PointerGetDatum(&in),
+						  PointerGetDatum(&out));
+	}
+	else
+	{
+		/* force all children to be visited */
+		out.nNodes = nNodes;
+		out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
+		for (i = 0; i < nNodes; i++)
+			out.nodeNumbers[i] = i;
+	}
+
+	/* If allTheSame, they should all or none of 'em match */
+	if (innerTuple->allTheSame)
+		if (out.nNodes != 0 && out.nNodes != nNodes)
+			elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
+
+	if (out.nNodes)
+	{
+		/* collect node pointers */
+		SpGistNodeTuple		node;
+		SpGistNodeTuple	   *nodes = (SpGistNodeTuple *) palloc(
+									sizeof(SpGistNodeTuple) * nNodes);
+
+		SGITITERATE(innerTuple, i, node)
+		{
+			nodes[i] = node;
+		}
+
+		MemoryContextSwitchTo(so->queueCxt);
+
+		for (i = 0; i < out.nNodes; i++)
+		{
+			int			nodeN = out.nodeNumbers[i];
+			SpGistSearchItem *innerItem;
+
+			Assert(nodeN >= 0 && nodeN < nNodes);
+
+			node = nodes[nodeN];
+
+			if (!ItemPointerIsValid(&node->t_tid))
+				continue;
+
+			innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull);
+
+			/* Will copy out the distances in spgAddSearchItemToQueue anyway */
+			spgAddSearchItemToQueue(so, innerItem,
+									out.distances ? out.distances[i]
+												  : so->infDistances);
+		}
+	}
+
+	MemoryContextSwitchTo(oldCxt);
+}
+
+/* Returns the next item from the scan queue, or NULL if the queue is empty */
+static SpGistSearchItem *
+spgGetNextQueueItem(SpGistScanOpaque so)
+{
+	SpGistSearchItem *item;
+
+	if (!pairingheap_is_empty(so->queue))
+		item = (SpGistSearchItem *) pairingheap_remove_first(so->queue);
+	else
+		/* Done when the queue is empty */
+		item = NULL;
+
+	/* Return item; caller is responsible for freeing it */
+	return item;
+}
+
+enum SpGistSpecialOffsetNumbers
+{
+	SpGistBreakOffsetNumber = InvalidOffsetNumber,
+	SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,
+	SpGistErrorOffsetNumber = MaxOffsetNumber + 2
+};
+
+static OffsetNumber
+spgTestLeafTuple(SpGistScanOpaque so,
+				 SpGistSearchItem *item,
+				 Page page, OffsetNumber offset,
+				 bool isnull, bool isroot,
+				 bool *reportedSome,
+				 storeRes_func storeRes)
+{
+	SpGistLeafTuple leafTuple = (SpGistLeafTuple)
+		PageGetItem(page, PageGetItemId(page, offset));
+
+	if (leafTuple->tupstate != SPGIST_LIVE)
+	{
+		if (!isroot) /* all tuples on root should be live */
+		{
+			if (leafTuple->tupstate == SPGIST_REDIRECT)
+			{
+				/* redirection tuple should be first in chain */
+				Assert(offset == ItemPointerGetOffsetNumber(&item->heap));
+				/* transfer attention to redirect point */
+				item->heap = ((SpGistDeadTuple) leafTuple)->pointer;
+				Assert(ItemPointerGetBlockNumber(&item->heap) != SPGIST_METAPAGE_BLKNO);
+				return SpGistRedirectOffsetNumber;
+			}
+
+			if (leafTuple->tupstate == SPGIST_DEAD)
+			{
+				/* dead tuple should be first in chain */
+				Assert(offset == ItemPointerGetOffsetNumber(&item->heap));
+				/* No live entries on this page */
+				Assert(leafTuple->nextOffset == InvalidOffsetNumber);
+				return SpGistBreakOffsetNumber;
+			}
+		}
+
+		/* We should not arrive at a placeholder */
+		elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
+		return SpGistErrorOffsetNumber;
+	}
+
+	Assert(ItemPointerIsValid(&leafTuple->heapPtr));
+
+	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
+
+	return leafTuple->nextOffset;
 }
 
 /*
@@ -306,247 +684,101 @@ spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
 
 	while (scanWholeIndex || !reportedSome)
 	{
-		ScanStackEntry *stackEntry;
-		BlockNumber blkno;
-		OffsetNumber offset;
-		Page		page;
-		bool		isnull;
+		SpGistSearchItem *item = spgGetNextQueueItem(so);
 
-		/* Pull next to-do item from the list */
-		if (so->scanStack == NIL)
-			break;				/* there are no more pages to scan */
-
-		stackEntry = (ScanStackEntry *) linitial(so->scanStack);
-		so->scanStack = list_delete_first(so->scanStack);
+		if (item == NULL)
+			break; /* No more items in queue -> done */
 
 redirect:
 		/* Check for interrupts, just in case of infinite loop */
 		CHECK_FOR_INTERRUPTS();
 
-		blkno = ItemPointerGetBlockNumber(&stackEntry->ptr);
-		offset = ItemPointerGetOffsetNumber(&stackEntry->ptr);
-
-		if (buffer == InvalidBuffer)
+		if (item->isLeaf)
 		{
-			buffer = ReadBuffer(index, blkno);
-			LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			/* We store heap items in the queue only in case of ordered search */
+			Assert(so->numberOfOrderBys > 0);
+			storeRes(so, &item->heap, item->value, item->isnull,
+					 item->recheckQual, item->recheckDist, item->distances);
+			reportedSome = true;
 		}
-		else if (blkno != BufferGetBlockNumber(buffer))
+		else
 		{
-			UnlockReleaseBuffer(buffer);
-			buffer = ReadBuffer(index, blkno);
-			LockBuffer(buffer, BUFFER_LOCK_SHARE);
-		}
-		/* else new pointer points to the same page, no work needed */
+			BlockNumber		blkno  = ItemPointerGetBlockNumber(&item->heap);
+			OffsetNumber	offset = ItemPointerGetOffsetNumber(&item->heap);
+			Page			page;
+			bool			isnull;
 
-		page = BufferGetPage(buffer);
-		TestForOldSnapshot(snapshot, index, page);
+			if (buffer == InvalidBuffer)
+			{
+				buffer = ReadBuffer(index, blkno);
+				LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			}
+			else if (blkno != BufferGetBlockNumber(buffer))
+			{
+				UnlockReleaseBuffer(buffer);
+				buffer = ReadBuffer(index, blkno);
+				LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			}
 
-		isnull = SpGistPageStoresNulls(page) ? true : false;
+			/* else new pointer points to the same page, no work needed */
 
-		if (SpGistPageIsLeaf(page))
-		{
-			SpGistLeafTuple leafTuple;
-			OffsetNumber max = PageGetMaxOffsetNumber(page);
-			Datum		leafValue = (Datum) 0;
-			bool		recheck = false;
+			page = BufferGetPage(buffer);
+			TestForOldSnapshot(snapshot, index, page);
 
-			if (SpGistBlockIsRoot(blkno))
+			isnull = SpGistPageStoresNulls(page) ? true : false;
+
+			if (SpGistPageIsLeaf(page))
 			{
-				/* When root is a leaf, examine all its tuples */
-				for (offset = FirstOffsetNumber; offset <= max; offset++)
-				{
-					leafTuple = (SpGistLeafTuple)
-						PageGetItem(page, PageGetItemId(page, offset));
-					if (leafTuple->tupstate != SPGIST_LIVE)
-					{
-						/* all tuples on root should be live */
-						elog(ERROR, "unexpected SPGiST tuple state: %d",
-							 leafTuple->tupstate);
-					}
+				/* Page is a leaf - that is, all its tuples are heap items */
+				OffsetNumber max = PageGetMaxOffsetNumber(page);
 
-					Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-					if (spgLeafTest(index, so,
-									leafTuple, isnull,
-									stackEntry->level,
-									stackEntry->reconstructedValue,
-									stackEntry->traversalValue,
-									&leafValue,
-									&recheck))
-					{
-						storeRes(so, &leafTuple->heapPtr,
-								 leafValue, isnull, recheck);
-						reportedSome = true;
-					}
+				if (SpGistBlockIsRoot(blkno))
+				{
+					/* When root is a leaf, examine all its tuples */
+					for (offset = FirstOffsetNumber; offset <= max; offset++)
+						(void) spgTestLeafTuple(so, item, page, offset,
+												isnull, true,
+												&reportedSome, storeRes);
 				}
-			}
-			else
-			{
-				/* Normal case: just examine the chain we arrived at */
-				while (offset != InvalidOffsetNumber)
+				else
 				{
-					Assert(offset >= FirstOffsetNumber && offset <= max);
-					leafTuple = (SpGistLeafTuple)
-						PageGetItem(page, PageGetItemId(page, offset));
-					if (leafTuple->tupstate != SPGIST_LIVE)
+					/* Normal case: just examine the chain we arrived at */
+					while (offset != InvalidOffsetNumber)
 					{
-						if (leafTuple->tupstate == SPGIST_REDIRECT)
-						{
-							/* redirection tuple should be first in chain */
-							Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-							/* transfer attention to redirect point */
-							stackEntry->ptr = ((SpGistDeadTuple) leafTuple)->pointer;
-							Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
+						Assert(offset >= FirstOffsetNumber && offset <= max);
+						offset = spgTestLeafTuple(so, item, page, offset,
+												  isnull, false,
+												  &reportedSome, storeRes);
+						if (offset == SpGistRedirectOffsetNumber)
 							goto redirect;
-						}
-						if (leafTuple->tupstate == SPGIST_DEAD)
-						{
-							/* dead tuple should be first in chain */
-							Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-							/* No live entries on this page */
-							Assert(leafTuple->nextOffset == InvalidOffsetNumber);
-							break;
-						}
-						/* We should not arrive at a placeholder */
-						elog(ERROR, "unexpected SPGiST tuple state: %d",
-							 leafTuple->tupstate);
-					}
-
-					Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-					if (spgLeafTest(index, so,
-									leafTuple, isnull,
-									stackEntry->level,
-									stackEntry->reconstructedValue,
-									stackEntry->traversalValue,
-									&leafValue,
-									&recheck))
-					{
-						storeRes(so, &leafTuple->heapPtr,
-								 leafValue, isnull, recheck);
-						reportedSome = true;
 					}
-
-					offset = leafTuple->nextOffset;
-				}
-			}
-		}
-		else	/* page is inner */
-		{
-			SpGistInnerTuple innerTuple;
-			spgInnerConsistentIn in;
-			spgInnerConsistentOut out;
-			FmgrInfo   *procinfo;
-			SpGistNodeTuple *nodes;
-			SpGistNodeTuple node;
-			int			i;
-			MemoryContext oldCtx;
-
-			innerTuple = (SpGistInnerTuple) PageGetItem(page,
-												PageGetItemId(page, offset));
-
-			if (innerTuple->tupstate != SPGIST_LIVE)
-			{
-				if (innerTuple->tupstate == SPGIST_REDIRECT)
-				{
-					/* transfer attention to redirect point */
-					stackEntry->ptr = ((SpGistDeadTuple) innerTuple)->pointer;
-					Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
-					goto redirect;
 				}
-				elog(ERROR, "unexpected SPGiST tuple state: %d",
-					 innerTuple->tupstate);
 			}
-
-			/* use temp context for calling inner_consistent */
-			oldCtx = MemoryContextSwitchTo(so->tempCxt);
-
-			in.scankeys = so->keyData;
-			in.nkeys = so->numberOfKeys;
-			in.reconstructedValue = stackEntry->reconstructedValue;
-			in.traversalMemoryContext = oldCtx;
-			in.traversalValue = stackEntry->traversalValue;
-			in.level = stackEntry->level;
-			in.returnData = so->want_itup;
-			in.allTheSame = innerTuple->allTheSame;
-			in.hasPrefix = (innerTuple->prefixSize > 0);
-			in.prefixDatum = SGITDATUM(innerTuple, &so->state);
-			in.nNodes = innerTuple->nNodes;
-			in.nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
-
-			/* collect node pointers */
-			nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * in.nNodes);
-			SGITITERATE(innerTuple, i, node)
-			{
-				nodes[i] = node;
-			}
-
-			memset(&out, 0, sizeof(out));
-
-			if (!isnull)
+			else	/* page is inner */
 			{
-				/* use user-defined inner consistent method */
-				procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
-				FunctionCall2Coll(procinfo,
-								  index->rd_indcollation[0],
-								  PointerGetDatum(&in),
-								  PointerGetDatum(&out));
-			}
-			else
-			{
-				/* force all children to be visited */
-				out.nNodes = in.nNodes;
-				out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
-				for (i = 0; i < in.nNodes; i++)
-					out.nodeNumbers[i] = i;
-			}
-
-			MemoryContextSwitchTo(oldCtx);
-
-			/* If allTheSame, they should all or none of 'em match */
-			if (innerTuple->allTheSame)
-				if (out.nNodes != 0 && out.nNodes != in.nNodes)
-					elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
-
-			for (i = 0; i < out.nNodes; i++)
-			{
-				int			nodeN = out.nodeNumbers[i];
+				SpGistInnerTuple innerTuple = (SpGistInnerTuple)
+						PageGetItem(page, PageGetItemId(page, offset));
 
-				Assert(nodeN >= 0 && nodeN < in.nNodes);
-				if (ItemPointerIsValid(&nodes[nodeN]->t_tid))
+				if (innerTuple->tupstate != SPGIST_LIVE)
 				{
-					ScanStackEntry *newEntry;
-
-					/* Create new work item for this node */
-					newEntry = palloc(sizeof(ScanStackEntry));
-					newEntry->ptr = nodes[nodeN]->t_tid;
-					if (out.levelAdds)
-						newEntry->level = stackEntry->level + out.levelAdds[i];
-					else
-						newEntry->level = stackEntry->level;
-					/* Must copy value out of temp context */
-					if (out.reconstructedValues)
-						newEntry->reconstructedValue =
-							datumCopy(out.reconstructedValues[i],
-									  so->state.attType.attbyval,
-									  so->state.attType.attlen);
-					else
-						newEntry->reconstructedValue = (Datum) 0;
-
-					/*
-					 * Elements of out.traversalValues should be allocated in
-					 * in.traversalMemoryContext, which is actually a long
-					 * lived context of index scan.
-					 */
-					newEntry->traversalValue = (out.traversalValues) ?
-						out.traversalValues[i] : NULL;
-
-					so->scanStack = lcons(newEntry, so->scanStack);
+					if (innerTuple->tupstate == SPGIST_REDIRECT)
+					{
+						/* transfer attention to redirect point */
+						item->heap = ((SpGistDeadTuple) innerTuple)->pointer;
+						Assert(ItemPointerGetBlockNumber(&item->heap) !=
+							   SPGIST_METAPAGE_BLKNO);
+						goto redirect;
+					}
+					elog(ERROR, "unexpected SPGiST tuple state: %d",
+						 innerTuple->tupstate);
 				}
+
+				spgInnerTest(so, item, innerTuple, isnull);
 			}
 		}
 
-		/* done with this scan stack entry */
-		freeScanStackEntry(so, stackEntry);
+		/* done with this scan item */
+		spgFreeSearchItem(so, item);
 		/* clear temp context before proceeding to the next one */
 		MemoryContextReset(so->tempCxt);
 	}
@@ -555,11 +787,14 @@ redirect:
 		UnlockReleaseBuffer(buffer);
 }
 
+
 /* storeRes subroutine for getbitmap case */
 static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
-			Datum leafValue, bool isnull, bool recheck)
+			Datum leafValue, bool isnull, bool recheck, bool recheckDist,
+			double *distances)
 {
+	Assert(!recheckDist && !distances);
 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
 	so->ntids++;
 }
@@ -583,11 +818,21 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 /* storeRes subroutine for gettuple case */
 static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
-			  Datum leafValue, bool isnull, bool recheck)
+			  Datum leafValue, bool isnull, bool recheck, bool recheckDist,
+			  double *distances)
 {
 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
 	so->heapPtrs[so->nPtrs] = *heapPtr;
 	so->recheck[so->nPtrs] = recheck;
+	so->recheckDist[so->nPtrs] = recheckDist;
+
+	if (so->numberOfOrderBys > 0)
+	{
+		Size		size = sizeof(double) * so->numberOfOrderBys;
+
+		so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+	}
+
 	if (so->want_itup)
 	{
 		/*
@@ -616,14 +861,28 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 	{
 		if (so->iPtr < so->nPtrs)
 		{
-			/* continuing to return tuples from a leaf page */
+			/* continuing to return reported tuples */
 			scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
 			scan->xs_recheck = so->recheck[so->iPtr];
 			scan->xs_itup = so->indexTups[so->iPtr];
+
+			if (so->numberOfOrderBys > 0)
+				index_store_orderby_distances(scan, so->orderByTypes,
+											  so->distances[so->iPtr],
+											  so->recheckDist[so->iPtr]);
 			so->iPtr++;
 			return true;
 		}
 
+		if (so->numberOfOrderBys > 0)
+		{
+			/* Must pfree distances to avoid memory leak */
+			int			i;
+
+			for (i = 0; i < so->nPtrs; i++)
+				pfree(so->distances[i]);
+		}
+
 		if (so->want_itup)
 		{
 			/* Must pfree IndexTuples to avoid memory leak */
diff --git a/src/backend/access/spgist/spgtextproc.c b/src/backend/access/spgist/spgtextproc.c
index 8678854..00eb27f 100644
--- a/src/backend/access/spgist/spgtextproc.c
+++ b/src/backend/access/spgist/spgtextproc.c
@@ -86,6 +86,7 @@ spg_text_config(PG_FUNCTION_ARGS)
 	cfg->labelType = INT2OID;
 	cfg->canReturnData = true;
 	cfg->longValuesOK = true;	/* suffixing will shorten long values */
+	cfg->suppLen = 0; /* we don't need any supplementary data */
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index ca4b0bd..bcefa14 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -15,16 +15,21 @@
 
 #include "postgres.h"
 
+#include "access/amvalidate.h"
+#include "access/htup_details.h"
 #include "access/reloptions.h"
 #include "access/spgist_private.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_amop.h"
 #include "storage/bufmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
+#include "utils/catcache.h"
 #include "utils/index_selfuncs.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
 
 /*
@@ -39,7 +44,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amstrategies = 0;
 	amroutine->amsupport = SPGISTNProc;
 	amroutine->amcanorder = false;
-	amroutine->amcanorderbyop = false;
+	amroutine->amcanorderbyop = true;
 	amroutine->amcanbackward = false;
 	amroutine->amcanunique = false;
 	amroutine->amcanmulticol = false;
@@ -59,7 +64,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amcanreturn = spgcanreturn;
 	amroutine->amcostestimate = spgcostestimate;
 	amroutine->amoptions = spgoptions;
-	amroutine->amproperty = NULL;
+	amroutine->amproperty = spgproperty;
 	amroutine->amvalidate = spgvalidate;
 	amroutine->ambeginscan = spgbeginscan;
 	amroutine->amrescan = spgrescan;
@@ -907,3 +912,82 @@ SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
 
 	return offnum;
 }
+
+/*
+ *	spgproperty() -- Check boolean properties of indexes.
+ *
+ * This is optional for most AMs, but is required for SP-GiST because the core
+ * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
+ */
+bool
+spgproperty(Oid index_oid, int attno,
+			IndexAMProperty prop, const char *propname,
+			bool *res, bool *isnull)
+{
+	Oid			opclass,
+				opfamily,
+				opcintype;
+	CatCList   *catlist;
+	int			i;
+
+	/* Only answer column-level inquiries */
+	if (attno == 0)
+		return false;
+
+	switch (prop)
+	{
+		case AMPROP_DISTANCE_ORDERABLE:
+			break;
+		default:
+			return false;
+	}
+
+	/*
+	 * Currently, SP-GiST distance-ordered scans require that there be a
+	 * distance operator in the opclass with the default types. So we assume
+	 * that if such an operator exists, then there's a reason for it.
+	 */
+
+	/* First we need to know the column's opclass. */
+	opclass = get_index_column_opclass(index_oid, attno);
+	if (!OidIsValid(opclass))
+	{
+		*isnull = true;
+		return true;
+	}
+
+	/* Now look up the opclass family and input datatype. */
+	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
+	{
+		*isnull = true;
+		return true;
+	}
+
+	/* And now we can check whether the operator is provided. */
+	catlist = SearchSysCacheList1(AMOPSTRATEGY,
+								  ObjectIdGetDatum(opfamily));
+
+	*res = false;
+
+	for (i = 0; i < catlist->n_members; i++)
+	{
+		HeapTuple	amoptup = &catlist->members[i]->tuple;
+		Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
+
+		if (amopform->amoppurpose == AMOP_ORDER &&
+			(amopform->amoplefttype == opcintype ||
+			 amopform->amoprighttype == opcintype) &&
+			 opfamily_can_sort_type(amopform->amopsortfamily,
+									get_op_rettype(amopform->amopopr)))
+		{
+			*res = true;
+			break;
+		}
+	}
+
+	ReleaseSysCacheList(catlist);
+
+	*isnull = false;
+
+	return true;
+}
diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c
index 1bc5bce..0a3eeb6 100644
--- a/src/backend/access/spgist/spgvalidate.c
+++ b/src/backend/access/spgist/spgvalidate.c
@@ -22,6 +22,7 @@
 #include "catalog/pg_opfamily.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/lsyscache.h"
 #include "utils/regproc.h"
 #include "utils/syscache.h"
 
@@ -138,6 +139,7 @@ spgvalidate(Oid opclassoid)
 	{
 		HeapTuple	oprtup = &oprlist->members[i]->tuple;
 		Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
+		Oid			op_rettype;
 
 		/* TODO: Check that only allowed strategy numbers exist */
 		if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
@@ -151,20 +153,26 @@ spgvalidate(Oid opclassoid)
 			result = false;
 		}
 
-		/* spgist doesn't support ORDER BY operators */
-		if (oprform->amoppurpose != AMOP_SEARCH ||
-			OidIsValid(oprform->amopsortfamily))
+		/* spgist supports ORDER BY operators */
+		if (oprform->amoppurpose != AMOP_SEARCH)
 		{
-			ereport(INFO,
-					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
-					 errmsg("spgist operator family \"%s\" contains invalid ORDER BY specification for operator %s",
-							opfamilyname,
-							format_operator(oprform->amopopr))));
-			result = false;
+			/* ... and operator result must match the claimed btree opfamily */
+			op_rettype = get_op_rettype(oprform->amopopr);
+			if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype))
+			{
+				ereport(INFO,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						 errmsg("spgist operator family %s contains incorrect ORDER BY opfamily specification for operator %s",
+								opfamilyname,
+								format_operator(oprform->amopopr))));
+				result = false;
+			}
 		}
+		else
+			op_rettype = BOOLOID;
 
 		/* Check operator signature --- same for all spgist strategies */
-		if (!check_amop_signature(oprform->amopopr, BOOLOID,
+		if (!check_amop_signature(oprform->amopopr, op_rettype,
 								  oprform->amoplefttype,
 								  oprform->amoprighttype))
 		{
diff --git a/src/include/access/spgist.h b/src/include/access/spgist.h
index aaf78bc..23ed9bb 100644
--- a/src/include/access/spgist.h
+++ b/src/include/access/spgist.h
@@ -133,7 +133,9 @@ typedef struct spgPickSplitOut
 typedef struct spgInnerConsistentIn
 {
 	ScanKey		scankeys;		/* array of operators and comparison values */
+	ScanKey		orderbyKeys;
 	int			nkeys;			/* length of array */
+	int			norderbys;
 
 	Datum		reconstructedValue;		/* value reconstructed at parent */
 	void	   *traversalValue; /* opclass-specific traverse value */
@@ -157,6 +159,7 @@ typedef struct spgInnerConsistentOut
 	int		   *levelAdds;		/* increment level by this much for each */
 	Datum	   *reconstructedValues;	/* associated reconstructed values */
 	void	  **traversalValues;	/* opclass-specific traverse values */
+	double	  **distances;		/* associated distances */
 } spgInnerConsistentOut;
 
 /*
@@ -165,7 +168,10 @@ typedef struct spgInnerConsistentOut
 typedef struct spgLeafConsistentIn
 {
 	ScanKey		scankeys;		/* array of operators and comparison values */
+	ScanKey		orderbykeys;	/* array of ordering operators and comparison
+								 * values */
 	int			nkeys;			/* length of array */
+	int			norderbys;
 
 	Datum		reconstructedValue;		/* value reconstructed at parent */
 	void	   *traversalValue; /* opclass-specific traverse value */
@@ -179,6 +185,8 @@ typedef struct spgLeafConsistentOut
 {
 	Datum		leafValue;		/* reconstructed original data, if any */
 	bool		recheck;		/* set true if operator must be rechecked */
+	bool		recheckDistances; /* set true if distances must be rechecked */
+	double	   *distances;		/* associated distances */
 } spgLeafConsistentOut;
 
 
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index b2979a9..14d0700 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -16,8 +16,10 @@
 
 #include "access/itup.h"
 #include "access/spgist.h"
+#include "lib/rbtree.h"
 #include "nodes/tidbitmap.h"
 #include "storage/buf.h"
+#include "storage/relfilenode.h"
 #include "utils/relcache.h"
 
 
@@ -129,12 +131,34 @@ typedef struct SpGistState
 	bool		isBuild;		/* true if doing index build */
 } SpGistState;
 
+typedef struct SpGistSearchItem
+{
+	pairingheap_node phNode;	/* pairing heap node */
+	Datum		value;			/* value reconstructed from parent or
+								 * leafValue if heaptuple */
+	void	   *traversalValue; /* opclass-specific traverse value */
+	int			level;			/* level of items on this page */
+	ItemPointerData heap;		/* heap TID if isLeaf, else index tuple TID */
+	bool		isLeaf;			/* SearchItem is heap item */
+	bool		recheckQual;	/* qual recheck is needed */
+	bool		recheckDist;	/* distance recheck is needed */
+	bool		isnull;
+
+	/* array with numberOfOrderBys entries */
+	double		distances[FLEXIBLE_ARRAY_MEMBER];
+} SpGistSearchItem;
+
+#define SizeOfSpGistSearchItem(n_distances) \
+	(offsetof(SpGistSearchItem, distances) + sizeof(double) * (n_distances))
+
 /*
  * Private state of an index scan
  */
 typedef struct SpGistScanOpaqueData
 {
 	SpGistState state;			/* see above */
+	pairingheap *queue;			/* queue of unvisited items */
+	MemoryContext queueCxt;		/* context holding the queue */
 	MemoryContext tempCxt;		/* short-lived memory context */
 
 	/* Control flags showing whether to search nulls and/or non-nulls */
@@ -144,9 +168,17 @@ typedef struct SpGistScanOpaqueData
 	/* Index quals to be passed to opclass (null-related quals removed) */
 	int			numberOfKeys;	/* number of index qualifier conditions */
 	ScanKey		keyData;		/* array of index qualifier descriptors */
+	int			numberOfOrderBys;
+	ScanKey		orderByData;
+	Oid		   *orderByTypes;
+
+	FmgrInfo	innerConsistentFn;
+	FmgrInfo	leafConsistentFn;
+	Oid			indexCollation;
 
-	/* Stack of yet-to-be-visited pages */
-	List	   *scanStack;		/* List of ScanStackEntrys */
+	/* Pre-allocated workspace arrays: */
+	double	   *zeroDistances;
+	double	   *infDistances;
 
 	/* These fields are only used in amgetbitmap scans: */
 	TIDBitmap  *tbm;			/* bitmap being filled */
@@ -159,7 +191,9 @@ typedef struct SpGistScanOpaqueData
 	int			iPtr;			/* index for scanning through same */
 	ItemPointerData heapPtrs[MaxIndexTuplesPerPage];	/* TIDs from cur page */
 	bool		recheck[MaxIndexTuplesPerPage]; /* their recheck flags */
+	bool		recheckDist[MaxIndexTuplesPerPage]; /* distance recheck flags */
 	IndexTuple	indexTups[MaxIndexTuplesPerPage];		/* reconstructed tuples */
+	double	   *distances[MaxIndexTuplesPerPage];	/* distances (for recheck) */
 
 	/*
 	 * Note: using MaxIndexTuplesPerPage above is a bit hokey since
@@ -637,6 +671,9 @@ extern OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page,
 					 Item item, Size size,
 					 OffsetNumber *startOffset,
 					 bool errorOK);
+extern bool spgproperty(Oid index_oid, int attno,
+			IndexAMProperty prop, const char *propname,
+			bool *res, bool *isnull);
 
 /* spgdoinsert.c */
 extern void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index 53ca8bf..b0d1cb3 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -70,6 +70,7 @@
       <entry>Name</entry>
       <entry>Indexed Data Type</entry>
       <entry>Indexable Operators</entry>
+      <entry>Ordering Operators</entry>
      </row>
     </thead>
     <tbody>
@@ -84,6 +85,9 @@
        <literal>&gt;^</>
        <literal>~=</>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
      </row>
      <row>
       <entry><literal>quad_point_ops</></entry>
@@ -96,6 +100,9 @@
        <literal>&gt;^</>
        <literal>~=</>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
      </row>
      <row>
       <entry><literal>range_ops</></entry>
@@ -111,6 +118,8 @@
        <literal>&gt;&gt;</>
        <literal>@&gt;</>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>box_ops</></entry>
@@ -129,6 +138,8 @@
        <literal>|&gt;&gt;</literal>
        <literal>|&amp;&gt;</>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>text_ops</></entry>
@@ -144,6 +155,8 @@
        <literal>~&gt;=~</>
        <literal>~&gt;~</>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>inet_ops</></entry>
@@ -161,6 +174,8 @@
        <literal>&lt;=</>
        <literal>=</>
       </entry>
+      <entry>
+      </entry>
      </row>
     </tbody>
    </tgroup>
@@ -172,6 +187,10 @@
   supports the same operators but uses a different index data structure which
   may offer better performance in some applications.
  </para>
+ <para>
+  By supporting the <literal>&lt;-&gt;</> ordering operator, <literal>quad_point_ops</> and
+  <literal>kd_point_ops</> allow K-nearest-neighbor searches over the indexed points.
+ </para>
 
 </sect1>
 
diff --git a/src/backend/access/spgist/spgkdtreeproc.c b/src/backend/access/spgist/spgkdtreeproc.c
index 9a2649b..e074c3e 100644
--- a/src/backend/access/spgist/spgkdtreeproc.c
+++ b/src/backend/access/spgist/spgkdtreeproc.c
@@ -17,6 +17,7 @@
 
 #include "access/spgist.h"
 #include "access/stratnum.h"
+#include "access/spgist_private.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
 #include "utils/geo_decls.h"
@@ -162,6 +163,7 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
 	double		coord;
 	int			which;
 	int			i;
+	BOX			boxes[2];
 
 	Assert(in->hasPrefix);
 	coord = DatumGetFloat8(in->prefixDatum);
@@ -248,12 +250,75 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
 	}
 
 	/* We must descend into the children identified by which */
-	out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
 	out->nNodes = 0;
+
+	if (!which)
+		PG_RETURN_VOID();
+
+	out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
+
+	if (in->norderbys > 0)
+	{
+		BOX			infArea;
+		BOX		   *area;
+
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+		if (in->level == 0)
+		{
+			float8		inf = get_float8_infinity();
+
+			area = box_fill(&infArea, -inf, inf, -inf, inf);
+		}
+		else
+		{
+			area = (BOX *) in->traversalValue;
+			Assert(area);
+		}
+
+		boxes[0].low = area->low;
+		boxes[1].high = area->high;
+
+		if (in->level % 2)
+		{
+			/* split box by x */
+			boxes[0].high.x = boxes[1].low.x = coord;
+			boxes[0].high.y = area->high.y;
+			boxes[1].low.y = area->low.y;
+		}
+		else
+		{
+			/* split box by y */
+			boxes[0].high.y = boxes[1].low.y = coord;
+			boxes[0].high.x = area->high.x;
+			boxes[1].low.x = area->low.x;
+		}
+	}
+
 	for (i = 1; i <= 2; i++)
 	{
 		if (which & (1 << i))
-			out->nodeNumbers[out->nNodes++] = i - 1;
+		{
+			out->nodeNumbers[out->nNodes] = i - 1;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				BOX		   *box = box_copy(&boxes[i - 1]);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[out->nNodes] = box;
+
+				spg_point_distance(BoxPGetDatum(box),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[out->nNodes], false);
+			}
+
+			out->nNodes++;
+		}
 	}
 
 	/* Set up level increments, too */
diff --git a/src/backend/access/spgist/spgproc.c b/src/backend/access/spgist/spgproc.c
new file mode 100644
index 0000000..ac386a4
--- /dev/null
+++ b/src/backend/access/spgist/spgproc.c
@@ -0,0 +1,67 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgproc.c
+ *	  Common procedures for SP-GiST.
+ *
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *			src/backend/access/spgist/spgproc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/spgist_private.h"
+#include "utils/geo_decls.h"
+
+/* Distance from point to axis-aligned box (0 if the point is inside or on it) */
+static double
+point_box_distance(Point *point, BOX *box)
+{
+	double		dx,
+				dy;
+
+	if (isnan(point->x) || isnan(box->low.x) ||
+		isnan(point->y) || isnan(box->low.y))
+		return get_float8_nan();	/* note: box->high is not NaN-checked */
+
+	if (point->x < box->low.x)
+		dx = box->low.x - point->x;
+	else if (point->x > box->high.x)
+		dx = point->x - box->high.x;
+	else
+		dx = 0.0;				/* x lies within [low.x, high.x] */
+
+	if (point->y < box->low.y)
+		dy = box->low.y - point->y;
+	else if (point->y > box->high.y)
+		dy = point->y - box->high.y;
+	else
+		dy = 0.0;				/* y lies within [low.y, high.y] */
+
+	return HYPOT(dx, dy);
+}
+
+void
+spg_point_distance(Datum to, int norderbys, ScanKey orderbyKeys,
+				   double **distances, bool isLeaf)
+{
+	double	   *distance;		/* cursor into the array allocated below */
+	int			sk_num;
+
+	distance = *distances = (double *) palloc(norderbys * sizeof(double));
+	/* One distance per ordering key: point-to-point if isLeaf, else point-to-box */
+	for (sk_num = 0; sk_num < norderbys; ++sk_num, ++orderbyKeys, ++distance)
+	{
+		Point	   *point = DatumGetPointP(orderbyKeys->sk_argument);
+
+		*distance = isLeaf ? point_dt(point, DatumGetPointP(to))
+						   : point_box_distance(point, DatumGetBoxP(to));
+	}
+}
diff --git a/src/backend/access/spgist/spgquadtreeproc.c b/src/backend/access/spgist/spgquadtreeproc.c
index 6ad73f4..ede9ec5 100644
--- a/src/backend/access/spgist/spgquadtreeproc.c
+++ b/src/backend/access/spgist/spgquadtreeproc.c
@@ -17,6 +17,7 @@
 
 #include "access/spgist.h"
 #include "access/stratnum.h"
+#include "access/spgist_private.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
 #include "utils/geo_decls.h"
@@ -77,6 +78,32 @@ getQuadrant(Point *centroid, Point *tst)
 	return 0;
 }
 
+/* Returns a newly allocated box: the part of area in the given quadrant of centroid */
+static BOX *
+getQuadrantArea(BOX *area, Point *centroid, int quadrant)
+{
+	BOX		   *box = (BOX *) palloc(sizeof(BOX));
+
+	switch (quadrant)			/* no default: values outside 1..4 leave *box unset */
+	{
+		case 1:					/* corners: centroid .. area->high */
+			box->high = area->high;
+			box->low = *centroid;
+			break;
+		case 2:					/* presumably x: centroid..high, y: low..centroid; box_fill not shown */
+			box_fill(box, centroid->x, area->high.x, area->low.y, centroid->y);
+			break;
+		case 3:					/* corners: area->low .. centroid */
+			box->high = *centroid;
+			box->low = area->low;
+			break;
+		case 4:					/* presumably x: low..centroid, y: centroid..high; box_fill not shown */
+			box_fill(box, area->low.x, centroid->x, centroid->y, area->high.y);
+			break;
+	}
+
+	return box;
+}
 
 Datum
 spg_quad_choose(PG_FUNCTION_ARGS)
@@ -196,19 +223,56 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 	spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
 	spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
 	Point	   *centroid;
+	BOX			infArea;
+	BOX		   *area = NULL;
 	int			which;
 	int			i;
 
 	Assert(in->hasPrefix);
 	centroid = DatumGetPointP(in->prefixDatum);
 
+	if (in->norderbys > 0)
+	{
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+		if (in->level == 0)
+		{
+			double		inf = get_float8_infinity();
+
+			area = box_fill(&infArea, -inf, inf, -inf, inf);
+		}
+		else
+		{
+			area = in->traversalValue;
+			Assert(area);
+		}
+	}
+
 	if (in->allTheSame)
 	{
 		/* Report that all nodes should be visited */
 		out->nNodes = in->nNodes;
 		out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
 		for (i = 0; i < in->nNodes; i++)
+		{
 			out->nodeNumbers[i] = i;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				/* Use parent quadrant box as traversalValue */
+				BOX		   *quadrant = box_copy(area);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[i] = quadrant;
+				spg_point_distance(BoxPGetDatum(quadrant),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[i], false);
+			}
+		}
 		PG_RETURN_VOID();
 	}
 
@@ -253,8 +317,8 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 				boxQuery = DatumGetBoxP(in->scankeys[i].sk_argument);
 
 				if (DatumGetBool(DirectFunctionCall2(box_contain_pt,
-												   PointerGetDatum(boxQuery),
-												 PointerGetDatum(centroid))))
+													 PointerGetDatum(boxQuery),
+													PointerGetDatum(centroid))))
 				{
 					/* centroid is in box, so all quadrants are OK */
 				}
@@ -286,13 +350,37 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 			break;				/* no need to consider remaining conditions */
 	}
 
+	out->levelAdds = palloc(sizeof(int) * 4);
+	for (i = 0; i < 4; ++i)
+		out->levelAdds[i] = 1;
+
 	/* We must descend into the quadrant(s) identified by which */
 	out->nodeNumbers = (int *) palloc(sizeof(int) * 4);
 	out->nNodes = 0;
+
 	for (i = 1; i <= 4; i++)
 	{
 		if (which & (1 << i))
-			out->nodeNumbers[out->nNodes++] = i - 1;
+		{
+			out->nodeNumbers[out->nNodes] = i - 1;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				BOX		   *quadrant = getQuadrantArea(area, centroid, i);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[out->nNodes] = quadrant;
+
+				spg_point_distance(BoxPGetDatum(quadrant),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[out->nNodes], false);
+			}
+
+			out->nNodes++;
+		}
 	}
 
 	PG_RETURN_VOID();
@@ -356,5 +444,10 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS)
 			break;
 	}
 
+	if (res && in->norderbys > 0)
+		/* ok, it passes -> let's compute the distances */
+		spg_point_distance(in->leafDatum,
+					  in->norderbys, in->orderbykeys, &out->distances, true);
+
 	PG_RETURN_BOOL(res);
 }
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index f585b62..291159a 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -43,11 +43,23 @@ pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
 	IndexScanDesc scan = (IndexScanDesc) arg;
 	int			i;
 
-	/* Order according to distance comparison */
-	for (i = 0; i < scan->numberOfOrderBys; i++)
+	if (sa->isnull)
 	{
-		if (sa->distances[i] != sb->distances[i])
-			return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+		if (!sb->isnull)
+			return -1;
+	}
+	else if (sb->isnull)
+	{
+		return 1;
+	}
+	else
+	{
+		/* Order according to distance comparison */
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			if (sa->distances[i] != sb->distances[i])
+				return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+		}
 	}
 
 	/* Leaf items go before inner pages, to ensure a depth-first search */
@@ -81,7 +93,10 @@ static void
 spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item,
 						double *distances)
 {
-	memcpy(item->distances, distances, so->numberOfOrderBys * sizeof(double));
+	if (!item->isnull)
+		memcpy(item->distances, distances,
+			   so->numberOfOrderBys * sizeof(double));
+
 	pairingheap_add(so->queue, &item->phNode);
 }
 
@@ -126,7 +141,8 @@ resetSpGistScanOpaque(SpGistScanOpaque so)
 		int			i;
 
 		for (i = 0; i < so->nPtrs; i++)
-			pfree(so->distances[i]);
+			if (so->distances[i])
+				pfree(so->distances[i]);
 	}
 
 	if (so->want_itup)
@@ -828,9 +844,14 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
 
 	if (so->numberOfOrderBys > 0)
 	{
-		Size		size = sizeof(double) * so->numberOfOrderBys;
+		if (isnull)
+			so->distances[so->nPtrs] = NULL;
+		else
+		{
+			Size		size = sizeof(double) * so->numberOfOrderBys;
 
-		so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+			so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+		}
 	}
 
 	if (so->want_itup)
@@ -880,7 +901,8 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 			int			i;
 
 			for (i = 0; i < so->nPtrs; i++)
-				pfree(so->distances[i]);
+				if (so->distances[i])
+					pfree(so->distances[i]);
 		}
 
 		if (so->want_itup)
diff --git a/src/backend/access/spgist/spgtextproc.c b/src/backend/access/spgist/spgtextproc.c
index 00eb27f..8678854 100644
--- a/src/backend/access/spgist/spgtextproc.c
+++ b/src/backend/access/spgist/spgtextproc.c
@@ -86,7 +86,6 @@ spg_text_config(PG_FUNCTION_ARGS)
 	cfg->labelType = INT2OID;
 	cfg->canReturnData = true;
 	cfg->longValuesOK = true;	/* suffixing will shorten long values */
-	cfg->suppLen = 0; /* we don't need any supplimentary data */
 	PG_RETURN_VOID();
 }
 
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index 14d0700..e1b6abf 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -685,4 +685,8 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page,
 extern bool spgdoinsert(Relation index, SpGistState *state,
 			ItemPointer heapPtr, Datum datum, bool isnull);
 
+/* spgproc.c */
+extern void spg_point_distance(Datum to, int norderbys,
+				   ScanKey orderbyKeys, double **distances, bool isLeaf);
+
 #endif   /* SPGIST_PRIVATE_H */
diff --git a/src/include/catalog/pg_amop.h b/src/include/catalog/pg_amop.h
index 0251664..066cb46 100644
--- a/src/include/catalog/pg_amop.h
+++ b/src/include/catalog/pg_amop.h
@@ -764,6 +764,7 @@ DATA(insert (	4015   600 600 5 s	508 4000 0 ));
 DATA(insert (	4015   600 600 10 s 509 4000 0 ));
 DATA(insert (	4015   600 600 6 s	510 4000 0 ));
 DATA(insert (	4015   600 603 8 s	511 4000 0 ));
+DATA(insert (	4015   600 600 15 o 517 4000 1970 ));
 
 /*
  * SP-GiST kd_point_ops
@@ -774,6 +775,7 @@ DATA(insert (	4016   600 600 5 s	508 4000 0 ));
 DATA(insert (	4016   600 600 10 s 509 4000 0 ));
 DATA(insert (	4016   600 600 6 s	510 4000 0 ));
 DATA(insert (	4016   600 603 8 s	511 4000 0 ));
+DATA(insert (	4016   600 600 15 o 517 4000 1970 ));
 
 /*
  * SP-GiST text_ops
diff --git a/src/test/regress/expected/amutils.out b/src/test/regress/expected/amutils.out
index 74f7c9f..1464021 100644
--- a/src/test/regress/expected/amutils.out
+++ b/src/test/regress/expected/amutils.out
@@ -81,7 +81,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
@@ -90,18 +91,18 @@ select prop,
                     'bogus']::text[])
          with ordinality as u(prop,ord)
  order by ord;
-        prop        | btree | hash | gist | spgist | gin | brin 
---------------------+-------+------+------+--------+-----+------
- asc                | t     | f    | f    | f      | f   | f
- desc               | f     | f    | f    | f      | f   | f
- nulls_first        | f     | f    | f    | f      | f   | f
- nulls_last         | t     | f    | f    | f      | f   | f
- orderable          | t     | f    | f    | f      | f   | f
- distance_orderable | f     | f    | t    | f      | f   | f
- returnable         | t     | f    | f    | t      | f   | f
- search_array       | t     | f    | f    | f      | f   | f
- search_nulls       | t     | f    | t    | t      | f   | t
- bogus              |       |      |      |        |     | 
+        prop        | btree | hash | gist | spgist_radix | spgist_quad | gin | brin 
+--------------------+-------+------+------+--------------+-------------+-----+------
+ asc                | t     | f    | f    | f            | f           | f   | f
+ desc               | f     | f    | f    | f            | f           | f   | f
+ nulls_first        | f     | f    | f    | f            | f           | f   | f
+ nulls_last         | t     | f    | f    | f            | f           | f   | f
+ orderable          | t     | f    | f    | f            | f           | f   | f
+ distance_orderable | f     | f    | t    | f            | t           | f   | f
+ returnable         | t     | f    | f    | t            | t           | f   | f
+ search_array       | t     | f    | f    | f            | f           | f   | f
+ search_nulls       | t     | f    | t    | t            | t           | f   | t
+ bogus              |       |      |      |              |             |     | 
 (10 rows)
 
 select prop,
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index e519fdb..ab6e244 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -294,6 +294,15 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
      1
 (1 row)
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
  count 
 -------
@@ -883,6 +892,74 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 (1 row)
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+   n   | dist | p |   n   | dist | p 
+-------+------+---+-------+------+---
+ 11001 |      |   |       |      | 
+ 11001 |      |   |       |      | 
+ 11001 |      |   |       |      | 
+       |      |   | 11001 |      | 
+       |      |   | 11001 |      | 
+       |      |   | 11001 |      | 
+(6 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
                        QUERY PLAN                        
 ---------------------------------------------------------
@@ -988,6 +1065,71 @@ SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 (1 row)
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
                          QUERY PLAN                         
 ------------------------------------------------------------
diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out
index 0bcec13..5e98601 100644
--- a/src/test/regress/expected/opr_sanity.out
+++ b/src/test/regress/expected/opr_sanity.out
@@ -1816,6 +1816,7 @@ ORDER BY 1, 2, 3;
        4000 |           12 | <=
        4000 |           12 | |&>
        4000 |           14 | >=
+       4000 |           15 | <->
        4000 |           15 | >
        4000 |           16 | @>
        4000 |           18 | =
@@ -1828,7 +1829,7 @@ ORDER BY 1, 2, 3;
        4000 |           25 | <<=
        4000 |           26 | >>
        4000 |           27 | >>=
-(121 rows)
+(122 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing
diff --git a/src/test/regress/sql/amutils.sql b/src/test/regress/sql/amutils.sql
index cec1dcb..fa178e1 100644
--- a/src/test/regress/sql/amutils.sql
+++ b/src/test/regress/sql/amutils.sql
@@ -40,7 +40,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index 1648072..5db3153 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -198,6 +198,18 @@ SELECT count(*) FROM quad_point_tbl WHERE p >^ '(5000, 4000)';
 
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcde';
@@ -362,6 +374,36 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 
@@ -390,6 +432,39 @@ SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index 748e568..7073146 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -1902,8 +1902,28 @@ spgdoinsert(Relation index, SpGistState *state,
 	 * cycles in the loop below.
 	 */
 	if (!isnull)
+	{
 		procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
 
+		/* Compress the data if the optional compress support function exists. */
+		if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
+		{
+			spgCompressIn in;
+			spgCompressOut out;
+			FmgrInfo   *compress = index_getprocinfo(index, 1,
+													 SPGIST_COMPRESS_PROC);
+
+			in.datum = datum;
+			/* the support function reports its result through out.datum */
+			FunctionCall2Coll(compress,
+							  index->rd_indcollation[0],
+							  PointerGetDatum(&in),
+							  PointerGetDatum(&out));
+
+			datum = out.datum;
+		}
+	}
+
 	/*
 	 * Since we don't use index_form_tuple in this AM, we have to make sure
 	 * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c
index 0a3eeb6..02cc908 100644
--- a/src/backend/access/spgist/spgvalidate.c
+++ b/src/backend/access/spgist/spgvalidate.c
@@ -26,6 +26,7 @@
 #include "utils/regproc.h"
 #include "utils/syscache.h"
 
+#define SPGIST_OPTIONAL_PROCS_MASK (((uint64) 1) << SPGIST_COMPRESS_PROC)
 
 /*
  * Validator for an SP-GiST opclass.
@@ -104,6 +105,7 @@ spgvalidate(Oid opclassoid)
 			case SPGIST_CHOOSE_PROC:
 			case SPGIST_PICKSPLIT_PROC:
 			case SPGIST_INNER_CONSISTENT_PROC:
+			case SPGIST_COMPRESS_PROC:
 				ok = check_amproc_signature(procform->amproc, VOIDOID, true,
 											2, 2, INTERNALOID, INTERNALOID);
 				break;
@@ -224,6 +226,8 @@ spgvalidate(Oid opclassoid)
 		{
 			if ((thisgroup->functionset & (((uint64) 1) << i)) != 0)
 				continue;		/* got it */
+			if ((SPGIST_OPTIONAL_PROCS_MASK & (((uint64) 1) << i)) != 0)
+				continue;		/* support function is optional */
 			ereport(INFO,
 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
 					 errmsg("spgist operator family \"%s\" is missing support function %d for type %s",
diff --git a/src/include/access/spgist.h b/src/include/access/spgist.h
index 23ed9bb..2927605 100644
--- a/src/include/access/spgist.h
+++ b/src/include/access/spgist.h
@@ -30,7 +30,8 @@
 #define SPGIST_PICKSPLIT_PROC			3
 #define SPGIST_INNER_CONSISTENT_PROC	4
 #define SPGIST_LEAF_CONSISTENT_PROC		5
-#define SPGISTNProc						5
+#define SPGIST_COMPRESS_PROC			6
+#define SPGISTNProc						6
 
 /*
  * Argument structs for spg_config method
@@ -189,6 +190,19 @@ typedef struct spgLeafConsistentOut
 	double	   *distances;		/* associated distances */
 } spgLeafConsistentOut;
 
+/*
+ * Argument structs for the optional spg_compress method (SPGIST_COMPRESS_PROC)
+ */
+typedef struct spgCompressIn
+{
+	Datum		datum;			/* original value to be compressed for
+								 * storage, can be toasted */
+} spgCompressIn;
+
+typedef struct spgCompressOut
+{
+	Datum		datum;			/* compressed value to be stored at leaf */
+} spgCompressOut;
 
 /* spgutils.c */
 extern bytea *spgoptions(Datum reloptions, bool validate);
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index b0d1cb3..491e417 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -142,6 +142,48 @@
       </entry>
      </row>
      <row>
+      <entry><literal>circle_ops</></entry>
+      <entry><type>circle</></entry>
+      <entry>
+       <literal>&lt;&lt;</>
+       <literal>&amp;&lt;</>
+       <literal>&amp;&amp;</>
+       <literal>&amp;&gt;</>
+       <literal>&gt;&gt;</>
+       <literal>~=</>
+       <literal>@&gt;</>
+       <literal>&lt;@</>
+       <literal>&amp;&lt;|</>
+       <literal>&lt;&lt;|</>
+       <literal>|&gt;&gt;</>
+       <literal>|&amp;&gt;</>
+      </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
+     </row>
+     <row>
+      <entry><literal>poly_ops</></entry>
+      <entry><type>polygon</></entry>
+      <entry>
+       <literal>&lt;&lt;</>
+       <literal>&amp;&lt;</>
+       <literal>&amp;&amp;</>
+       <literal>&amp;&gt;</>
+       <literal>&gt;&gt;</>
+       <literal>~=</>
+       <literal>@&gt;</>
+       <literal>&lt;@</>
+       <literal>&amp;&lt;|</>
+       <literal>&lt;&lt;|</>
+       <literal>|&gt;&gt;</>
+       <literal>|&amp;&gt;</>
+      </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
+     </row>
+     <row>
       <entry><literal>text_ops</></entry>
       <entry><type>text</></entry>
       <entry>
diff --git a/src/backend/utils/adt/geo_spgist.c b/src/backend/utils/adt/geo_spgist.c
index 2dc5496..cd94a34 100644
--- a/src/backend/utils/adt/geo_spgist.c
+++ b/src/backend/utils/adt/geo_spgist.c
@@ -74,9 +74,11 @@
 #include "postgres.h"
 
 #include "access/spgist.h"
+#include "access/spgist_private.h"
 #include "access/stratnum.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/geo_decls.h"
 
 /*
@@ -366,6 +368,31 @@ overAbove4D(RectBox *rect_box, RangeBox *query)
 	return overhigher2D(&rect_box->range_box_y, &query->right);
 }
 
+/* Lower bound: distance from point to rect_box's outermost bounds */
+static double
+pointToRectBoxDistance(Point *point, RectBox *rect_box)
+{
+	double		xlow = rect_box->range_box_x.left.low;
+	double		xhigh = rect_box->range_box_x.right.high;
+	double		ylow = rect_box->range_box_y.left.low;
+	double		yhigh = rect_box->range_box_y.right.high;
+	double		xgap = 0;
+	double		ygap = 0;
+
+	if (point->x < xlow)
+		xgap = xlow - point->x;
+	else if (point->x > xhigh)
+		xgap = point->x - xhigh;
+
+	if (point->y < ylow)
+		ygap = ylow - point->y;
+	else if (point->y > yhigh)
+		ygap = point->y - yhigh;
+
+	return HYPOT(xgap, ygap);
+}
+
+
 /*
  * SP-GiST config function
  */
@@ -473,6 +500,64 @@ spg_box_quad_picksplit(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* Build a palloc'd BOX holding the bounding box of the given circle */
+static BOX *
+circle_bbox(CIRCLE *circle)
+{
+	double		r = circle->radius;
+	BOX		   *result = (BOX *) palloc(sizeof(BOX));
+
+	result->low.x = circle->center.x - r;
+	result->low.y = circle->center.y - r;
+	result->high.x = circle->center.x + r;
+	result->high.y = circle->center.y + r;
+	return result;
+}
+
+/*
+ * Is the bounding-box test exact for this strategy?
+ *
+ * Returns true for the eight directional strategies (left, over-left,
+ * over-right, right, over-below, below, above, over-above) and false for
+ * every other strategy number.
+ */
+static bool
+is_bounding_box_test_exact(StrategyNumber strategy)
+{
+	return strategy == RTLeftStrategyNumber ||
+		strategy == RTOverLeftStrategyNumber ||
+		strategy == RTOverRightStrategyNumber ||
+		strategy == RTRightStrategyNumber ||
+		strategy == RTOverBelowStrategyNumber ||
+		strategy == RTBelowStrategyNumber ||
+		strategy == RTAboveStrategyNumber ||
+		strategy == RTOverAboveStrategyNumber;
+}
+
+/*
+ * Bounding box of a scan key argument; *recheck is set if that box is lossy.
+ */
+static BOX *
+spg_box_quad_get_scankey_bbox(ScanKey sk, bool *recheck)
+{
+	switch (sk->sk_subtype)
+	{
+		case BOXOID:
+			return DatumGetBoxP(sk->sk_argument);
+		case CIRCLEOID:
+			if (recheck && !is_bounding_box_test_exact(sk->sk_strategy))
+				*recheck = true;
+			return circle_bbox(DatumGetCircleP(sk->sk_argument));
+		case POLYGONOID:
+			if (recheck && !is_bounding_box_test_exact(sk->sk_strategy))
+				*recheck = true;
+			return &DatumGetPolygonP(sk->sk_argument)->boundbox;
+		default:
+			elog(ERROR, "unrecognized scankey subtype: %u", sk->sk_subtype);
+			return NULL;		/* keep compiler quiet */
+	}
+}
+
 /*
  * SP-GiST inner consistent function
  */
@@ -488,6 +573,15 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 	RangeBox   *centroid,
 			  **queries;
 
+	/*
+	 * We reuse the saved traversal value, or initialize it to an unbounded
+	 * one if we have just begun to walk the tree.
+	 */
+	if (in->traversalValue)
+		rect_box = in->traversalValue;
+	else
+		rect_box = initRectBox();
+
 	if (in->allTheSame)
 	{
 		/* Report that all nodes should be visited */
@@ -496,31 +590,51 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 		for (i = 0; i < in->nNodes; i++)
 			out->nodeNumbers[i] = i;
 
+		if (in->norderbys > 0 && in->nNodes > 0)
+		{
+			double	   *distances = palloc(sizeof(double) * in->norderbys);
+			int			j;
+
+			for (j = 0; j < in->norderbys; j++)
+			{
+				Point	   *pt = DatumGetPointP(in->orderbyKeys[j].sk_argument);
+
+				distances[j] = pointToRectBoxDistance(pt, rect_box);
+			}
+
+			out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+			out->distances[0] = distances;
+
+			for (i = 1; i < in->nNodes; i++)
+			{
+				out->distances[i] = palloc(sizeof(double) * in->norderbys);
+				memcpy(out->distances[i], distances,
+					   sizeof(double) * in->norderbys);
+			}
+		}
+
 		PG_RETURN_VOID();
 	}
 
 	/*
-	 * We are saving the traversal value or initialize it an unbounded one, if
-	 * we have just begun to walk the tree.
-	 */
-	if (in->traversalValue)
-		rect_box = in->traversalValue;
-	else
-		rect_box = initRectBox();
-
-	/*
 	 * We are casting the prefix and queries to RangeBoxes for ease of the
 	 * following operations.
 	 */
 	centroid = getRangeBox(DatumGetBoxP(in->prefixDatum));
 	queries = (RangeBox **) palloc(in->nkeys * sizeof(RangeBox *));
 	for (i = 0; i < in->nkeys; i++)
-		queries[i] = getRangeBox(DatumGetBoxP(in->scankeys[i].sk_argument));
+	{
+		BOX		   *box = spg_box_quad_get_scankey_bbox(&in->scankeys[i], NULL);
+
+		queries[i] = getRangeBox(box);
+	}
 
 	/* Allocate enough memory for nodes */
 	out->nNodes = 0;
 	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
 	out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+	if (in->norderbys > 0)
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
 
 	/*
 	 * We switch memory context, because we want to allocate memory for new
@@ -598,6 +712,22 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 		{
 			out->traversalValues[out->nNodes] = next_rect_box;
 			out->nodeNumbers[out->nNodes] = quadrant;
+
+			if (in->norderbys > 0)
+			{
+				double	   *distances = palloc(sizeof(double) * in->norderbys);
+				int			j;
+
+				out->distances[out->nNodes] = distances;
+
+				for (j = 0; j < in->norderbys; j++)
+				{
+					Point *pt = DatumGetPointP(in->orderbyKeys[j].sk_argument);
+
+					distances[j] = pointToRectBoxDistance(pt, next_rect_box);
+				}
+			}
+
 			out->nNodes++;
 		}
 		else
@@ -638,7 +768,9 @@ spg_box_quad_leaf_consistent(PG_FUNCTION_ARGS)
 	for (i = 0; i < in->nkeys; i++)
 	{
 		StrategyNumber strategy = in->scankeys[i].sk_strategy;
-		Datum		query = in->scankeys[i].sk_argument;
+		BOX		   *box = 	spg_box_quad_get_scankey_bbox(&in->scankeys[i],
+														  &out->recheck);
+		Datum		query = BoxPGetDatum(box);
 
 		switch (strategy)
 		{
@@ -711,5 +843,63 @@ spg_box_quad_leaf_consistent(PG_FUNCTION_ARGS)
 			break;
 	}
 
+	if (flag && in->norderbys > 0)
+	{
+		Oid			distfnoid = in->orderbykeys[0].sk_func.fn_oid;
+
+		spg_point_distance(leaf, in->norderbys, in->orderbykeys,
+						   &out->distances, false);
+
+		/* Recheck is necessary when computing distance to polygon or circle */
+		out->recheckDistances = distfnoid == F_DIST_CPOINT ||
+								distfnoid == F_DIST_POLYP;
+	}
+
 	PG_RETURN_BOOL(flag);
 }
+
+
+/*
+ * SP-GiST config function shared by opclasses whose 2-D values are stored
+ * lossily as bounding boxes
+ */
+Datum
+spg_bbox_quad_config(PG_FUNCTION_ARGS)
+{
+	spgConfigOut *config_out = (spgConfigOut *) PG_GETARG_POINTER(1);
+
+	config_out->longValuesOK = false;
+	config_out->canReturnData = false;
+	config_out->labelType = VOIDOID;	/* nodes carry no labels */
+	config_out->prefixType = BOXOID;	/* prefixes are bounding boxes */
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SP-GiST compress function for circles: emit the circle's bounding box
+ */
+Datum
+spg_circle_quad_compress(PG_FUNCTION_ARGS)
+{
+	spgCompressIn *input = (spgCompressIn *) PG_GETARG_POINTER(0);
+	spgCompressOut *output = (spgCompressOut *) PG_GETARG_POINTER(1);
+	CIRCLE	   *circle = DatumGetCircleP(input->datum);
+
+	output->datum = BoxPGetDatum(circle_bbox(circle));
+	PG_RETURN_VOID();
+}
+
+/*
+ * SP-GiST compress function for polygons: emit a copy of the bounding box
+ */
+Datum
+spg_poly_quad_compress(PG_FUNCTION_ARGS)
+{
+	spgCompressIn *input = (spgCompressIn *) PG_GETARG_POINTER(0);
+	spgCompressOut *output = (spgCompressOut *) PG_GETARG_POINTER(1);
+	POLYGON    *poly = DatumGetPolygonP(input->datum);
+
+	output->datum = BoxPGetDatum(box_copy(&poly->boundbox));
+	PG_RETURN_VOID();
+}
diff --git a/src/include/catalog/pg_amop.h b/src/include/catalog/pg_amop.h
index 066cb46..722e5b4 100644
--- a/src/include/catalog/pg_amop.h
+++ b/src/include/catalog/pg_amop.h
@@ -848,6 +848,40 @@ DATA(insert (	5000	603  603 11 s	2573	4000 0 ));
 DATA(insert (	5000	603  603 12 s	2572	4000 0 ));
 
 /*
+ * SP-GiST circle_ops
+ */
+DATA(insert (	5007	718  718  1 s	1506	4000 0 ));
+DATA(insert (	5007	718  718  2 s	1507	4000 0 ));
+DATA(insert (	5007	718  718  3 s	1513	4000 0 ));
+DATA(insert (	5007	718  718  4 s	1508	4000 0 ));
+DATA(insert (	5007	718  718  5 s	1509	4000 0 ));
+DATA(insert (	5007	718  718  6 s	1512	4000 0 ));
+DATA(insert (	5007	718  718  7 s	1511	4000 0 ));
+DATA(insert (	5007	718  718  8 s	1510	4000 0 ));
+DATA(insert (	5007	718  718  9 s	2589	4000 0 ));
+DATA(insert (	5007	718  718 10 s	1515	4000 0 ));
+DATA(insert (	5007	718  718 11 s	1514	4000 0 ));
+DATA(insert (	5007	718  718 12 s	2590	4000 0 ));
+DATA(insert (	5007	718  600 15 o	3291	4000 1970 ));
+
+/*
+ * SP-GiST poly_ops (supports polygons)
+ */
+DATA(insert (	5008   604	604  1 s	 485	4000 0 ));
+DATA(insert (	5008   604	604  2 s	 486	4000 0 ));
+DATA(insert (	5008   604	604  3 s	 492	4000 0 ));
+DATA(insert (	5008   604	604  4 s	 487	4000 0 ));
+DATA(insert (	5008   604	604  5 s	 488	4000 0 ));
+DATA(insert (	5008   604	604  6 s	 491	4000 0 ));
+DATA(insert (	5008   604	604  7 s	 490	4000 0 ));
+DATA(insert (	5008   604	604  8 s	 489	4000 0 ));
+DATA(insert (	5008   604	604  9 s	2575	4000 0 ));
+DATA(insert (	5008   604	604 10 s	2574	4000 0 ));
+DATA(insert (	5008   604	604 11 s	2577	4000 0 ));
+DATA(insert (	5008   604	604 12 s	2576	4000 0 ));
+DATA(insert (	5008   604	600 15 o	3289	4000 1970 ));
+
+/*
  * GiST inet_ops
  */
 DATA(insert (	3550	869 869 3 s		3552 783 0 ));
diff --git a/src/include/catalog/pg_amproc.h b/src/include/catalog/pg_amproc.h
index f1a52ce..2de94f3 100644
--- a/src/include/catalog/pg_amproc.h
+++ b/src/include/catalog/pg_amproc.h
@@ -306,6 +306,18 @@ DATA(insert (	5000   603 603 2 5013 ));
 DATA(insert (	5000   603 603 3 5014 ));
 DATA(insert (	5000   603 603 4 5015 ));
 DATA(insert (	5000   603 603 5 5016 ));
+DATA(insert (	5007   718 718 1 5009 ));
+DATA(insert (	5007   718 718 2 5013 ));
+DATA(insert (	5007   718 718 3 5014 ));
+DATA(insert (	5007   718 718 4 5015 ));
+DATA(insert (	5007   718 718 5 5016 ));
+DATA(insert (	5007   718 718 6 5010 ));
+DATA(insert (	5008   604 604 1 5009 ));
+DATA(insert (	5008   604 604 2 5013 ));
+DATA(insert (	5008   604 604 3 5014 ));
+DATA(insert (	5008   604 604 4 5015 ));
+DATA(insert (	5008   604 604 5 5016 ));
+DATA(insert (	5008   604 604 6 5011 ));
 
 /* BRIN opclasses */
 /* minmax bytea */
diff --git a/src/include/catalog/pg_opclass.h b/src/include/catalog/pg_opclass.h
index 0cde14c..d8eded9 100644
--- a/src/include/catalog/pg_opclass.h
+++ b/src/include/catalog/pg_opclass.h
@@ -203,6 +203,8 @@ DATA(insert (	4000	box_ops				PGNSP PGUID 5000  603  t 0 ));
 DATA(insert (	4000	quad_point_ops		PGNSP PGUID 4015  600 t 0 ));
 DATA(insert (	4000	kd_point_ops		PGNSP PGUID 4016  600 f 0 ));
 DATA(insert (	4000	text_ops			PGNSP PGUID 4017  25 t 0 ));
+DATA(insert (	4000	circle_ops			PGNSP PGUID 5007  718 t 603 ));
+DATA(insert (	4000	poly_ops			PGNSP PGUID 5008  604 t 603 ));
 DATA(insert (	403		jsonb_ops			PGNSP PGUID 4033  3802 t 0 ));
 DATA(insert (	405		jsonb_ops			PGNSP PGUID 4034  3802 t 0 ));
 DATA(insert (	2742	jsonb_ops			PGNSP PGUID 4036  3802 t 25 ));
diff --git a/src/include/catalog/pg_opfamily.h b/src/include/catalog/pg_opfamily.h
index bd673fe..4bb26cd 100644
--- a/src/include/catalog/pg_opfamily.h
+++ b/src/include/catalog/pg_opfamily.h
@@ -183,5 +183,7 @@ DATA(insert OID = 4103 (	3580	range_inclusion_ops		PGNSP PGUID ));
 DATA(insert OID = 4082 (	3580	pg_lsn_minmax_ops		PGNSP PGUID ));
 DATA(insert OID = 4104 (	3580	box_inclusion_ops		PGNSP PGUID ));
 DATA(insert OID = 5000 (	4000	box_ops		PGNSP PGUID ));
+DATA(insert OID = 5007 (	4000	circle_ops				PGNSP PGUID ));
+DATA(insert OID = 5008 (	4000	poly_ops				PGNSP PGUID ));
 
 #endif   /* PG_OPFAMILY_H */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ab12761..49e7f7d 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5184,6 +5184,13 @@ DESCR("SP-GiST support for quad tree over box");
 DATA(insert OID = 5016 (  spg_box_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 16 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_box_quad_leaf_consistent _null_ _null_ _null_ ));
 DESCR("SP-GiST support for quad tree over box");
 
+DATA(insert OID = 5009 (  spg_bbox_quad_config PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_bbox_quad_config _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over 2-D types represented by their bounding boxes");
+DATA(insert OID = 5010 (  spg_circle_quad_compress PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_circle_quad_compress _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over circle");
+DATA(insert OID = 5011 (  spg_poly_quad_compress PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_poly_quad_compress _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over polygons");
+
 /* replication slots */
 DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
diff --git a/src/test/regress/expected/circle.out b/src/test/regress/expected/circle.out
index 9ba4a04..2d4bf70 100644
--- a/src/test/regress/expected/circle.out
+++ b/src/test/regress/expected/circle.out
@@ -97,3 +97,291 @@ SELECT '' as five, c1.f1 AS one, c2.f1 AS two, (c1.f1 <-> c2.f1) AS distance
       | <(1,2),3>      | <(100,200),10> | 208.370729772479
 (5 rows)
 
+--
+-- Test the SP-GiST index
+--
+CREATE TEMPORARY TABLE quad_circle_tbl (id int, c circle);
+INSERT INTO quad_circle_tbl
+	SELECT (x - 1) * 100 + y, circle(point(x * 10, y * 10), 1 + (x + y) % 10)
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+INSERT INTO quad_circle_tbl
+	SELECT i, '<(200, 300), 5>'
+	FROM generate_series(10001, 11000) AS i;
+INSERT INTO quad_circle_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, '<(0,100), infinity>'),
+		(11004, '<(-infinity,0),1000>'),
+		(11005, '<(infinity,-infinity),infinity>');
+CREATE INDEX quad_circle_tbl_idx ON quad_circle_tbl USING spgist(c);
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c << '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c << '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+ count 
+-------
+   891
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c &< '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c &< '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+ count 
+-------
+  5901
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c && '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c && '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+ count 
+-------
+  2334
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c &> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c &> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+ count 
+-------
+ 10000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c >> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c >> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+ count 
+-------
+  4990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c <<| '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c <<| '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+ count 
+-------
+  1890
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c &<| '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c &<| '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+ count 
+-------
+  6900
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c |&> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c |&> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+ count 
+-------
+  9000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c |>> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c |>> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+ count 
+-------
+  3990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c @> '<(300,400),1>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c @> '<(300,400),1>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+ count 
+-------
+     2
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c <@ '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c <@ '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+ count 
+-------
+  2181
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c ~= '<(300,400),1>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c ~= '<(300,400),1>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+ count 
+-------
+     1
+(1 row)
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+                          QUERY PLAN                           
+---------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_circle_tbl_idx on quad_circle_tbl
+         Order By: (c <-> '(123,456)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+SELECT *
+FROM quad_cirle_tbl_ord_seq1 seq FULL JOIN quad_cirle_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+                          QUERY PLAN                           
+---------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_circle_tbl_idx on quad_circle_tbl
+         Index Cond: (c <@ '<(300,400),200>'::circle)
+         Order By: (c <-> '(123,456)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+SELECT *
+FROM quad_cirle_tbl_ord_seq2 seq FULL JOIN quad_cirle_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
diff --git a/src/test/regress/expected/polygon.out b/src/test/regress/expected/polygon.out
index 2361274..7a623be 100644
--- a/src/test/regress/expected/polygon.out
+++ b/src/test/regress/expected/polygon.out
@@ -227,3 +227,289 @@ SELECT	'(0,0)'::point <-> '((0,0),(1,2),(2,1))'::polygon as on_corner,
          0 |          0 |      0 | 1.4142135623731 |          3.2
 (1 row)
 
+--
+-- Test the SP-GiST index
+--
+CREATE TEMPORARY TABLE quad_poly_tbl (id int, p polygon);
+INSERT INTO quad_poly_tbl
+	SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+INSERT INTO quad_poly_tbl
+	SELECT i, polygon '((200, 300),(210, 310),(230, 290))'
+	FROM generate_series(10001, 11000) AS i;
+INSERT INTO quad_poly_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, NULL);
+CREATE INDEX quad_poly_tbl_idx ON quad_poly_tbl USING spgist(p);
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+CREATE TEMP TABLE quad_poly_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+CREATE TEMP TABLE quad_poly_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p << '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p << '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  3890
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p &< '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p &< '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  7900
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p && '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p && '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+   977
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p &> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p &> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  7000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p >> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p >> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  2990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p <<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p <<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  1890
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p &<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p &<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  6900
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p |&> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p |&> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  9000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p |>> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p |>> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  3990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+   831
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p @> '((340,550),(343,552),(341,553))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p @> '((340,550),(343,552),(341,553))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+ count 
+-------
+     1
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p ~= '((200,300),(210,310),(230,290))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p ~= '((200,300),(210,310),(230,290))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+ count 
+-------
+  1000
+(1 row)
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Order By: (p <-> '(123,456)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+                                   QUERY PLAN                                    
+---------------------------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Index Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         Order By: (p <-> '(123,456)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
diff --git a/src/test/regress/sql/circle.sql b/src/test/regress/sql/circle.sql
index c0284b2..16fa514 100644
--- a/src/test/regress/sql/circle.sql
+++ b/src/test/regress/sql/circle.sql
@@ -43,3 +43,131 @@ SELECT '' as five, c1.f1 AS one, c2.f1 AS two, (c1.f1 <-> c2.f1) AS distance
   FROM CIRCLE_TBL c1, CIRCLE_TBL c2
   WHERE (c1.f1 < c2.f1) AND ((c1.f1 <-> c2.f1) > 0)
   ORDER BY distance, area(c1.f1), area(c2.f1);
+
+--
+-- Test the SP-GiST index
+--
+
+CREATE TEMPORARY TABLE quad_circle_tbl (id int, c circle);
+
+INSERT INTO quad_circle_tbl
+	SELECT (x - 1) * 100 + y, circle(point(x * 10, y * 10), 1 + (x + y) % 10)
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+
+INSERT INTO quad_circle_tbl
+	SELECT i, '<(200, 300), 5>'
+	FROM generate_series(10001, 11000) AS i;
+
+INSERT INTO quad_circle_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, '<(0,100), infinity>'),
+		(11004, '<(-infinity,0),1000>'),
+		(11005, '<(infinity,-infinity),infinity>');
+
+CREATE INDEX quad_circle_tbl_idx ON quad_circle_tbl USING spgist(c);
+
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+
+SELECT *
+FROM quad_cirle_tbl_ord_seq1 seq FULL JOIN quad_cirle_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+SELECT *
+FROM quad_cirle_tbl_ord_seq2 seq FULL JOIN quad_cirle_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
diff --git a/src/test/regress/sql/polygon.sql b/src/test/regress/sql/polygon.sql
index 7ac8079..8b8b434 100644
--- a/src/test/regress/sql/polygon.sql
+++ b/src/test/regress/sql/polygon.sql
@@ -116,3 +116,129 @@ SELECT	'(0,0)'::point <-> '((0,0),(1,2),(2,1))'::polygon as on_corner,
 	'(2,2)'::point <-> '((0,0),(1,4),(3,1))'::polygon as inside,
 	'(3,3)'::point <-> '((0,2),(2,0),(2,2))'::polygon as near_corner,
 	'(4,4)'::point <-> '((0,0),(0,3),(4,0))'::polygon as near_segment;
+
+--
+-- Test the SP-GiST index
+--
+
+CREATE TEMPORARY TABLE quad_poly_tbl (id int, p polygon);
+
+INSERT INTO quad_poly_tbl
+	SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+
+INSERT INTO quad_poly_tbl
+	SELECT i, polygon '((200, 300),(210, 310),(230, 290))'
+	FROM generate_series(10001, 11000) AS i;
+
+INSERT INTO quad_poly_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, NULL);
+
+CREATE INDEX quad_poly_tbl_idx ON quad_poly_tbl USING spgist(p);
+
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to