I realise that this has already been done, by Joe Conway I think. Indeed I was
looking at this just before beta1 when I happened to notice the post giving the
plpgsql function. However, as I had started work on it and I was interested in
seeing how things should be done I continued, only not in so much of a rush.

In the interests of finding out if I have approached this the right way, or the
way a more experienced backend programmer would, I'd appreciate any comments on
the attached .c file. In particular, I'm not sure what I'm doing with regard to
memory contexts, I think I may have one unnecessary switch in there, and in
general I seem to be doing a lot of work just to find out tidbits of
information.

I based this on, i.e. started by editing, Joe Conway's tablefunc.c but I think
there's very little of the original left in there.

I've also attached the .h, Makefile and .sql.in files to make this work if
anyone is interested in giving it a run. The .sql.in shows the usage. I did
this in a directory called pggrouping, for want of a better name, under the
contrib directory in my tree, so that's probably the best place to build it.

Thanks, and sorry for adding to people's email and work load.


-- 
Nigel J. Andrews
Director

---
Logictree Systems Limited
Computer Consultants
/*
 * Derived from tablefunc.c, a sample to demonstrate C functions which
 * return setof scalar and setof composite by Joe Conway <[EMAIL PROTECTED]>
 *
 * Copyright 2002 by PostgreSQL Global Development Group
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 * 
 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * 
 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
#include <stdlib.h>
#include <math.h>

#include "postgres.h"

#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h" 
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"

#include "pggrouping.h"

/*
 * Cross-call state for the array-expanding set-returning functions.
 * Allocated in expandArray_SRF_FirstCall() and reached on later calls
 * through funcctx->user_fctx.
 */
typedef struct unpack_array_fctx
{
	SPITupleTable  *spi_tuptable;	/* sql results from user query */
	TupleDesc 		tupdesc;	    /* TupleDesc for results (array column replaced by its element type) */
	int 		    unpack_attrnum;	/* attribute number to be unpacked (1-based) */
	int 		    lastcall_cntr;	/* previous call_cntr, invalid = -1 */
	int 		    lastindex;		/* array index bookkeeping from the previous call; 0 = no expansion pending */
}	unpack_array_fctx;


/* Build a text value from a C string by calling textin (not used in this file). */
#define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp)))
/* Build a C string from a text value by calling textout. */
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))

/*
 * Forward declarations of the file-local helpers.
 *
 * expandArray_SRF is the shared driver; _FirstCall and _GetTuple are the
 * two halves it dispatches to.  Note the last parameter of _FirstCall is
 * named unpackAttrNum here but unpackcolnum in its definition.
 */
static Datum expandArray_SRF(FunctionCallInfo info,
							 char *sql,
							 int unpackAttrNum);
static Datum expandArray_SRF_FirstCall(FunctionCallInfo fcinfo,
									   FuncCallContext *funcctx,
									   const char *sql,
									   const int unpackAttrNum);
static Datum expandArray_SRF_GetTuple(FunctionCallInfo fcinfo,
									  FuncCallContext *funcctx);

/* Tuple descriptor helpers */
static TupleDesc makeUnpackedTupleDesc(TupleDesc src_tupdesc,
									   int unpack_attrnum);
static bool similarTupleDescs(TupleDesc ret_tupdesc,
							  TupleDesc sql_tupdesc);



/*
 * pg_group_long
 *
 * Set-returning function over pg_group.  It hands a fixed query
 * (groname, grosysid, grolist) to expandArray_SRF() and asks for the
 * third column, grolist, to be expanded, so that each output tuple
 * carries a single array element in place of the whole array.
 */
PG_FUNCTION_INFO_V1(pg_group_long);
Datum
pg_group_long(PG_FUNCTION_ARGS)
{
	/* the array to expand, grolist, is column 3 of this query */
	char	   *group_query = "select groname,grosysid,grolist from pg_group";
	int			grolist_column = 3;

	return expandArray_SRF(fcinfo, group_query, grolist_column);
}


