Finally, completed patch "covering_unique_3.0.patch" is here.
It includes the functionality discussed above in the thread, regression
tests and docs update.
I think it's quite ready for review.
_Future work:_
Besides that, I'd like to get feedback about attached patch
"optional_opclass_3.0.patch".
It should be applied on the "covering_unique_3.0.patch".
Actually, this patch is the first step to do opclasses for "included"
columns optional
and implement real covering indexing.
Example:
CREATE TABLE tbl (c1 int, c4 box);
CREATE UNIQUE INDEX idx ON tbl USING btree (c1) INCLUDING (c4);
If we don't need c4 as an index scankey, we don't need any btree opclass
on it.
But we still want to have it in covering index for queries like
SELECT c4 FROM tbl WHERE c1=1000; // uses the IndexOnlyScan
SELECT * FROM tbl WHERE c1=1000; // uses the IndexOnlyScan
The patch "optional_opclass" completely ignores opclasses of included
attributes.
To see the difference, look at the explain analyze output:
explain analyze select * from tbl where c1=2 and c4 && box '(0,0,1,1)';
QUERY PLAN
---------------------------------------------------------------------------------------------------------------
Index Only Scan using idx on tbl (cost=0.13..4.15 rows=1 width=36)
(actual time=0.010..0.013 rows=1 loops=1)
Index Cond: (c1 = 2)
Filter: (c4 && '(1,1),(0,0)'::box)
"Index Cond" shows the index ScanKey conditions and "Filter" is for
conditions which are used after index scan. Anyway it is faster than
SeqScan that we had before, because IndexOnlyScan avoids extra heap fetches.
As I already said, this patch is just WIP, so included opclass is not
"optional" but actually "ignored".
And following example works worse than without the patch. Please, don't
care about it.
CREATE TABLE tbl2 (c1 int, c2 int);
CREATE UNIQUE INDEX idx2 ON tbl2 USING btree (c1) INCLUDING (c2);
explain analyze select * from tbl2 where c1<20 and c2<5;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Index Only Scan using idx2 on tbl2 (cost=0.28..4.68 rows=10 width=8)
(actual time=0.055..0.066 rows=9 loops=1)
Index Cond: (c1 < 20)
Filter: (c2 < 5)
The question is more about suitable syntax.
We have two different optimizations here:
1. INCLUDED columns
2. Optional opclasses
It's logical to provide optional opclasses only for included columns.
Is it ok, to handle it using the same syntax and resolve all opclass
conflicts while create index?
CREATE TABLE tbl2 (c1 int, c2 int, c4 box);
CREATE UNIQUE INDEX idx2 ON tbl2 USING btree (c1) INCLUDING (c2, c4);
CREATE UNIQUE INDEX idx3 ON tbl2 USING btree (c1) INCLUDING (c4, c2);
Of course, order of attributes is important.
Attrs which have oplass and want to use it in ScanKey must be situated
before the others.
idx2 will use c2 in IndexCond, while idx3 will not. But I think that
it's the job for DBA.
If you see any related changes in planner, please mention them. I
haven't explored that part of code yet and could have missed something.
--
Anastasia Lubennikova
Postgres Professional:http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index b4ea227..4973e1b 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -644,6 +644,13 @@
<entry>Does an index of this type manage fine-grained predicate locks?</entry>
</row>
+ <row>
+ <entry><structfield>amcanincluding</structfield></entry>
+ <entry><type>bool</type></entry>
+ <entry></entry>
+ <entry>Does the access method support included columns?</entry>
+ </row>
+
<row>
<entry><structfield>amkeytype</structfield></entry>
<entry><type>oid</type></entry>
@@ -3704,6 +3711,14 @@
<literal>pg_class.relnatts</literal>)</entry>
</row>
+ <row>
+ <entry><structfield>indnkeyatts</structfield></entry>
+ <entry><type>int2</type></entry>
+ <entry></entry>
+ <entry>The number of key columns in the index. "Key columns" are ordinary
+ index columns in contrast with "included" columns.</entry>
+ </row>
+
<row>
<entry><structfield>indisunique</structfield></entry>
<entry><type>bool</type></entry>
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 1c09bae..0287c62 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -765,9 +765,11 @@ amrestrpos (IndexScanDesc scan);
<para>
<productname>PostgreSQL</productname> enforces SQL uniqueness constraints
using <firstterm>unique indexes</>, which are indexes that disallow
- multiple entries with identical keys. An access method that supports this
+ multiple entries with identical keys. An access method that supports this
feature sets <structname>pg_am</>.<structfield>amcanunique</> true.
- (At present, only b-tree supports it.)
+ Columns included with clause INCLUDING aren't used to enforce uniqueness.
+ An access method that supports this feature sets <structname>pg_am</>.<structfield>amcanincluding</> true.
+ (At present, only b-tree supports them.)
</para>
<para>
diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 2d131c9..5178834 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -633,7 +633,8 @@ CREATE INDEX test3_desc_index ON test3 (id DESC NULLS LAST);
Indexes can also be used to enforce uniqueness of a column's value,
or the uniqueness of the combined values of more than one column.
<synopsis>
-CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>);
+CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>)
+<optional>INCLUDING (<replaceable>column</replaceable> <optional>, ...</optional>)</optional>;
</synopsis>
Currently, only B-tree indexes can be declared unique.
</para>
@@ -642,7 +643,8 @@ CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</repla
When an index is declared unique, multiple table rows with equal
indexed values are not allowed. Null values are not considered
equal. A multicolumn unique index will only reject cases where all
- indexed columns are equal in multiple rows.
+ indexed columns are equal in multiple rows. Columns included with clause
+ INCLUDING aren't used to enforce constraints (UNIQUE, PRIMARY KEY, etc).
</para>
<para>
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index ce36a1b..75eddf2 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -23,6 +23,7 @@ PostgreSQL documentation
<synopsis>
CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class="parameter">name</replaceable> ] ON <replaceable class="parameter">table_name</replaceable> [ USING <replaceable class="parameter">method</replaceable> ]
( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
+ [ INCLUDING ( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
[ WITH ( <replaceable class="PARAMETER">storage_parameter</replaceable> = <replaceable class="PARAMETER">value</replaceable> [, ... ] ) ]
[ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
[ WHERE <replaceable class="parameter">predicate</replaceable> ]
@@ -139,6 +140,26 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
</varlistentry>
<varlistentry>
+ <term><literal>INCLUDING</literal></term>
+ <listitem>
+ <para>
+ This clause specifies additional columns to be appended to the set of index columns.
+ Included columns don't support any constraints <literal>(UNIQUE, PRMARY KEY, EXCLUSION CONSTRAINT)</>.
+ These columns can improve the performance of some queries through using advantages of index-only scan
+ (Or so called <firstterm>covering</firstterm> indexes. Covering index is the index that
+ covers all columns required in the query and prevents a table access).
+ Besides that, included attributes are not stored in index inner pages.
+ It allows to decrease index size and furthermore it provides a way to extend included
+ columns to store atttributes without suitable opclass (not implemented yet).
+ This clause could be applied to both unique and nonunique indexes.
+ It's possible to have non-unique covering index, which behaves as a regular
+ multi-column index with a bit smaller index-size.
+ Currently, only the B-tree access method supports this feature.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><replaceable class="parameter">name</replaceable></term>
<listitem>
<para>
@@ -596,7 +617,7 @@ Indexes:
<title>Examples</title>
<para>
- To create a B-tree index on the column <literal>title</literal> in
+ To create an unique B-tree index on the column <literal>title</literal> in
the table <literal>films</literal>:
<programlisting>
CREATE UNIQUE INDEX title_idx ON films (title);
@@ -604,6 +625,15 @@ CREATE UNIQUE INDEX title_idx ON films (title);
</para>
<para>
+ To create an unique B-tree index on the column <literal>title</literal>
+ and included columns <literal>director</literal> and <literal>rating</literal>
+ in the table <literal>films</literal>:
+<programlisting>
+CREATE UNIQUE INDEX title_idx ON films (title) INCLUDING (director, rating);
+</programlisting>
+ </para>
+
+ <para>
To create an index on the expression <literal>lower(title)</>,
allowing efficient case-insensitive searches:
<programlisting>
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index dc588d7..83f24c3 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -19,6 +19,7 @@
#include "access/heapam.h"
#include "access/itup.h"
#include "access/tuptoaster.h"
+#include "utils/rel.h"
/* ----------------------------------------------------------------
@@ -441,3 +442,30 @@ CopyIndexTuple(IndexTuple source)
memcpy(result, source, size);
return result;
}
+
+/*
+ * Reform index tuple. Truncate nonkey (INCLUDED) attributes.
+ */
+IndexTuple
+index_reform_tuple(Relation idxrel, IndexTuple olditup, int natts, int nkeyatts)
+{
+ TupleDesc itupdesc = RelationGetDescr(idxrel);
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ IndexTuple newitup;
+
+ Assert(natts <= INDEX_MAX_KEYS);
+ Assert(nkeyatts > 0);
+ Assert(nkeyatts <= natts);
+
+ index_deform_tuple(olditup, itupdesc, values, isnull);
+
+ /* form new tuple that will contain only key attributes */
+ itupdesc->natts = nkeyatts;
+ newitup = index_form_tuple(itupdesc, values, isnull);
+ newitup->t_tid = olditup->t_tid;
+
+ itupdesc->natts = natts;
+
+ return newitup;
+}
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 77c2fdf..d14df12 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -108,18 +108,23 @@ _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel)
{
bool is_unique = false;
- int natts = rel->rd_rel->relnatts;
+ int nkeyatts = rel->rd_rel->relnatts;
ScanKey itup_scankey;
BTStack stack;
Buffer buf;
OffsetNumber offset;
+ Assert (rel->rd_index != NULL);
+ Assert(rel->rd_index->indnatts != 0);
+ Assert(rel->rd_index->indnkeyatts != 0);
+ nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
top:
/* find the first page containing this key */
- stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+ stack = _bt_search(rel, nkeyatts, itup_scankey, false, &buf, BT_WRITE);
offset = InvalidOffsetNumber;
@@ -134,7 +139,7 @@ top:
* move right in the tree. See Lehman and Yao for an excruciatingly
* precise description.
*/
- buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+ buf = _bt_moveright(rel, buf, nkeyatts, itup_scankey, false,
true, stack, BT_WRITE);
/*
@@ -163,7 +168,7 @@ top:
TransactionId xwait;
uint32 speculativeToken;
- offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ offset = _bt_binsrch(rel, buf, nkeyatts, itup_scankey, false);
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
checkUnique, &is_unique, &speculativeToken);
@@ -199,7 +204,7 @@ top:
*/
CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
- _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+ _bt_findinsertloc(rel, &buf, &offset, nkeyatts, itup_scankey, itup,
stack, heapRel);
_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
}
@@ -242,7 +247,12 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
- int natts = rel->rd_rel->relnatts;
+ int nkeyatts = rel->rd_index->indnkeyatts;
+
+ Assert (rel->rd_index != NULL);
+ Assert(rel->rd_index->indnatts != 0);
+ Assert(rel->rd_index->indnkeyatts != 0);
+
SnapshotData SnapshotDirty;
OffsetNumber maxoff;
Page page;
@@ -301,7 +311,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
- if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
+ if (!_bt_isequal(itupdesc, page, offset, nkeyatts, itup_scankey))
break; /* we're past all the equal tuples */
/* okay, we gotta fetch the heap tuple ... */
@@ -457,7 +467,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
if (P_RIGHTMOST(opaque))
break;
if (!_bt_isequal(itupdesc, page, P_HIKEY,
- natts, itup_scankey))
+ nkeyatts, itup_scankey))
break;
/* Advance to next non-dead page --- there must be one */
for (;;)
@@ -745,6 +755,11 @@ _bt_insertonpg(Relation rel,
elog(ERROR, "cannot insert to incompletely split page %u",
BufferGetBlockNumber(buf));
+ /* Truncate nonkey attributes when inserting on nonleaf pages. */
+ if (rel->rd_index->indnatts != rel->rd_index->indnkeyatts)
+ if (!P_ISLEAF(lpageop))
+ itup = index_reform_tuple(rel, itup, rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
+
itemsz = IndexTupleDSize(*itup);
itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
* need to be consistent */
@@ -1962,6 +1977,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
right_item_sz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(lpage, itemid);
right_item = CopyIndexTuple(item);
+ right_item = index_reform_tuple(rel, right_item, rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
/* NO EREPORT(ERROR) from here till newroot op is logged */
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 6e65db9..73560b7 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1254,7 +1254,7 @@ _bt_pagedel(Relation rel, Buffer buf)
/* we need an insertion scan key for the search, so build one */
itup_scankey = _bt_mkscankey(rel, targetkey);
/* find the leftmost leaf page containing this key */
- stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
+ stack = _bt_search(rel, IndexRelationGetNumberOfKeyAttributes(rel), itup_scankey,
false, &lbuf, BT_READ);
/* don't need a pin on the page */
_bt_relbuf(rel, lbuf);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index f95f67a..c3501e7 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -593,6 +593,19 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
state->btps_minkey = CopyIndexTuple(itup);
}
+ /* Truncate nonkey attributes when inserting on nonleaf pages */
+ if (wstate->index->rd_index->indnatts != wstate->index->rd_index->indnkeyatts)
+ {
+ BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
+
+ if (!P_ISLEAF(pageop))
+ {
+ itup = index_reform_tuple(wstate->index, itup, wstate->index->rd_index->indnatts, wstate->index->rd_index->indnkeyatts);
+ itupsz = IndexTupleDSize(*itup);
+ itupsz = MAXALIGN(itupsz);
+ }
+ }
+
/*
* Add the new item into the current page.
*/
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 91331ba..6932289 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -63,17 +63,24 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
{
ScanKey skey;
TupleDesc itupdesc;
- int natts;
- int16 *indoption;
+ int nkeyatts = rel->rd_rel->relnatts;
+ int16 *indoption = rel->rd_indoption;
int i;
-
itupdesc = RelationGetDescr(rel);
- natts = RelationGetNumberOfAttributes(rel);
- indoption = rel->rd_indoption;
- skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+ Assert(rel->rd_index != NULL);
+ Assert(rel->rd_index->indnkeyatts != 0);
+ Assert(rel->rd_index->indnkeyatts <= rel->rd_index->indnatts);
- for (i = 0; i < natts; i++)
+ nkeyatts = rel->rd_index->indnkeyatts;
+
+ /*
+ * We'll execute search using ScanKey constructed on key columns.
+ * Non key (included) columns must be omitted.
+ */
+ skey = (ScanKey) palloc(nkeyatts * sizeof(ScanKeyData));
+
+ for (i = 0; i < nkeyatts; i++)
{
FmgrInfo *procinfo;
Datum arg;
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index d8d1b06..002bcd5 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -293,6 +293,7 @@ Boot_DeclareIndexStmt:
stmt->accessMethod = $8;
stmt->tableSpace = NULL;
stmt->indexParams = $10;
+ stmt->indexIncludingParams = NIL;
stmt->options = NIL;
stmt->whereClause = NULL;
stmt->excludeOpNames = NIL;
@@ -336,6 +337,7 @@ Boot_DeclareUniqueIndexStmt:
stmt->accessMethod = $9;
stmt->tableSpace = NULL;
stmt->indexParams = $11;
+ stmt->indexIncludingParams = NIL;
stmt->options = NIL;
stmt->whereClause = NULL;
stmt->excludeOpNames = NIL;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index e59b163..27a12a0 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -211,7 +211,7 @@ index_check_primary_key(Relation heapRel,
* null, otherwise attempt to ALTER TABLE .. SET NOT NULL
*/
cmds = NIL;
- for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+ for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
AttrNumber attnum = indexInfo->ii_KeyAttrNumbers[i];
HeapTuple atttuple;
@@ -606,6 +606,7 @@ UpdateIndexRelation(Oid indexoid,
values[Anum_pg_index_indexrelid - 1] = ObjectIdGetDatum(indexoid);
values[Anum_pg_index_indrelid - 1] = ObjectIdGetDatum(heapoid);
values[Anum_pg_index_indnatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexAttrs);
+ values[Anum_pg_index_indnkeyatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexKeyAttrs);
values[Anum_pg_index_indisunique - 1] = BoolGetDatum(indexInfo->ii_Unique);
values[Anum_pg_index_indisprimary - 1] = BoolGetDatum(primary);
values[Anum_pg_index_indisexclusion - 1] = BoolGetDatum(isexclusion);
@@ -1069,6 +1070,8 @@ index_create(Relation heapRelation,
else
Assert(indexRelation->rd_indexcxt != NULL);
+ indexRelation->rd_index->indnkeyatts = indexInfo->ii_NumIndexKeyAttrs;
+
/*
* If this is bootstrap (initdb) time, then we don't actually fill in the
* index yet. We'll be creating more indexes and classes later, so we
@@ -1189,7 +1192,7 @@ index_constraint_create(Relation heapRelation,
true,
RelationGetRelid(heapRelation),
indexInfo->ii_KeyAttrNumbers,
- indexInfo->ii_NumIndexAttrs,
+ indexInfo->ii_NumIndexKeyAttrs,
InvalidOid, /* no domain */
indexRelationId, /* index OID */
InvalidOid, /* no foreign key */
@@ -1637,6 +1640,10 @@ BuildIndexInfo(Relation index)
elog(ERROR, "invalid indnatts %d for index %u",
numKeys, RelationGetRelid(index));
ii->ii_NumIndexAttrs = numKeys;
+ ii->ii_NumIndexKeyAttrs = indexStruct->indnkeyatts;
+ Assert(ii->ii_NumIndexKeyAttrs != 0);
+ Assert(ii->ii_NumIndexKeyAttrs <= ii->ii_NumIndexAttrs);
+
for (i = 0; i < numKeys; i++)
ii->ii_KeyAttrNumbers[i] = indexStruct->indkey.values[i];
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index 0231084..89e20fa 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -119,6 +119,7 @@ CatalogIndexInsert(CatalogIndexState indstate, HeapTuple heapTuple)
Assert(indexInfo->ii_Predicate == NIL);
Assert(indexInfo->ii_ExclusionOps == NULL);
Assert(relationDescs[i]->rd_index->indimmediate);
+ Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
/*
* FormIndexDatum fills in its values and isnull parameters with the
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 3652d7b..fc47a37 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -313,6 +313,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
indexInfo = makeNode(IndexInfo);
indexInfo->ii_NumIndexAttrs = 2;
+ indexInfo->ii_NumIndexKeyAttrs = 2;
indexInfo->ii_KeyAttrNumbers[0] = 1;
indexInfo->ii_KeyAttrNumbers[1] = 2;
indexInfo->ii_Expressions = NIL;
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index b450bcf..4e23dbe 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -137,7 +137,6 @@ CheckIndexCompatible(Oid oldId,
Relation irel;
int i;
Datum d;
-
/* Caller should already have the relation locked in some way. */
relationId = IndexGetRelation(oldId, false);
@@ -208,7 +207,7 @@ CheckIndexCompatible(Oid oldId,
}
/* Any change in operator class or collation breaks compatibility. */
- old_natts = indexForm->indnatts;
+ old_natts = indexForm->indnkeyatts;
Assert(old_natts == numberOfAttributes);
d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
@@ -320,7 +319,8 @@ DefineIndex(Oid relationId,
Datum reloptions;
int16 *coloptions;
IndexInfo *indexInfo;
- int numberOfAttributes;
+ int numberOfAttributes,
+ numberOfKeyAttributes;
TransactionId limitXmin;
VirtualTransactionId *old_snapshots;
ObjectAddress address;
@@ -331,10 +331,27 @@ DefineIndex(Oid relationId,
Snapshot snapshot;
int i;
+ if(list_intersection(stmt->indexParams, stmt->indexIncludingParams) != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("included columns must not intersect with key columns")));
+
/*
* count attributes in index
*/
+ numberOfKeyAttributes = list_length(stmt->indexParams);
+ if (numberOfKeyAttributes <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("must specify at least one key column")));
+
+ /*
+ * All information about key and included cols is in numberOfKeyAttributes number.
+ * So we can concat all index params into one list.
+ */
+ stmt->indexParams = list_concat(stmt->indexParams, stmt->indexIncludingParams);
numberOfAttributes = list_length(stmt->indexParams);
+
if (numberOfAttributes <= 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
@@ -466,6 +483,7 @@ DefineIndex(Oid relationId,
* look up the access method, verify it can handle the requested features
*/
accessMethodName = stmt->accessMethod;
+
tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
if (!HeapTupleIsValid(tuple))
{
@@ -511,6 +529,12 @@ DefineIndex(Oid relationId,
errmsg("access method \"%s\" does not support exclusion constraints",
accessMethodName)));
+ if (list_length(stmt->indexIncludingParams) > 0 && !accessMethodForm->amcanincluding)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("access method \"%s\" does not support included columns",
+ accessMethodName)));
+
amcanorder = accessMethodForm->amcanorder;
amoptions = accessMethodForm->amoptions;
@@ -536,6 +560,7 @@ DefineIndex(Oid relationId,
*/
indexInfo = makeNode(IndexInfo);
indexInfo->ii_NumIndexAttrs = numberOfAttributes;
+ indexInfo->ii_NumIndexKeyAttrs = numberOfKeyAttributes;
indexInfo->ii_Expressions = NIL; /* for now */
indexInfo->ii_ExpressionsState = NIL;
indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index f42bd8f..fd55eb4 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -646,7 +646,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
Oid *constr_procs;
uint16 *constr_strats;
Oid *index_collations = index->rd_indcollation;
- int index_natts = index->rd_index->indnatts;
+ int index_nkeyatts = index->rd_index->indnkeyatts;
IndexScanDesc index_scan;
HeapTuple tup;
ScanKeyData scankeys[INDEX_MAX_KEYS];
@@ -673,7 +673,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
* If any of the input values are NULL, the constraint check is assumed to
* pass (i.e., we assume the operators are strict).
*/
- for (i = 0; i < index_natts; i++)
+ for (i = 0; i < index_nkeyatts; i++)
{
if (isnull[i])
return true;
@@ -685,7 +685,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
*/
InitDirtySnapshot(DirtySnapshot);
- for (i = 0; i < index_natts; i++)
+ for (i = 0; i < index_nkeyatts; i++)
{
ScanKeyEntryInitialize(&scankeys[i],
0,
@@ -717,8 +717,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
retry:
conflict = false;
found_self = false;
- index_scan = index_beginscan(heap, index, &DirtySnapshot, index_natts, 0);
- index_rescan(index_scan, scankeys, index_natts, NULL, 0);
+ index_scan = index_beginscan(heap, index, &DirtySnapshot, index_nkeyatts, 0);
+ index_rescan(index_scan, scankeys, index_nkeyatts, NULL, 0);
while ((tup = index_getnext(index_scan,
ForwardScanDirection)) != NULL)
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index bd2e80e..8febcab 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3096,6 +3096,7 @@ _copyIndexStmt(const IndexStmt *from)
COPY_STRING_FIELD(accessMethod);
COPY_STRING_FIELD(tableSpace);
COPY_NODE_FIELD(indexParams);
+ COPY_NODE_FIELD(indexIncludingParams);
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(whereClause);
COPY_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 19412fe..20773ab 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1238,6 +1238,7 @@ _equalIndexStmt(const IndexStmt *a, const IndexStmt *b)
COMPARE_STRING_FIELD(accessMethod);
COMPARE_STRING_FIELD(tableSpace);
COMPARE_NODE_FIELD(indexParams);
+ COMPARE_NODE_FIELD(indexIncludingParams);
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(whereClause);
COMPARE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index a878498..97de06a 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2144,6 +2144,7 @@ _outIndexStmt(StringInfo str, const IndexStmt *node)
WRITE_STRING_FIELD(accessMethod);
WRITE_STRING_FIELD(tableSpace);
WRITE_NODE_FIELD(indexParams);
+ WRITE_NODE_FIELD(indexIncludingParams);
WRITE_NODE_FIELD(options);
WRITE_NODE_FIELD(whereClause);
WRITE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 9442e5f..603d11d 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -164,7 +164,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
Relation indexRelation;
Form_pg_index index;
IndexOptInfo *info;
- int ncolumns;
+ int ncolumns, nkeycolumns;
int i;
/*
@@ -207,6 +207,23 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
RelationGetForm(indexRelation)->reltablespace;
info->rel = rel;
info->ncolumns = ncolumns = index->indnatts;
+ info->nkeycolumns = nkeycolumns = index->indnkeyatts;
+
+ /* TODO
+ * All these arrays below still have length = ncolumns.
+ * Fix, when optional opclass functionality will be added.
+ *
+ * Generally, any column could be returned by IndexOnlyScan.
+ * Even if it doesn't have opclass for that type of index.
+ *
+ * For example,
+ * we have an index "create index on tbl(c1) including c2".
+ * If there's no suitable oplass on c2
+ * query "select c2 from tbl where c2 < 10" can't use index-only scan
+ * and query "select c2 from tbl where c1 < 10" can.
+ * But now it doesn't because of requirement that
+ * each indexed column must have an opclass.
+ */
info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 1efc6d6..ff82428 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -354,6 +354,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
oper_argtypes RuleActionList RuleActionMulti
opt_column_list columnList opt_name_list
sort_clause opt_sort_clause sortby_list index_params
+ optincluding opt_including index_including_params
name_list role_list from_clause from_list opt_array_bounds
qualified_name_list any_name any_name_list type_name_list
any_operator expr_list attrs
@@ -6591,7 +6592,7 @@ defacl_privilege_target:
IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name
ON qualified_name access_method_clause '(' index_params ')'
- opt_reloptions OptTableSpace where_clause
+ opt_including opt_reloptions OptTableSpace where_clause
{
IndexStmt *n = makeNode(IndexStmt);
n->unique = $2;
@@ -6600,9 +6601,10 @@ IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name
n->relation = $7;
n->accessMethod = $8;
n->indexParams = $10;
- n->options = $12;
- n->tableSpace = $13;
- n->whereClause = $14;
+ n->indexIncludingParams = $12;
+ n->options = $13;
+ n->tableSpace = $14;
+ n->whereClause = $15;
n->excludeOpNames = NIL;
n->idxcomment = NULL;
n->indexOid = InvalidOid;
@@ -6707,6 +6709,16 @@ index_elem: ColId opt_collate opt_class opt_asc_desc opt_nulls_order
}
;
+optincluding : '(' index_including_params ')' { $$ = $2; }
+ ;
+opt_including: INCLUDING optincluding { $$ = $2; }
+ | /* EMPTY */ { $$ = NIL; }
+ ;
+
+index_including_params: index_elem { $$ = list_make1($1); }
+ | index_including_params ',' index_elem { $$ = lappend($1, $3); }
+ ;
+
opt_collate: COLLATE any_name { $$ = $2; }
| /*EMPTY*/ { $$ = NIL; }
;
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 51391f6..1e43181 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1122,10 +1122,23 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
Oid keycoltype;
Oid keycolcollation;
+ /* Report the INCLUDED attributes, if any. */
+ if(keyno == idxrec->indnkeyatts)
+ {
+ if(!attrsOnly)
+ {
+ appendStringInfoString(&buf, ") INCLUDING (");
+ sep = "";
+ }
+ }
+ else if (keyno > idxrec->indnkeyatts)
+ sep = ", ";
+
if (!colno)
appendStringInfoString(&buf, sep);
sep = ", ";
+
if (attnum != 0)
{
/* Simple index column */
@@ -1133,8 +1146,12 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
int32 keycoltypmod;
attname = get_relid_attribute_name(indrelid, attnum);
- if (!colno || colno == keyno + 1)
+
+ if (!colno || colno == keyno + 1) {
appendStringInfoString(&buf, quote_identifier(attname));
+ if ((attrsOnly)&&(keyno >= idxrec->indnkeyatts))
+ appendStringInfoString(&buf, " (included)");
+ }
get_atttypetypmodcoll(indrelid, attnum,
&keycoltype, &keycoltypmod,
&keycolcollation);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 420ef3d..513fa57 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1940,6 +1940,7 @@ RelationReloadIndexInfo(Relation relation)
* it's not worth it to track exactly which ones they are. None of
* the array fields are allowed to change, though.
*/
+ relation->rd_index->indnkeyatts = index->indnkeyatts;
relation->rd_index->indisunique = index->indisunique;
relation->rd_index->indisprimary = index->indisprimary;
relation->rd_index->indisexclusion = index->indisexclusion;
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index c997545..00fdaee 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -18,6 +18,7 @@
#include "access/tupmacs.h"
#include "storage/bufpage.h"
#include "storage/itemptr.h"
+#include "utils/rel.h"
/*
* Index tuple header structure
@@ -147,5 +148,7 @@ extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
extern void index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
Datum *values, bool *isnull);
extern IndexTuple CopyIndexTuple(IndexTuple source);
+extern IndexTuple index_reform_tuple(Relation idxrel, IndexTuple tup,
+ int natts, int nkeyatts);
#endif /* ITUP_H */
diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h
index 8a28b8e..f9f0841 100644
--- a/src/include/catalog/pg_am.h
+++ b/src/include/catalog/pg_am.h
@@ -51,6 +51,7 @@ CATALOG(pg_am,2601)
bool amstorage; /* can storage type differ from column type? */
bool amclusterable; /* does AM support cluster command? */
bool ampredlocks; /* does AM handle predicate locks? */
+ bool amcanincluding; /* does AM support INCLUDING columns? */
Oid amkeytype; /* type of data in index, or InvalidOid */
regproc aminsert; /* "insert this tuple" function */
regproc ambeginscan; /* "prepare for index scan" function */
@@ -80,7 +81,7 @@ typedef FormData_pg_am *Form_pg_am;
* compiler constants for pg_am
* ----------------
*/
-#define Natts_pg_am 30
+#define Natts_pg_am 31
#define Anum_pg_am_amname 1
#define Anum_pg_am_amstrategies 2
#define Anum_pg_am_amsupport 3
@@ -95,44 +96,45 @@ typedef FormData_pg_am *Form_pg_am;
#define Anum_pg_am_amstorage 12
#define Anum_pg_am_amclusterable 13
#define Anum_pg_am_ampredlocks 14
-#define Anum_pg_am_amkeytype 15
-#define Anum_pg_am_aminsert 16
-#define Anum_pg_am_ambeginscan 17
-#define Anum_pg_am_amgettuple 18
-#define Anum_pg_am_amgetbitmap 19
-#define Anum_pg_am_amrescan 20
-#define Anum_pg_am_amendscan 21
-#define Anum_pg_am_ammarkpos 22
-#define Anum_pg_am_amrestrpos 23
-#define Anum_pg_am_ambuild 24
-#define Anum_pg_am_ambuildempty 25
-#define Anum_pg_am_ambulkdelete 26
-#define Anum_pg_am_amvacuumcleanup 27
-#define Anum_pg_am_amcanreturn 28
-#define Anum_pg_am_amcostestimate 29
-#define Anum_pg_am_amoptions 30
+#define Anum_pg_am_amcanincluding 15
+#define Anum_pg_am_amkeytype 16
+#define Anum_pg_am_aminsert 17
+#define Anum_pg_am_ambeginscan 18
+#define Anum_pg_am_amgettuple 19
+#define Anum_pg_am_amgetbitmap 20
+#define Anum_pg_am_amrescan 21
+#define Anum_pg_am_amendscan 22
+#define Anum_pg_am_ammarkpos 23
+#define Anum_pg_am_amrestrpos 24
+#define Anum_pg_am_ambuild 25
+#define Anum_pg_am_ambuildempty 26
+#define Anum_pg_am_ambulkdelete 27
+#define Anum_pg_am_amvacuumcleanup 28
+#define Anum_pg_am_amcanreturn 29
+#define Anum_pg_am_amcostestimate 30
+#define Anum_pg_am_amoptions 31
/* ----------------
* initial contents of pg_am
* ----------------
*/
-DATA(insert OID = 403 ( btree 5 2 t f t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions ));
+DATA(insert OID = 403 ( btree 5 2 t f t t t t t t f t t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions ));
DESCR("b-tree index access method");
#define BTREE_AM_OID 403
-DATA(insert OID = 405 ( hash 1 1 f f t f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions ));
+DATA(insert OID = 405 ( hash 1 1 f f t f f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions ));
DESCR("hash index access method");
#define HASH_AM_OID 405
-DATA(insert OID = 783 ( gist 0 9 f t f f t t f t t t f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup gistcanreturn gistcostestimate gistoptions ));
+DATA(insert OID = 783 ( gist 0 9 f t f f t t f t t t f f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup gistcanreturn gistcostestimate gistoptions ));
DESCR("GiST index access method");
#define GIST_AM_OID 783
-DATA(insert OID = 2742 ( gin 0 6 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
+DATA(insert OID = 2742 ( gin 0 6 f f f f t t f f t f f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
DESCR("GIN index access method");
#define GIN_AM_OID 2742
-DATA(insert OID = 4000 ( spgist 0 5 f f f f f t f t f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
+DATA(insert OID = 4000 ( spgist 0 5 f f f f f t f t f f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
DESCR("SP-GiST index access method");
#define SPGIST_AM_OID 4000
-DATA(insert OID = 3580 ( brin 0 15 f f f f t t f t t f f 0 brininsert brinbeginscan - bringetbitmap brinrescan brinendscan brinmarkpos brinrestrpos brinbuild brinbuildempty brinbulkdelete brinvacuumcleanup - brincostestimate brinoptions ));
+DATA(insert OID = 3580 ( brin 0 15 f f f f t t f t t f f f 0 brininsert brinbeginscan - bringetbitmap brinrescan brinendscan brinmarkpos brinrestrpos brinbuild brinbuildempty brinbulkdelete brinvacuumcleanup - brincostestimate brinoptions ));
DESCR("block range index (BRIN) access method");
#define BRIN_AM_OID 3580
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index 45c96e3..452e1cf 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -33,6 +33,7 @@ CATALOG(pg_index,2610) BKI_WITHOUT_OIDS BKI_SCHEMA_MACRO
Oid indexrelid; /* OID of the index */
Oid indrelid; /* OID of the relation it indexes */
int16 indnatts; /* number of columns in index */
+ int16 indnkeyatts; /* number of key columns in index */
bool indisunique; /* is this a unique index? */
bool indisprimary; /* is this index for primary key? */
bool indisexclusion; /* is this index for exclusion constraint? */
@@ -70,26 +71,27 @@ typedef FormData_pg_index *Form_pg_index;
* compiler constants for pg_index
* ----------------
*/
-#define Natts_pg_index 19
+#define Natts_pg_index 20
#define Anum_pg_index_indexrelid 1
#define Anum_pg_index_indrelid 2
#define Anum_pg_index_indnatts 3
-#define Anum_pg_index_indisunique 4
-#define Anum_pg_index_indisprimary 5
-#define Anum_pg_index_indisexclusion 6
-#define Anum_pg_index_indimmediate 7
-#define Anum_pg_index_indisclustered 8
-#define Anum_pg_index_indisvalid 9
-#define Anum_pg_index_indcheckxmin 10
-#define Anum_pg_index_indisready 11
-#define Anum_pg_index_indislive 12
-#define Anum_pg_index_indisreplident 13
-#define Anum_pg_index_indkey 14
-#define Anum_pg_index_indcollation 15
-#define Anum_pg_index_indclass 16
-#define Anum_pg_index_indoption 17
-#define Anum_pg_index_indexprs 18
-#define Anum_pg_index_indpred 19
+#define Anum_pg_index_indnkeyatts 4
+#define Anum_pg_index_indisunique 5
+#define Anum_pg_index_indisprimary 6
+#define Anum_pg_index_indisexclusion 7
+#define Anum_pg_index_indimmediate 8
+#define Anum_pg_index_indisclustered 9
+#define Anum_pg_index_indisvalid 10
+#define Anum_pg_index_indcheckxmin 11
+#define Anum_pg_index_indisready 12
+#define Anum_pg_index_indislive 13
+#define Anum_pg_index_indisreplident 14
+#define Anum_pg_index_indkey 15
+#define Anum_pg_index_indcollation 16
+#define Anum_pg_index_indclass 17
+#define Anum_pg_index_indoption 18
+#define Anum_pg_index_indexprs 19
+#define Anum_pg_index_indpred 20
/*
* Index AMs that support ordered scans must support these two indoption
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4ae2f3e..4bd0d6e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -59,6 +59,7 @@ typedef struct IndexInfo
{
NodeTag type;
int ii_NumIndexAttrs;
+ int ii_NumIndexKeyAttrs;
AttrNumber ii_KeyAttrNumbers[INDEX_MAX_KEYS];
List *ii_Expressions; /* list of Expr */
List *ii_ExpressionsState; /* list of ExprState */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index f0dcd2f..a0abba8 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2414,6 +2414,7 @@ typedef struct IndexStmt
char *accessMethod; /* name of access method (eg. btree) */
char *tableSpace; /* tablespace, or NULL for default */
List *indexParams; /* columns to index: a list of IndexElem */
+ List *indexIncludingParams; /* additional columns to index: a list of IndexElem */
List *options; /* WITH clause options: a list of DefElem */
Node *whereClause; /* qualification (partial-index predicate) */
List *excludeOpNames; /* exclusion operator names, or NIL if none */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 5dc23d9..6c4e202 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -532,6 +532,7 @@ typedef struct IndexOptInfo
/* index descriptor information */
int ncolumns; /* number of columns in index */
+ int nkeycolumns; /* number of key columns in index */
int *indexkeys; /* column numbers of index's keys, or 0 */
Oid *indexcollations; /* OIDs of collations of index columns */
Oid *opfamily; /* OIDs of operator families for columns */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 8a55a09..b0f5930 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -351,6 +351,12 @@ typedef struct ViewOptions
#define RelationGetNumberOfAttributes(relation) ((relation)->rd_rel->relnatts)
/*
+ * RelationGetNumberOfAttributes
+ * Returns the number of attributes in a relation.
+ */
+#define IndexRelationGetNumberOfKeyAttributes(relation) ((relation)->rd_index->indnkeyatts)
+
+/*
* RelationGetDescr
* Returns tuple descriptor for a relation.
*/
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index b72e65d..8ea9e67 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -2376,6 +2376,22 @@ DETAIL: Key ((f1 || f2))=(ABCDEF) already exists.
-- but this shouldn't:
INSERT INTO func_index_heap VALUES('QWERTY');
--
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+ERROR: duplicate key value violates unique constraint "covering_index_index"
+DETAIL: Key (f1, f2, f3 (included))=(1, 2, BBB) already exists.
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_index_index2 on covering_index_heap (f1,f2) INCLUDING(f3);
+DROP TABLE covering_index_heap;
+--
-- Also try building functional, expressional, and partial indexes on
-- tables that already contain data.
--
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index ff86953..42d4881 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -722,6 +722,22 @@ INSERT INTO func_index_heap VALUES('ABCD', 'EF');
INSERT INTO func_index_heap VALUES('QWERTY');
--
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_index_index2 on covering_index_heap (f1,f2) INCLUDING(f3);
+DROP TABLE covering_index_heap;
+
+--
-- Also try building functional, expressional, and partial indexes on
-- tables that already contain data.
--
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index aa5b28c..6316ca5 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -180,6 +180,7 @@ BuildIndexValueDescription(Relation indexRelation,
Oid indexrelid = RelationGetRelid(indexRelation);
Oid indrelid;
AclResult aclresult;
+ // elog(NOTICE, "BuildIndexValueDescription. begin");
/*
* Check permissions- if the user does not have access to view all of the
@@ -221,6 +222,7 @@ BuildIndexValueDescription(Relation indexRelation,
for (keyno = 0; keyno < idxrec->indnatts; keyno++)
{
AttrNumber attnum = idxrec->indkey.values[keyno];
+ //elog(NOTICE, "BuildIndexValueDescription. keyno %d", keyno);
/*
* Note that if attnum == InvalidAttrNumber, then this is an index
@@ -244,30 +246,40 @@ BuildIndexValueDescription(Relation indexRelation,
appendStringInfo(&buf, "(%s)=(",
pg_get_indexdef_columns(indexrelid, true));
+ int nkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
for (i = 0; i < natts; i++)
{
char *val;
-
if (isnull[i])
val = "null";
else
{
Oid foutoid;
bool typisvarlena;
+ TupleDesc tupdesc = RelationGetDescr(indexRelation);
+ if (i < nkeyatts)
+ {
+ /* TODO
+ * The provided data is not necessarily of the type stored in the
+ * index; rather it is of the index opclass's input type. So look
+ * at rd_opcintype not the index tupdesc.
+ *
+ * Note: this is a bit shaky for opclasses that have pseudotype
+ * input types such as ANYARRAY or RECORD. Currently, the
+ * typoutput functions associated with the pseudotypes will work
+ * okay, but we might have to try harder in future.
+ */
+ //elog(NOTICE, "BuildIndexValueDescription. tdtypeid[%d]=%u rd_opcintype[%d]=%u", i, tupdesc->attrs[i]->atttypid,i, indexRelation->rd_opcintype[i]);
+ getTypeOutputInfo(indexRelation->rd_opcintype[i], &foutoid, &typisvarlena);
+ val = OidOutputFunctionCall(foutoid, values[i]);
+ }
+ else
+ {
+ getTypeOutputInfo(tupdesc->attrs[i]->atttypid, &foutoid, &typisvarlena);
+ val = OidOutputFunctionCall(foutoid, values[i]);
+ //elog(NOTICE, "BuildIndexValueDescription. some included value [%d]", i);
- /*
- * The provided data is not necessarily of the type stored in the
- * index; rather it is of the index opclass's input type. So look
- * at rd_opcintype not the index tupdesc.
- *
- * Note: this is a bit shaky for opclasses that have pseudotype
- * input types such as ANYARRAY or RECORD. Currently, the
- * typoutput functions associated with the pseudotypes will work
- * okay, but we might have to try harder in future.
- */
- getTypeOutputInfo(indexRelation->rd_opcintype[i],
- &foutoid, &typisvarlena);
- val = OidOutputFunctionCall(foutoid, values[i]);
+ }
}
if (i > 0)
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 6932289..175714d 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -86,11 +86,11 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
Datum arg;
bool null;
int flags;
-
/*
* We can use the cached (default) support procs since no cross-type
* comparison can be needed.
*/
+ //elog(NOTICE, "_bt_mkscankey i %d", i);
procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
arg = index_getattr(itup, i + 1, itupdesc, &null);
flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
@@ -127,11 +127,12 @@ _bt_mkscankey_nodata(Relation rel)
int i;
natts = RelationGetNumberOfAttributes(rel);
+ int nkeyatts = rel->rd_index->indnkeyatts;
indoption = rel->rd_indoption;
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
- for (i = 0; i < natts; i++)
+ for (i = 0; i < nkeyatts; i++)
{
FmgrInfo *procinfo;
int flags;
@@ -140,6 +141,7 @@ _bt_mkscankey_nodata(Relation rel)
* We can use the cached (default) support procs since no cross-type
* comparison can be needed.
*/
+ //elog(NOTICE, "_bt_mkscankey_nodata i %d", i);
procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
ScanKeyEntryInitializeWithInfo(&skey[i],
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 27a12a0..4ba362b 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -424,38 +424,44 @@ ConstructTupleDescriptor(Relation heapRelation,
elog(ERROR, "too few entries in colnames list");
namestrcpy(&to->attname, (const char *) lfirst(colnames_item));
colnames_item = lnext(colnames_item);
-
- /*
- * Check the opclass and index AM to see if either provides a keytype
- * (overriding the attribute type). Opclass takes precedence.
- */
- tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(classObjectId[i]));
- if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for opclass %u",
- classObjectId[i]);
- opclassTup = (Form_pg_opclass) GETSTRUCT(tuple);
- if (OidIsValid(opclassTup->opckeytype))
- keyType = opclassTup->opckeytype;
- else
- keyType = amform->amkeytype;
- ReleaseSysCache(tuple);
-
- if (OidIsValid(keyType) && keyType != to->atttypid)
+ if (i < indexInfo->ii_NumIndexKeyAttrs)
{
- /* index value and heap value have different types */
- tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(keyType));
+ /*
+ * Check the opclass and index AM to see if either provides a keytype
+ * (overriding the attribute type). Opclass takes precedence.
+ */
+ tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(classObjectId[i]));
if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for type %u", keyType);
- typeTup = (Form_pg_type) GETSTRUCT(tuple);
-
- to->atttypid = keyType;
- to->atttypmod = -1;
- to->attlen = typeTup->typlen;
- to->attbyval = typeTup->typbyval;
- to->attalign = typeTup->typalign;
- to->attstorage = typeTup->typstorage;
-
+ elog(ERROR, "cache lookup failed for opclass %u",
+ classObjectId[i]);
+ opclassTup = (Form_pg_opclass) GETSTRUCT(tuple);
+ if (OidIsValid(opclassTup->opckeytype))
+ keyType = opclassTup->opckeytype;
+ else
+ keyType = amform->amkeytype;
ReleaseSysCache(tuple);
+
+ if (OidIsValid(keyType) && keyType != to->atttypid)
+ {
+ /* index value and heap value have different types */
+ tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(keyType));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for type %u", keyType);
+ typeTup = (Form_pg_type) GETSTRUCT(tuple);
+
+ to->atttypid = keyType;
+ to->atttypmod = -1;
+ to->attlen = typeTup->typlen;
+ to->attbyval = typeTup->typbyval;
+ to->attalign = typeTup->typalign;
+ to->attstorage = typeTup->typstorage;
+
+ ReleaseSysCache(tuple);
+ }
+ }
+ else
+ {
+ //elog(NOTICE, "ConstructTupleDescriptor. Included opclass attr num %d. Don't check opclass", i);
}
}
@@ -500,7 +506,6 @@ AppendAttributeTuples(Relation indexRelation, int numatts)
pg_attribute = heap_open(AttributeRelationId, RowExclusiveLock);
indstate = CatalogOpenIndexes(pg_attribute);
-
/*
* insert data from new index's tupdesc into pg_attribute
*/
@@ -866,7 +871,6 @@ index_create(Relation heapRelation,
indexRelation->rd_rel->relowner = heapRelation->rd_rel->relowner;
indexRelation->rd_rel->relam = accessMethodObjectId;
indexRelation->rd_rel->relhasoids = false;
-
/*
* store index's pg_class entry
*/
@@ -998,7 +1002,7 @@ index_create(Relation heapRelation,
/* Store dependency on collations */
/* The default collation is pinned, so don't bother recording it */
- for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+ for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
if (OidIsValid(collationObjectId[i]) &&
collationObjectId[i] != DEFAULT_COLLATION_OID)
@@ -1012,7 +1016,7 @@ index_create(Relation heapRelation,
}
/* Store dependency on operator classes */
- for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+ for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
referenced.classId = OperatorClassRelationId;
referenced.objectId = classObjectId[i];
@@ -1700,7 +1704,7 @@ BuildIndexInfo(Relation index)
void
BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
{
- int ncols = index->rd_rel->relnatts;
+ int nkeycols = IndexRelationGetNumberOfKeyAttributes(index);
int i;
/*
@@ -1711,16 +1715,16 @@ BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
if (index->rd_rel->relam != BTREE_AM_OID)
elog(ERROR, "unexpected non-btree speculative unique index");
- ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * ncols);
- ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * ncols);
- ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+ ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
/*
* We have to look up the operator's strategy number. This provides a
* cross-check that the operator does match the index.
*/
/* We need the func OIDs and strategy numbers too */
- for (i = 0; i < ncols; i++)
+ for (i = 0; i < nkeycols; i++)
{
ii->ii_UniqueStrats[i] = BTEqualStrategyNumber;
ii->ii_UniqueOps[i] =
@@ -2644,7 +2648,6 @@ IndexCheckExclusion(Relation heapRelation,
EState *estate;
ExprContext *econtext;
Snapshot snapshot;
-
/*
* If we are reindexing the target index, mark it as no longer being
* reindexed, to forestall an Assert in index_beginscan when we try to use
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 4e23dbe..aa4e430 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -983,16 +983,14 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
ListCell *nextExclOp;
ListCell *lc;
int attn;
-
+ int nkeycols = indexInfo->ii_NumIndexKeyAttrs;
/* Allocate space for exclusion operator info, if needed */
if (exclusionOpNames)
{
- int ncols = list_length(attList);
-
- Assert(list_length(exclusionOpNames) == ncols);
- indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * ncols);
- indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * ncols);
- indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+ Assert(list_length(exclusionOpNames) == nkeycols);
+ indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
nextExclOp = list_head(exclusionOpNames);
}
else
@@ -1125,110 +1123,117 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
/*
* Identify the opclass to use.
*/
- classOidP[attn] = GetIndexOpClass(attribute->opclass,
- atttype,
- accessMethodName,
- accessMethodId);
-
- /*
- * Identify the exclusion operator, if any.
- */
- if (nextExclOp)
+ if (attn < nkeycols)
{
- List *opname = (List *) lfirst(nextExclOp);
- Oid opid;
- Oid opfamily;
- int strat;
+ //elog(NOTICE, "ComputeIndexAttrs attn %d, nkeycolss %d", attn, nkeycols);
+ classOidP[attn] = GetIndexOpClass(attribute->opclass,
+ atttype,
+ accessMethodName,
+ accessMethodId);
/*
- * Find the operator --- it must accept the column datatype
- * without runtime coercion (but binary compatibility is OK)
- */
- opid = compatible_oper_opid(opname, atttype, atttype, false);
+ * Identify the exclusion operator, if any.
+ */
+ if (nextExclOp)
+ {
+ List *opname = (List *) lfirst(nextExclOp);
+ Oid opid;
+ Oid opfamily;
+ int strat;
- /*
- * Only allow commutative operators to be used in exclusion
- * constraints. If X conflicts with Y, but Y does not conflict
- * with X, bad things will happen.
- */
- if (get_commutator(opid) != opid)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("operator %s is not commutative",
- format_operator(opid)),
- errdetail("Only commutative operators can be used in exclusion constraints.")));
+ /*
+ * Find the operator --- it must accept the column datatype
+ * without runtime coercion (but binary compatibility is OK)
+ */
+ opid = compatible_oper_opid(opname, atttype, atttype, false);
- /*
- * Operator must be a member of the right opfamily, too
- */
- opfamily = get_opclass_family(classOidP[attn]);
- strat = get_op_opfamily_strategy(opid, opfamily);
- if (strat == 0)
- {
- HeapTuple opftuple;
- Form_pg_opfamily opfform;
+ /*
+ * Only allow commutative operators to be used in exclusion
+ * constraints. If X conflicts with Y, but Y does not conflict
+ * with X, bad things will happen.
+ */
+ if (get_commutator(opid) != opid)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("operator %s is not commutative",
+ format_operator(opid)),
+ errdetail("Only commutative operators can be used in exclusion constraints.")));
/*
- * attribute->opclass might not explicitly name the opfamily,
- * so fetch the name of the selected opfamily for use in the
- * error message.
- */
- opftuple = SearchSysCache1(OPFAMILYOID,
- ObjectIdGetDatum(opfamily));
- if (!HeapTupleIsValid(opftuple))
- elog(ERROR, "cache lookup failed for opfamily %u",
- opfamily);
- opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
+ * Operator must be a member of the right opfamily, too
+ */
+ opfamily = get_opclass_family(classOidP[attn]);
+ strat = get_op_opfamily_strategy(opid, opfamily);
+ if (strat == 0)
+ {
+ HeapTuple opftuple;
+ Form_pg_opfamily opfform;
+
+ /*
+ * attribute->opclass might not explicitly name the opfamily,
+ * so fetch the name of the selected opfamily for use in the
+ * error message.
+ */
+ opftuple = SearchSysCache1(OPFAMILYOID,
+ ObjectIdGetDatum(opfamily));
+ if (!HeapTupleIsValid(opftuple))
+ elog(ERROR, "cache lookup failed for opfamily %u",
+ opfamily);
+ opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("operator %s is not a member of operator family \"%s\"",
- format_operator(opid),
- NameStr(opfform->opfname)),
- errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
- }
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("operator %s is not a member of operator family \"%s\"",
+ format_operator(opid),
+ NameStr(opfform->opfname)),
+ errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
+ }
- indexInfo->ii_ExclusionOps[attn] = opid;
- indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
- indexInfo->ii_ExclusionStrats[attn] = strat;
- nextExclOp = lnext(nextExclOp);
- }
+ indexInfo->ii_ExclusionOps[attn] = opid;
+ indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
+ indexInfo->ii_ExclusionStrats[attn] = strat;
+ nextExclOp = lnext(nextExclOp);
+ }
- /*
- * Set up the per-column options (indoption field). For now, this is
- * zero for any un-ordered index, while ordered indexes have DESC and
- * NULLS FIRST/LAST options.
- */
- colOptionP[attn] = 0;
- if (amcanorder)
- {
- /* default ordering is ASC */
- if (attribute->ordering == SORTBY_DESC)
- colOptionP[attn] |= INDOPTION_DESC;
- /* default null ordering is LAST for ASC, FIRST for DESC */
- if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
+ /*
+ * Set up the per-column options (indoption field). For now, this is
+ * zero for any un-ordered index, while ordered indexes have DESC and
+ * NULLS FIRST/LAST options.
+ */
+ colOptionP[attn] = 0;
+ if (amcanorder)
{
+ /* default ordering is ASC */
if (attribute->ordering == SORTBY_DESC)
+ colOptionP[attn] |= INDOPTION_DESC;
+ /* default null ordering is LAST for ASC, FIRST for DESC */
+ if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
+ {
+ if (attribute->ordering == SORTBY_DESC)
+ colOptionP[attn] |= INDOPTION_NULLS_FIRST;
+ }
+ else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
colOptionP[attn] |= INDOPTION_NULLS_FIRST;
}
- else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
- colOptionP[attn] |= INDOPTION_NULLS_FIRST;
+ else
+ {
+ /* index AM does not support ordering */
+ if (attribute->ordering != SORTBY_DEFAULT)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("access method \"%s\" does not support ASC/DESC options",
+ accessMethodName)));
+ if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
+ accessMethodName)));
+ }
}
else
{
- /* index AM does not support ordering */
- if (attribute->ordering != SORTBY_DEFAULT)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("access method \"%s\" does not support ASC/DESC options",
- accessMethodName)));
- if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
- accessMethodName)));
+ //elog(NOTICE, "ComputeIndexAttrs. Included attn %d. Don't look for opclass", attn);
}
-
attn++;
}
}
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 603d11d..ffb63f2 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -225,18 +225,22 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
* each indexed column must have an opclass.
*/
info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
- info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
- info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
- info->opcintype = (Oid *) palloc(sizeof(Oid) * ncolumns);
+ info->indexcollations = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+ info->opfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+ info->opcintype = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
info->canreturn = (bool *) palloc(sizeof(bool) * ncolumns);
for (i = 0; i < ncolumns; i++)
{
info->indexkeys[i] = index->indkey.values[i];
+ info->canreturn[i] = index_can_return(indexRelation, i + 1);
+ }
+
+ for (i = 0; i < nkeycolumns; i++)
+ {
info->indexcollations[i] = indexRelation->rd_indcollation[i];
info->opfamily[i] = indexRelation->rd_opfamily[i];
info->opcintype[i] = indexRelation->rd_opcintype[i];
- info->canreturn[i] = index_can_return(indexRelation, i + 1);
}
info->relam = indexRelation->rd_rel->relam;
@@ -263,7 +267,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
- for (i = 0; i < ncolumns; i++)
+ for (i = 0; i < nkeycolumns; i++)
{
int16 opt = indexRelation->rd_indoption[i];
@@ -291,7 +295,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
- for (i = 0; i < ncolumns; i++)
+ for (i = 0; i < nkeycolumns; i++)
{
int16 opt = indexRelation->rd_indoption[i];
Oid ltopr;
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 513fa57..ac828b5 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1178,7 +1178,7 @@ RelationInitIndexAccessInfo(Relation relation)
int2vector *indoption;
MemoryContext indexcxt;
MemoryContext oldcontext;
- int natts;
+ int natts, nkeyatts;
uint16 amsupport;
/*
@@ -1214,6 +1214,7 @@ RelationInitIndexAccessInfo(Relation relation)
elog(ERROR, "relnatts disagrees with indnatts for index %u",
RelationGetRelid(relation));
amsupport = aform->amsupport;
+ nkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
/*
* Make the private context to hold index access info. The reason we need
@@ -1294,7 +1295,7 @@ RelationInitIndexAccessInfo(Relation relation)
*/
IndexSupportInitialize(indclass, relation->rd_support,
relation->rd_opfamily, relation->rd_opcintype,
- amsupport, natts);
+ amsupport, nkeyatts);
/*
* Similarly extract indoption and copy it to the cache entry
@@ -4366,7 +4367,7 @@ RelationGetExclusionInfo(Relation indexRelation,
Oid **procs,
uint16 **strategies)
{
- int ncols = indexRelation->rd_rel->relnatts;
+ int nkeycols = IndexRelationGetNumberOfKeyAttributes(indexRelation);
Oid *ops;
Oid *funcs;
uint16 *strats;
@@ -4379,16 +4380,16 @@ RelationGetExclusionInfo(Relation indexRelation,
int i;
/* Allocate result space in caller context */
- *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
- *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
- *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
+ *operators = ops = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ *procs = funcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ *strategies = strats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
/* Quick exit if we have the data cached already */
if (indexRelation->rd_exclstrats != NULL)
{
- memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
- memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
- memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
+ memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * nkeycols);
+ memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * nkeycols);
+ memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * nkeycols);
return;
}
@@ -4437,12 +4438,12 @@ RelationGetExclusionInfo(Relation indexRelation,
arr = DatumGetArrayTypeP(val); /* ensure not toasted */
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
- nelem != ncols ||
+ nelem != nkeycols ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != OIDOID)
elog(ERROR, "conexclop is not a 1-D Oid array");
- memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
+ memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * nkeycols);
}
systable_endscan(conscan);
@@ -4453,7 +4454,7 @@ RelationGetExclusionInfo(Relation indexRelation,
RelationGetRelationName(indexRelation));
/* We need the func OIDs and strategy numbers too */
- for (i = 0; i < ncols; i++)
+ for (i = 0; i < nkeycols; i++)
{
funcs[i] = get_opcode(ops[i]);
strats[i] = get_op_opfamily_strategy(ops[i],
@@ -4466,12 +4467,12 @@ RelationGetExclusionInfo(Relation indexRelation,
/* Save a copy of the results in the relcache entry. */
oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
- indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
- indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
- indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
- memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
- memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
- memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
+ indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
+ memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * nkeycols);
+ memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * nkeycols);
+ memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * nkeycols);
MemoryContextSwitchTo(oldcxt);
}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index d532e87..ec2a49c 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -810,9 +810,8 @@ tuplesort_begin_index_btree(Relation heapRel,
state->heapRel = heapRel;
state->indexRel = indexRel;
state->enforceUnique = enforceUnique;
-
indexScanKey = _bt_mkscankey_nodata(indexRel);
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers