I realise that this has already been done, by Joe Conway I think. Indeed I was looking at this just before beta1 when I happened to notice the post giving the plpgsql function. However, as I had started work on it and I was interested in seeing how things should be done I continued, only not in so much of a rush. In the interests on finding out if I have approached this the right way, or the way a more experienced backend programmer would, I'd appreciate any comments on the attached .c file. In particular, I'm not sure what I'm doing with regard to memory contexts, I think I may have one unnecessary switch in there, and in general I seem to be doing a lot of work just to find out tidbits of information. I based this on, i.e. started by editing, Joe Conway's tablefunc.c but I think there's very little of the original left in there. I've also attached the .h, Makefile and .sql.in files to make this work if anyone is interested in giving it a run. The .sql.in shows the usage. I did this in a directory called pggrouping, for the sake of a better name, under the contrib directory in my tree, so that's probably the best place to build it. Thanks, and sorry for adding to people's email and work load. -- Nigel J. Andrews Director --- Logictree Systems Limited Computer Consultants
/* * Derived from tablefunc.c, a sample to demonstrate C functions which * return setof scalar and setof composite by Joe Conway <[EMAIL PROTECTED]> * * Copyright 2002 by PostgreSQL Global Development Group * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement * is hereby granted, provided that the above copyright notice and this * paragraph and the following two paragraphs appear in all copies. * * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * */ #include <stdlib.h> #include <math.h> #include "postgres.h" #include "fmgr.h" #include "funcapi.h" #include "executor/spi.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" #include "pggrouping.h" typedef struct unpack_array_fctx { SPITupleTable *spi_tuptable; /* sql results from user query */ TupleDesc tupdesc; /* TupleDesc for results */ int unpack_attrnum; /* attribute number to be unpacked */ int lastcall_cntr; /* previous call_cntr, invlaid = -1 */ int lastindex; /* index of the last array item sent, invalid < 1 */ } unpack_array_fctx; #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) static Datum expandArray_SRF(FunctionCallInfo info, char *sql, int unpackAttrNum); static Datum expandArray_SRF_FirstCall(FunctionCallInfo fcinfo, FuncCallContext *funcctx, const char *sql, const int unpackAttrNum); static Datum expandArray_SRF_GetTuple(FunctionCallInfo fcinfo, FuncCallContext *funcctx); static TupleDesc makeUnpackedTupleDesc(TupleDesc src_tupdesc, int unpack_attrnum); static bool similarTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc); /* * pg_group_expandusers * * Return pg_group where each tuple has grolist attribute of int4[] type * changed to be of type int4 and to hold only one user id. */ PG_FUNCTION_INFO_V1(pg_group_long); Datum pg_group_long(PG_FUNCTION_ARGS) { return expandArray_SRF(fcinfo, "select groname,grosysid,grolist from pg_group", 3); } /* * expand_array_srf * * Return tuples such that the elements of an array attribute are * extracted in turn and placed into the output instead of the array. * Declared to fmgr as: * CREATE FUNCTION the_name(text,integer) RETURNS SETOF RECORD ... * * where the text argument is the query string to obtain the source * data and the integer argument gives the column number of the array * to expand. * * Note, despite checking number of arguments this is in no way safe * from some one creating a fmgr function which uses wrong argument types. */ PG_FUNCTION_INFO_V1(expand_array_srf); Datum expand_array_srf(PG_FUNCTION_ARGS) { if (fcinfo->nargs != 2) elog(ERROR, "Wrong number of arguments specified for function"); return expandArray_SRF(fcinfo, GET_STR(PG_GETARG_TEXT_P(0)), PG_GETARG_INT32(1)); } /* * expandArray_SRF * * This is not the user invoked function. It is supposed to * reused by all such functions for doing the work. * Objects if it's asked to unpack multidimensional arrays. * * Nice idea but the use of fcinfo in the return macros requiring * that value to be passed around makes this split into nice * manageable code portions more messy and worse, dependent on * the calling convention of fmgr [and SRF] functions, not to * mention the SPI_ disconnection stuck in the middle of nowhere; * otherwise known as: at the end, completely out of touch with the * rest of the use of SPI. */ static Datum expandArray_SRF(FunctionCallInfo fcinfo, char *sql, int unpackAttrNum) { FuncCallContext *funcctx; Datum retdatum; if(SRF_IS_FIRSTCALL()) { /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); retdatum = expandArray_SRF_FirstCall(fcinfo, funcctx, sql, unpackAttrNum); /* cheat to detect no usable tuples */ if (((ReturnSetInfo *)(fcinfo->resultinfo))->isDone == ExprEndResult) return retdatum; } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); if (funcctx->call_cntr < funcctx->max_calls) return expandArray_SRF_GetTuple(fcinfo, funcctx); /* release SPI related resources not too nice what with SPI only reference in FirstCall() */ SPI_finish(); SRF_RETURN_DONE(funcctx); } /* * expandArray_SRF_FirstCall * * Do the 'on first call' stuff. */ static Datum expandArray_SRF_FirstCall(FunctionCallInfo fcinfo, FuncCallContext *funcctx, const char *sql, const int unpackcolnum) { struct unpack_array_fctx *fctx; MemoryContext oldcontext; int ret; int proc; Oid funcid = fcinfo->flinfo->fn_oid; Oid functypeid; char functyptype; TupleDesc tupdesc = NULL; SPITupleTable *spi_tuptable = NULL; /* switch to memory context appropriate for multiple function calls */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); /* Do the query */ if ((ret = SPI_connect()) < 0) elog(ERROR, "expandArray: SPI_connect returned %d", ret); ret = SPI_exec((char *)sql, 0); proc = SPI_processed; if ((ret != SPI_OK_SELECT) || (proc == 0)) { /* no tuples */ SPI_finish(); SRF_RETURN_DONE(funcctx); } spi_tuptable = SPI_tuptable; /* SPI switches context on us, so reset it */ MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); /* Determine and verify return types */ /* desired return type */ tupdesc = makeUnpackedTupleDesc(spi_tuptable->tupdesc, unpackcolnum); /* function return type */ functypeid = get_func_rettype(funcid); functyptype = get_typtype(functypeid); if (functyptype == 'c') { /* Return type is fully known (predetermined) so check for compatibility to query */ TupleDesc fret_tupdesc; fret_tupdesc = TypeGetTupleDesc(functypeid, NIL); if (!similarTupleDescs(fret_tupdesc, tupdesc)) elog(ERROR, "Query and return tuple descriptions are incompatible"); pfree(fret_tupdesc); } else if (functyptype == 'p' && functypeid == RECORDOID) { /* We have the freedom to specify what we are returning */ tupdesc = tupdesc; } else if (functyptype == 'b') elog(ERROR, "Invalid kind of return type specified for function"); else elog(ERROR, "Unknown kind of return type specified for function"); /* allocate a slot for a tuple with this tupdesc */ funcctx->slot = TupleDescGetSlot(tupdesc); /* initialise our persistent storage */ fctx = (struct unpack_array_fctx *) palloc(sizeof(struct unpack_array_fctx)); fctx->spi_tuptable = spi_tuptable; fctx->lastcall_cntr = -1; fctx->lastindex = 0; fctx->unpack_attrnum = unpackcolnum; fctx->tupdesc = tupdesc; funcctx->user_fctx = fctx; /* total 'notional' number of tuples to be returned */ funcctx->max_calls = proc; MemoryContextSwitchTo(oldcontext); /* dummy return */ return Int32GetDatum(0); } /* * expandArray_SRF_GetTuple * * Get a tuple. */ static Datum expandArray_SRF_GetTuple(FunctionCallInfo fcinfo, FuncCallContext *funcctx) { int call_cntr = funcctx->call_cntr; TupleTableSlot *slot = funcctx->slot; struct unpack_array_fctx *fctx = (struct unpack_array_fctx *) funcctx->user_fctx; int nextarrayind = fctx->lastindex + 1; int unpackcolnum = fctx->unpack_attrnum; HeapTuple spi_tuple = fctx->spi_tuptable->vals[call_cntr]; TupleDesc spi_tupdesc = fctx->spi_tuptable->tupdesc; HeapTuple tuple; TupleDesc tupdesc = fctx->tupdesc; MemoryContext oldcontext; Datum *values; char *nulls; int i; int cattrindex = unpackcolnum - 1; Datum result; /* paranoia test */ #ifdef ALWAYS_PARANOID if (spi_tuple->t_data->t_natts != tupdesc->natts || spi_tupdesc->natts != tupdesc->natts) elog(ERROR, "Mismatch in tuple and tupledesc attribute counts"); #else Assert(spi_tuple->t_data->t_natts == tupdesc->natts); Assert(spi_tupdesc->natts == tupdesc->natts); #endif oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); values = (Datum *)palloc(sizeof(Datum) * tupdesc->natts); nulls = (char *)palloc(sizeof(char) * tupdesc->natts); /* heap_deformtuple() replacement */ for (i = 0; i < tupdesc->natts; i++) { bool isNull; *(values + i) = heap_getattr(spi_tuple, i + 1, spi_tupdesc, &isNull); *(nulls + i) = (isNull) ? 'n' : ' '; } if (*(nulls + cattrindex) != 'n') { /* Replace the offending array with an element from it or null it */ ArrayType *earray = DatumGetArrayTypeP(*(values + cattrindex)); Datum eelement = PointerGetDatum(NULL); bool eelementisnull = true; int earrayind = fctx->lastcall_cntr == call_cntr ? nextarrayind : ARR_LBOUND(earray)[0]; /* Quick check for 1-dimensionality */ if (ARR_NDIM(earray) != 1) elog(ERROR, "Array is multidimensional, only 1-d arrays are supported"); /* empty the entry */ *(values + cattrindex) = eelement; *(nulls + cattrindex) = 'n'; if (earrayind <= ARR_DIMS(earray)[0]) { Form_pg_attribute elemattr = tupdesc->attrs[cattrindex]; /* Get the element */ eelement = array_ref(earray, 1, &earrayind, 0, /* > 0 means fixed length array */ elemattr->attlen, elemattr->attbyval, elemattr->attalign, &eelementisnull); /* eelement = fetch_att(((Datum)(ARR_DATA_PTR(earray)) + earrayind), tupdesc->attrs[cattrindex]->attbyval, tupdesc->attrs[cattrindex]->attlen); */ } if (earrayind >= ARR_DIMS(earray)[0]) /* Indicate end of array reached */ nextarrayind = 0; if (!eelementisnull /*PointerIsValid(DatumGetPointer(eelement))*/) { *(values + cattrindex) = eelement; *(nulls + cattrindex) = ' '; } } else /* Indicate end of array since no array present */ nextarrayind = 0; MemoryContextSwitchTo(oldcontext); /* Copies datums into a new tuple */ tuple = heap_formtuple(tupdesc, values, nulls); /* * Make the tuple into a datum * - comment on this macro implies tuple should be in our context * and possibly freed after here */ result = TupleGetDatum(slot, tuple); MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); pfree(values); pfree(nulls); /* Set array index and previous call counter for next call */ fctx->lastindex = nextarrayind; fctx->lastcall_cntr = call_cntr; if (nextarrayind > 0) /* -- call counter since API will ++ but we haven't finished */ call_cntr = --funcctx->call_cntr; MemoryContextSwitchTo(oldcontext); SRF_RETURN_NEXT(funcctx, result); } /* * Check if two tupdescs match in type of attributes * The standard equalTupleDescs() compares names, something * which may be a problem, so we use a much loser check * of sameness. */ static bool similarTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc) { int i; if (ret_tupdesc->natts != sql_tupdesc->natts) return false; for (i = 0; i < ret_tupdesc->natts; i++) if (ret_tupdesc->attrs[i]->atttypid != sql_tupdesc->attrs[i]->atttypid) return false; return true; } /* * make_unpacked_tupledesc * Copy the given tupledesc changing the given column type from * array to the type contained in the array */ static TupleDesc makeUnpackedTupleDesc(TupleDesc src_tupdesc, int unpack_attrnum) { HeapTuple typtuple; Form_pg_attribute attr; Form_pg_type type; Oid atttypid; TupleDesc tupdesc; int natts; /* char attname[NAMEDATALEN+1]; */ natts = src_tupdesc->natts; tupdesc = CreateTupleDescCopy(src_tupdesc); /* safety check */ if (unpack_attrnum < 1 || unpack_attrnum > natts) elog(ERROR, "No such attribute number %d", unpack_attrnum); attr = tupdesc->attrs[unpack_attrnum-1]; #if 0 /* * This doesn't work because attndims = 0 in the test case * of pg_group.grolist * Since the easiest way to find the dimensions seems to be * to query an array instance this task is left for the * tuple fetch section. */ if (attr->attndims != 1) elog(ERROR, "Attribute number %d not an array or multidimensional", unpack_attrnum); #endif atttypid = attr->atttypid; /* Find information about type of array element */ /* First about the array */ typtuple = SearchSysCache(TYPEOID, ObjectIdGetDatum(atttypid), 0, 0, 0); if (!HeapTupleIsValid(typtuple)) elog(ERROR, "Unable to look up type id %u", atttypid); type = (Form_pg_type) GETSTRUCT(typtuple); /* * I'm confused we can check this is an array type above * using attr->attndims (like what we are doing) can't we? * - test case says 'obviously not'. */ if (type->typelem == 0 || type->typlen != -1) elog(ERROR, "Column %d is not an array type", unpack_attrnum); atttypid = type->typelem; ReleaseSysCache(typtuple); /* Now about the element */ typtuple = SearchSysCache(TYPEOID, ObjectIdGetDatum(atttypid), 0, 0, 0); if (!HeapTupleIsValid(typtuple)) elog(ERROR, "Unable to look up type id %u", atttypid); /* type = (Form_pg_type) GETSTRUCT(typtuple); */ atttypid = HeapTupleGetOid(typtuple); /* Why this? */ /* Override the array attribute in tupledesc */ tupdesc->attrs[unpack_attrnum-1] = NULL; TupleDescInitEntry( tupdesc, (AttrNumber)unpack_attrnum, NameStr(attr->attname), atttypid, -1, /* atttypmod */ 0, /* attndims - only 1-d arrays supported at moment */ false /* attisset */ ); ReleaseSysCache(typtuple); /* TupleDescInitEntry() allocates new storage for this */ pfree(attr); return tupdesc; }
/* * Copyright 2002 by PostgreSQL Global Development Group * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement * is hereby granted, provided that the above copyright notice and this * paragraph and the following two paragraphs appear in all copies. * * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * */ #ifndef PGGROUPING_H #define PGGROUPING_H /* * External declarations */ extern Datum pg_group_long(PG_FUNCTION_ARGS); extern Datum expand_array_srf(PG_FUNCTION_ARGS); #endif /* PGGROUPING_H */
subdir = contrib/pggrouping top_builddir = ../.. include $(top_builddir)/src/Makefile.global MODULES = pggrouping DATA_built = pggrouping.sql DOCS = README.pggrouping include $(top_srcdir)/contrib/contrib-global.mk
CREATE OR REPLACE VIEW pg_group_long AS SELECT ''::name AS groname , 1::integer AS grosysid , 1::integer AS usesysid; CREATE OR REPLACE FUNCTION pg_group_long() RETURNS setof pg_group_long AS 'MODULE_PATHNAME','pg_group_long' LANGUAGE 'c' VOLATILE; CREATE OR REPLACE FUNCTION expand_array(text,int) RETURNS setof record AS 'MODULE_PATHNAME','expand_array_srf' LANGUAGE 'c' VOLATILE; -- -- examples of use on pg_group -- select * from pg_group; select * from pg_group_long(); select * from expand_array('select groname, grosysid, grolist from pg_group', 3) AS grps(grp name, grpid int, useid int);
---------------------------(end of broadcast)--------------------------- TIP 6: Have you searched our list archives? http://archives.postgresql.org