/*
 * expand_array_srf
 *
 * Return tuples such that the elements of an array attribute are
 * extracted in turn and placed into the output instead of the array.
 * Declared to fmgr as:
 *   CREATE FUNCTION the_name(text,integer) RETURNS SETOF RECORD ...
 *
 * Arguments:
 *   0 (text)    - the query string producing the source data
 *   1 (integer) - the 1-based column number of the array to expand
 *
 * Raises an error if the argument count is not 2 or if either argument
 * is NULL.  The NULL check matters because the function is not
 * necessarily declared strict, in which case fetching a NULL text
 * argument would dereference a null pointer.
 *
 * Note, despite checking the number of arguments this is in no way safe
 * from someone creating a fmgr function which uses wrong argument types.
 */
PG_FUNCTION_INFO_V1(expand_array_srf);
Datum
expand_array_srf(PG_FUNCTION_ARGS)
{
	if (fcinfo->nargs != 2)
		elog(ERROR, "Wrong number of arguments specified for function");

	/* must come before any PG_GETARG_xxx, which assume non-NULL input */
	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
		elog(ERROR, "expand_array: query and column number must not be NULL");

	return expandArray_SRF(fcinfo,
						   GET_STR(PG_GETARG_TEXT_P(0)),
						   PG_GETARG_INT32(1));
}


/*
 * expandArray_SRF
 *
 * This is not the user invoked function. It is supposed to be
 * reused by all such functions for doing the work.
 * Objects if it's asked to unpack multidimensional arrays.
 *
 * fcinfo         - call info of the user-visible function (the SRF macros
 *                  reference a variable of exactly this name)
 * sql            - query whose result is to be expanded
 * unpackAttrNum  - 1-based column number of the array to expand
 *
 * On the first call the query is run by expandArray_SRF_FirstCall();
 * every call then returns one tuple from expandArray_SRF_GetTuple() until
 * call_cntr reaches max_calls, at which point SPI is shut down and the
 * set is reported as done.
 *
 * The SPI connection is opened in the first-call helper but closed here,
 * because the SRF return macros have to be used with fcinfo in scope.
 * SPI_finish() makes the memory context that was current at SPI_connect()
 * current again; that is the multi-call context, not our caller's, so
 * the caller's context is saved on entry and reinstated before returning.
 */
static
Datum
expandArray_SRF(FunctionCallInfo fcinfo, char *sql, int unpackAttrNum)
{
	FuncCallContext *funcctx;
	MemoryContext	 callercontext = CurrentMemoryContext;

	if (SRF_IS_FIRSTCALL())
	{
		Datum		 retdatum;

		/* create a function context for cross-call persistence */
		funcctx = SRF_FIRSTCALL_INIT();

		retdatum = expandArray_SRF_FirstCall(fcinfo, funcctx, sql, unpackAttrNum);

		/* cheat to detect no usable tuples: the helper already said "done" */
		if (((ReturnSetInfo *) (fcinfo->resultinfo))->isDone == ExprEndResult)
		{
			/* the helper shut SPI down, which may have changed context */
			MemoryContextSwitchTo(callercontext);
			return retdatum;
		}
	}

	/* stuff done on every call of the function */
	funcctx = SRF_PERCALL_SETUP();

	if (funcctx->call_cntr < funcctx->max_calls)
		return expandArray_SRF_GetTuple(fcinfo, funcctx);

	/*
	 * All source tuples consumed: release SPI related resources, then undo
	 * the context switch SPI_finish() performs.
	 */
	SPI_finish();
	MemoryContextSwitchTo(callercontext);

	SRF_RETURN_DONE(funcctx);
}


/*
 * expandArray_SRF_FirstCall
 *
 * Do the 'on first call' stuff:
 *   - connect to SPI and run the query,
 *   - build the result tuple descriptor with the array column replaced
 *     by its element type,
 *   - check that descriptor against the function's declared return type,
 *   - set up the slot, the cross-call state and max_calls.
 *
 * fcinfo       - call info of the user-visible function
 * funcctx      - context freshly created by SRF_FIRSTCALL_INIT()
 * sql          - query to run
 * unpackcolnum - 1-based column number of the array to expand
 *
 * Returns a dummy Datum on success.  If the query yields no rows, SPI is
 * closed and the set is finished via SRF_RETURN_DONE(); the caller detects
 * this by looking at isDone in the ReturnSetInfo.  Raises an error if SPI
 * cannot be used, if the query is not a successful SELECT, or if the
 * declared return type does not fit the query.
 *
 * The caller's memory context is current again on every normal return.
 */
static
Datum
expandArray_SRF_FirstCall(FunctionCallInfo fcinfo,
						  FuncCallContext *funcctx,
						  const char *sql,
						  const int unpackcolnum)
{
	struct unpack_array_fctx *fctx;
	MemoryContext	 oldcontext;
	int				 ret;
	int				 proc;

	Oid 			 funcid = fcinfo->flinfo->fn_oid;
	Oid 			 functypeid;
	char			 functyptype;

	TupleDesc		 tupdesc = NULL;

	SPITupleTable   *spi_tuptable = NULL;

	/* switch to memory context appropriate for multiple function calls */
	oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

	/* Do the query */
	if ((ret = SPI_connect()) < 0)
		elog(ERROR, "expandArray: SPI_connect returned %d", ret);

	ret = SPI_exec((char *) sql, 0);
	proc = SPI_processed;

	/* a failed or non-SELECT query is an error, not an empty result */
	if (ret != SPI_OK_SELECT)
		elog(ERROR, "expandArray: query is not a successful SELECT (SPI_exec returned %d)", ret);

	if (proc == 0)
	{
		/*
		 * No tuples.  SPI_finish() puts back the context that was current
		 * at SPI_connect(), i.e. the multi-call one, so explicitly return
		 * to the caller's context before leaving.
		 */
		SPI_finish();
		MemoryContextSwitchTo(oldcontext);
		SRF_RETURN_DONE(funcctx);
	}

	spi_tuptable = SPI_tuptable;

	/* SPI switches context on us, so reset it */
	MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

	/* Determine and verify return types */

	/* desired return type */
	tupdesc = makeUnpackedTupleDesc(spi_tuptable->tupdesc, unpackcolnum);

	/* function return type */
	functypeid = get_func_rettype(funcid);
	functyptype = get_typtype(functypeid);

	if (functyptype == 'c')
	{
		/* Return type is fully known (predetermined) so check for
		   compatibility to query */

		TupleDesc fret_tupdesc;

		fret_tupdesc = TypeGetTupleDesc(functypeid, NIL);

		if (!similarTupleDescs(fret_tupdesc, tupdesc))
			elog(ERROR, "Query and return tuple descriptions are incompatible");

		/* release the attribute entries too, not just the top-level struct */
		FreeTupleDesc(fret_tupdesc);
	}
	else if (functyptype == 'p' && functypeid == RECORDOID)
	{
		/*
		 * We have the freedom to specify what we are returning, so the
		 * descriptor derived from the query is used as it stands.
		 */
	}
	else if (functyptype == 'b')
		elog(ERROR, "Invalid kind of return type specified for function");
	else
		elog(ERROR, "Unknown kind of return type specified for function");

	/* allocate a slot for a tuple with this tupdesc */
	funcctx->slot = TupleDescGetSlot(tupdesc);

	/* initialise our persistent storage */

	fctx = (struct unpack_array_fctx *) palloc(sizeof(struct unpack_array_fctx));

	fctx->spi_tuptable = spi_tuptable;
	fctx->lastcall_cntr = -1;
	fctx->lastindex = 0;
	fctx->unpack_attrnum = unpackcolnum;
	fctx->tupdesc = tupdesc;

	funcctx->user_fctx = fctx;

	/* total 'notional' number of tuples to be returned */
	funcctx->max_calls = proc;

	MemoryContextSwitchTo(oldcontext);

	/* dummy return */
	return Int32GetDatum(0);
}



/*
 * expandArray_SRF_GetTuple
 *
 * Get a tuple.
 *
 * Builds one output tuple from the source tuple selected by
 * funcctx->call_cntr: every column is copied as is, except the array
 * column, which is replaced by a single element of the array.
 *
 *   - NULL array, or empty (zero-dimensional) array: one tuple is emitted
 *     with NULL in the element column.
 *   - One-dimensional array: one tuple per element, walking from the
 *     array's lower bound to lower bound + length - 1.  While elements
 *     remain, call_cntr is decremented so that the increment done by
 *     SRF_RETURN_NEXT() brings us back to the same source tuple.
 *   - More than one dimension: error.
 *
 * fctx->lastcall_cntr == call_cntr means we are part-way through the
 * array of this source tuple, and fctx->lastindex is then the subscript
 * emitted last time.  lastindex is reset to 0 once the array is finished.
 *
 * All scratch memory is taken from the caller's (per-call) memory context;
 * nothing here needs to outlive the call, so no context switch is needed.
 */
static
Datum
expandArray_SRF_GetTuple(FunctionCallInfo fcinfo, FuncCallContext *funcctx)
{
	int                       call_cntr = funcctx->call_cntr;
	TupleTableSlot           *slot      = funcctx->slot;
	struct unpack_array_fctx *fctx = (struct unpack_array_fctx *) funcctx->user_fctx;
	HeapTuple                 spi_tuple    = fctx->spi_tuptable->vals[call_cntr];
	TupleDesc                 spi_tupdesc  = fctx->spi_tuptable->tupdesc;
	TupleDesc                 tupdesc      = fctx->tupdesc;
	HeapTuple                 tuple;

	int    cattrindex = fctx->unpack_attrnum - 1;
	bool   continuing = (fctx->lastcall_cntr == call_cntr);
	bool   more = false;		/* elements left after the one sent now? */
	int    sentindex = 0;		/* subscript sent on this call */
	Datum *values;
	char  *nulls;
	Datum  result;
	int    i;

	/* paranoia test */
#ifdef ALWAYS_PARANOID
	if (spi_tuple->t_data->t_natts != tupdesc->natts
		|| spi_tupdesc->natts != tupdesc->natts)
		elog(ERROR, "Mismatch in tuple and tupledesc attribute counts");
#else
	Assert(spi_tuple->t_data->t_natts == tupdesc->natts);
	Assert(spi_tupdesc->natts == tupdesc->natts);
#endif

	values = (Datum *) palloc(sizeof(Datum) * tupdesc->natts);
	nulls = (char *) palloc(sizeof(char) * tupdesc->natts);

	/* heap_deformtuple() replacement */
	for (i = 0; i < tupdesc->natts; i++)
	{
		bool isNull;

		values[i] = heap_getattr(spi_tuple,
								 i + 1,
								 spi_tupdesc,
								 &isNull);
		nulls[i] = (isNull) ? 'n' : ' ';
	}

	if (nulls[cattrindex] != 'n')
	{
		/* Replace the offending array with an element from it or null it */

		ArrayType *earray = DatumGetArrayTypeP(values[cattrindex]);
		int        ndim = ARR_NDIM(earray);

		/*
		 * Check dimensionality before touching the bounds: a
		 * zero-dimensional array has no dims/lbound entries to read.
		 */
		if (ndim > 1)
			elog(ERROR, "Array is multidimensional, only 1-d arrays are supported");

		/* empty the entry; it stays NULL unless an element is found */
		values[cattrindex] = PointerGetDatum(NULL);
		nulls[cattrindex] = 'n';

		if (ndim == 1)
		{
			int lower = ARR_LBOUND(earray)[0];
			int upper = lower + ARR_DIMS(earray)[0] - 1;
			int earrayind = continuing ? fctx->lastindex + 1 : lower;

			if (earrayind <= upper)
			{
				/* the output column describes the element type */
				Form_pg_attribute elemattr = tupdesc->attrs[cattrindex];
				bool  eelementisnull = true;
				Datum eelement;

				/* Get the element */
				eelement = array_ref(earray,
									 1,
									 &earrayind,
									 0,     /* > 0 means fixed length array */
									 elemattr->attlen,
									 elemattr->attbyval,
									 elemattr->attalign,
									 &eelementisnull);

				if (!eelementisnull)
				{
					values[cattrindex] = eelement;
					nulls[cattrindex] = ' ';
				}
			}

			/* anything beyond this subscript still to be sent? */
			more = (earrayind < upper);
			sentindex = earrayind;
		}
	}

	/* Copies datums into a new tuple */
	tuple = heap_formtuple(tupdesc, values, nulls);

	/* Make the tuple into a datum; the slot takes charge of the tuple */
	result = TupleGetDatum(slot, tuple);

	pfree(values);
	pfree(nulls);

	/* Set array index and previous call counter for next call */
	fctx->lastindex = more ? sentindex : 0;
	fctx->lastcall_cntr = call_cntr;

	if (more)
		/* -- call counter since API will ++ but we haven't finished */
		--funcctx->call_cntr;

	SRF_RETURN_NEXT(funcctx, result);
}

/*
 * similarTupleDescs
 *
 * Decide whether two tuple descriptors agree in attribute count and,
 * position by position, in attribute type OID.
 * The standard equalTupleDescs() also compares names, which may be a
 * problem here, so this much looser notion of sameness is used instead.
 */
static bool
similarTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
{
	int			nattrs = ret_tupdesc->natts;
	int			attno = 0;

	if (nattrs != sql_tupdesc->natts)
		return false;

	while (attno < nattrs)
	{
		Oid			wanted = ret_tupdesc->attrs[attno]->atttypid;
		Oid			offered = sql_tupdesc->attrs[attno]->atttypid;

		if (wanted != offered)
			return false;

		attno++;
	}

	return true;
}

/*
 * makeUnpackedTupleDesc
 *
 * Copy the given tupledesc, changing the type of column unpack_attrnum
 * (1-based) from an array type to the type contained in the array.
 * The column keeps its name; its typmod becomes -1 and attndims 0.
 *
 * Raises an error if the column number is out of range, if a type cannot
 * be found in the syscache, or if the column's type is not a
 * variable-length type with a non-zero typelem.
 *
 * The column's attndims is deliberately not consulted: in the pg_group.grolist
 * test case it is 0 even though the column is an array, so dimensionality
 * is checked against an actual array value when tuples are fetched.
 */
static TupleDesc
makeUnpackedTupleDesc(TupleDesc src_tupdesc, int unpack_attrnum)
{
	TupleDesc			result = CreateTupleDescCopy(src_tupdesc);
	int					attcount = src_tupdesc->natts;
	int					slotno = unpack_attrnum - 1;
	Form_pg_attribute	oldattr;
	Form_pg_type		arraytype;
	HeapTuple			cachetup;
	Oid					arraytypid;
	Oid					elemtypid;

	/* safety check */
	if (unpack_attrnum < 1 || unpack_attrnum > attcount)
		elog(ERROR, "No such attribute number %d", unpack_attrnum);

	oldattr = result->attrs[slotno];
	arraytypid = oldattr->atttypid;

	/* Look up the column's own type; it has to be an array type */
	cachetup = SearchSysCache(TYPEOID,
							  ObjectIdGetDatum(arraytypid),
							  0, 0, 0);
	if (!HeapTupleIsValid(cachetup))
		elog(ERROR, "Unable to look up type id %u", arraytypid);

	arraytype = (Form_pg_type) GETSTRUCT(cachetup);

	if (arraytype->typelem == 0 || arraytype->typlen != -1)
		elog(ERROR, "Column %d is not an array type", unpack_attrnum);

	elemtypid = arraytype->typelem;

	ReleaseSysCache(cachetup);

	/* Look up the element type and take its OID from the cache entry */
	cachetup = SearchSysCache(TYPEOID,
							  ObjectIdGetDatum(elemtypid),
							  0, 0, 0);
	if (!HeapTupleIsValid(cachetup))
		elog(ERROR, "Unable to look up type id %u", elemtypid);

	elemtypid = HeapTupleGetOid(cachetup);

	/*
	 * Override the array attribute in the copy.  The old entry is
	 * detached first, but kept until its name has been read.
	 */
	result->attrs[slotno] = NULL;
	TupleDescInitEntry(result,
					   (AttrNumber) unpack_attrnum,
					   NameStr(oldattr->attname),
					   elemtypid,
					   -1,		/* atttypmod */
					   0,		/* attndims - only 1-d arrays supported at moment */
					   false);	/* attisset */

	ReleaseSysCache(cachetup);

	/* TupleDescInitEntry() allocates new storage for this */
	pfree(oldattr);

	return result;
}

/*
 * Copyright 2002 by PostgreSQL Global Development Group
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 * 
 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 * 
 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

/*
 * pggrouping.h
 *
 * Declarations of the fmgr-callable entry points of the pggrouping module.
 * NOTE(review): this header includes nothing itself, so Datum and
 * PG_FUNCTION_ARGS must already be declared by whoever includes it.
 */
#ifndef PGGROUPING_H
#define PGGROUPING_H

/*
 * External declarations
 */
/* expands the grolist column of a fixed query over pg_group */
extern Datum pg_group_long(PG_FUNCTION_ARGS);
/* expands an array column of an arbitrary query: (text query, integer column) */
extern Datum expand_array_srf(PG_FUNCTION_ARGS);

#endif   /* PGGROUPING_H */
# Makefile for the pggrouping contrib module.
# Paths are relative to a directory two levels below the top of the tree.
subdir = contrib/pggrouping
top_builddir = ../..
include $(top_builddir)/src/Makefile.global

# Module to build, plus the SQL script listed as a built data file.
MODULES = pggrouping
DATA_built = pggrouping.sql
# NOTE(review): README.pggrouping is listed here but was not among the
# files attached with this module - confirm it exists.
DOCS = README.pggrouping

include $(top_srcdir)/contrib/contrib-global.mk
-- Dummy view: it is named as the row type returned by pg_group_long() below
-- (name, integer, integer).
CREATE OR REPLACE VIEW pg_group_long AS
        SELECT ''::name AS groname
                        , 1::integer AS grosysid
                        , 1::integer AS usesysid;

-- pg_group with the grolist array expanded to one row per element.
CREATE OR REPLACE FUNCTION pg_group_long()
  RETURNS setof pg_group_long
  AS 'MODULE_PATHNAME','pg_group_long' LANGUAGE 'c' VOLATILE;

-- Generic form: (query text, 1-based number of the array column to expand).
-- Returns setof record, so the caller supplies a column definition list.
-- Note: the function is not declared strict here.
CREATE OR REPLACE FUNCTION expand_array(text,int)
  RETURNS setof record
  AS 'MODULE_PATHNAME','expand_array_srf' LANGUAGE 'c' VOLATILE;

--
-- examples of use on pg_group
--

select * from pg_group;

select * from pg_group_long();

select * from expand_array('select groname, grosysid, grolist from pg_group', 3) AS 
grps(grp name, grpid int, useid int);


---------------------------(end of broadcast)---------------------------
TIP 6: Have you searched our list archives?

http://archives.postgresql.org

Reply via email to