On 5 April 2017 at 08:00, Craig Ringer <cr...@2ndquadrant.com> wrote:

> Taking a look at this now.

Rebased to current master with conflicts and whitespace errors fixed.
Review pending.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 71b6163734934db3160de81fd064e7d113b9122f Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 1 Mar 2017 15:45:51 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
 DestReceiver.

Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
 src/backend/executor/spi.c | 79 ++++++++++++++++++++++++++++++++++++++++------
 src/backend/tcop/dest.c    | 11 +++++++
 src/include/executor/spi.h |  4 +++
 src/include/tcop/dest.h    |  1 +
 4 files changed, 85 insertions(+), 10 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 3a89ccd..a0af31a 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool fire_triggers, uint64 tcount);
+				  bool read_only, bool fire_triggers, uint64 tcount,
+				  DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
 					Datum *Values, const char *Nulls);
@@ -321,7 +322,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
 	res = _SPI_execute_plan(&plan, NULL,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+		DestReceiver *callback)
+{
+	_SPI_plan	plan;
+	int			res;
+
+	if (src == NULL || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	memset(&plan, 0, sizeof(_SPI_plan));
+	plan.magic = _SPI_PLAN_MAGIC;
+	plan.cursor_options = 0;
+
+	_SPI_prepare_oneshot_plan(src, &plan);
+
+	res = _SPI_execute_plan(&plan, NULL,
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, callback);
 
 	_SPI_end_call(true);
 	return res;
@@ -355,7 +383,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
+
+	_SPI_end_call(true);
+	return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+				 bool read_only, long tcount, DestReceiver *callback)
+{
+	int			res;
+
+	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+		return SPI_ERROR_ARGUMENT;
+
+	if (plan->nargs > 0 && Values == NULL)
+		return SPI_ERROR_PARAM;
+
+	res = _SPI_begin_call(true);
+	if (res < 0)
+		return res;
+
+	res = _SPI_execute_plan(plan,
+							_SPI_convert_params(plan->nargs, plan->argtypes,
+												Values, Nulls),
+							InvalidSnapshot, InvalidSnapshot,
+							read_only, true, tcount, callback);
 
 	_SPI_end_call(true);
 	return res;
@@ -384,7 +439,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
 	res = _SPI_execute_plan(plan, params,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -425,7 +480,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							snapshot, crosscheck_snapshot,
-							read_only, fire_triggers, tcount);
+							read_only, fire_triggers, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -472,7 +527,7 @@ SPI_execute_with_args(const char *src,
 
 	res = _SPI_execute_plan(&plan, paramLI,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount);
+							read_only, true, tcount, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -1907,7 +1962,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool fire_triggers, uint64 tcount)
+				  bool read_only, bool fire_triggers, uint64 tcount,
+				  DestReceiver *callback)
 {
 	int			my_res = 0;
 	uint64		my_processed = 0;
@@ -1918,6 +1974,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	ErrorContextCallback spierrcontext;
 	CachedPlan *cplan = NULL;
 	ListCell   *lc1;
+	DestReceiver *dest = callback;
 
 	/*
 	 * Setup error traceback support for ereport()
@@ -2037,7 +2094,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 		{
 			PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
 			bool		canSetTag = stmt->canSetTag;
-			DestReceiver *dest;
 
 			_SPI_current->processed = 0;
 			_SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2138,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				UpdateActiveSnapshotCommandId();
 			}
 
-			dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+			if (!callback)
+				dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
 
 			if (stmt->utilityStmt == NULL)
 			{
@@ -2108,6 +2165,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			{
 				char		completionTag[COMPLETION_TAG_BUFSIZE];
 
+				// XXX throw error if callback is set
 				ProcessUtility(stmt,
 							   plansource->query_string,
 							   PROCESS_UTILITY_QUERY,
@@ -2281,7 +2339,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 	switch (operation)
 	{
 		case CMD_SELECT:
-			if (queryDesc->dest->mydest != DestSPI)
+			if (queryDesc->dest->mydest != DestSPI &&
+					queryDesc->dest->mydest != DestSPICallback)
 			{
 				/* Don't return SPI_OK_SELECT if we're discarding result */
 				res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3..bd671e0 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
 	DestSPI
 };
 
+static DestReceiver spi_callbackDR = {
+	donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+	DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
 		case DestSPI:
 			return &spi_printtupDR;
 
+		case DestSPICallback:
+			return &spi_callbackDR;
+
 		case DestTuplestore:
 			return CreateTuplestoreDestReceiver();
 
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
 		case DestNone:
 		case DestDebug:
 		case DestSPI:
+		case DestSPICallback:
 		case DestTuplestore:
 		case DestIntoRel:
 		case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
 		case DestNone:
 		case DestDebug:
 		case DestSPI:
+		case DestSPICallback:
 		case DestTuplestore:
 		case DestIntoRel:
 		case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
 		case DestNone:
 		case DestDebug:
 		case DestSPI:
+		case DestSPICallback:
 		case DestTuplestore:
 		case DestIntoRel:
 		case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d..13719e1 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int	SPI_connect(void);
 extern int	SPI_finish(void);
 extern int	SPI_execute(const char *src, bool read_only, long tcount);
+extern int	SPI_execute_callback(const char *src, bool read_only, long tcount,
+									DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 				 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
 								ParamListInfo params,
 								bool read_only, long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+				 bool read_only, long tcount, DestReceiver *callback);
 extern int	SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 		  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2..1d1d641 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
 	DestRemoteExecute,			/* sent to frontend, in Execute command */
 	DestRemoteSimple,			/* sent to frontend, w/no catalog access */
 	DestSPI,					/* results sent to SPI manager */
+	DestSPICallback,			/* results sent to user-specified callback function */
 	DestTuplestore,				/* results sent to Tuplestore */
 	DestIntoRel,				/* results sent to relation (SELECT INTO) */
 	DestCopyOut,				/* results sent to COPY TO code */
-- 
2.5.5

From c846452b1b0ca30f2ae9c08488482ef1b9b9e9a9 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 1 Mar 2017 15:46:53 -0600
Subject: [PATCH 2/2] Modify plpython to use SPI callbacks

This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
 src/pl/plpython/plpy_main.c |  13 ++
 src/pl/plpython/plpy_main.h |   3 +
 src/pl/plpython/plpy_spi.c  | 303 ++++++++++++++++++++++++++++++++------------
 3 files changed, 235 insertions(+), 84 deletions(-)

diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804..07501f1 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
 	return PLy_execution_contexts;
 }
 
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+	PLyExecutionContext *last = PLy_execution_contexts;
+
+	if (PLy_execution_contexts == NULL)
+		elog(ERROR, "no Python function is currently executing");
+
+	PLy_execution_contexts = new;
+
+	return last;
+}
+
 MemoryContext
 PLy_get_scratch_context(PLyExecutionContext *context)
 {
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4..7cbe0a8 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
 /* Get the current execution context */
 extern PLyExecutionContext *PLy_current_execution_context(void);
 
+/* Get switch execution contexts */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext *new);
+
 /* Get the scratch memory context for specified execution context */
 extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
 
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856cc..dcec0c7 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
 #include "plpy_procedure.h"
 #include "plpy_resultobject.h"
 
+typedef struct
+{
+	DestReceiver pub;
+	PLyExecutionContext *exec_ctx;
+	MemoryContext parent_ctx;
+	MemoryContext cb_ctx;
+	TupleDesc	desc;
+	PLyTypeInfo *args;
+
+	PyObject	*result;
+} CallbackState;
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
 							 uint64 rows, int status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
 PyObject *
 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 {
+	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+	CallbackState	*callback;
 	volatile int nargs;
 	int			i,
 				rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 
 	oldcontext = CurrentMemoryContext;
 	oldowner = CurrentResourceOwner;
+	callback = PLy_Callback_New(exec_ctx);
 
 	PLy_spi_subtransaction_begin(oldcontext, oldowner);
 
 	PG_TRY();
 	{
-		PLyExecutionContext *exec_ctx = PLy_current_execution_context();
 		char	   *volatile nulls;
 		volatile int j;
 
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 			}
 		}
 
-		rv = SPI_execute_plan(plan->plan, plan->values, nulls,
-							  exec_ctx->curr_proc->fn_readonly, limit);
-		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+							exec_ctx->curr_proc->fn_readonly, limit,
+							(DestReceiver *) callback);
+		ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
 		if (nargs > 0)
 			pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 		}
 
 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
+		PLy_Callback_Free(callback);
 		return NULL;
 	}
 	PG_END_TRY();
+	callback = PLy_Callback_Free(callback);
 
 	for (i = 0; i < nargs; i++)
 	{
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 static PyObject *
 PLy_spi_execute_query(char *query, long limit)
 {
+	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+	CallbackState	*callback = PLy_Callback_New(exec_ctx);
+	volatile MemoryContext oldcontext;
+	volatile ResourceOwner oldowner;
 	int			rv;
-	volatile MemoryContext oldcontext;
-	volatile ResourceOwner oldowner;
 	PyObject   *ret = NULL;
 
 	oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
 
 	PG_TRY();
 	{
-		PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
 		pg_verifymbstr(query, strlen(query), false);
-		rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
-		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		rv = SPI_execute_callback(query, exec_ctx->curr_proc->fn_readonly, limit,
+				(DestReceiver *) callback);
+
+		ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
 	}
 	PG_CATCH();
 	{
 		PLy_spi_subtransaction_abort(oldcontext, oldowner);
+		PLy_Callback_Free(callback);
 		return NULL;
 	}
 	PG_END_TRY();
+	callback = PLy_Callback_Free(callback);
 
 	if (rv < 0)
 	{
@@ -382,94 +409,202 @@ PLy_spi_execute_query(char *query, long limit)
 	return ret;
 }
 
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
 {
+	MemoryContext oldcontext, cb_ctx;
+	CallbackState *callback;
+
+	callback = palloc0(sizeof(CallbackState));
+
+	/*
+	 * Use a new context to make cleanup easier. Allocate it in the current
+	 * context so we don't have to worry about cleaning it up if there's an
+	 * error.
+	 */
+	cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+								"PL/Python callback context",
+								ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(cb_ctx);
+	callback->parent_ctx = oldcontext;
+	callback->cb_ctx = cb_ctx;
+	memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), sizeof(DestReceiver));
+	callback->pub.receiveSlot = PLy_CSreceive;
+	callback->pub.rStartup = PLy_CSStartup;
+	callback->pub.rDestroy = PLy_CSDestroy;
+	callback->exec_ctx = exec_ctx;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+	if (callback)
+	{
+		if (callback->cb_ctx)
+			(callback->pub.rDestroy) ((DestReceiver *) callback);
+
+		pfree(callback);
+	}
+
+	return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
+{
+	MemoryContext oldctx;
+
+	/* The result info needs to be in the parent context */
+	oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+	myState->result = PLy_result_new();
+	if (myState->result == NULL)
+		PLy_elog(ERROR, "could not create new result object");
+
+	MemoryContextSwitchTo(oldctx);
+	return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+	PLyExecutionContext *old_exec_ctx;
+	CallbackState *myState = (CallbackState *) self;
 	PLyResultObject *result;
-	volatile MemoryContext oldcontext;
+	PLyTypeInfo *args;
+	MemoryContext mctx, old_mctx;
 
-	result = (PLyResultObject *) PLy_result_new();
-	Py_DECREF(result->status);
-	result->status = PyInt_FromLong(status);
+	/*
+	 * We may be in a different execution context when we're called, so switch
+	 * back to our original one.
+	 */
+	mctx = myState->cb_ctx;
+	old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+	old_mctx = MemoryContextSwitchTo(mctx);
 
-	if (status > 0 && tuptable == NULL)
+	/* We need to store this because the TupleDesc the receive function gets has no names. */
+	myState->desc = typeinfo;
+
+	/* Setup type conversion info */
+	myState->args = args = palloc0(sizeof(PLyTypeInfo));
+	PLy_typeinfo_init(args, mctx);
+	PLy_input_tuple_funcs(args, typeinfo);
+
+	result = PLyCSNewResult(myState);
+
+	/*
+	 * Save tuple descriptor for later use by result set metadata
+	 * functions.  Save it in TopMemoryContext so that it survives
+	 * outside of an SPI context.  We trust that PLy_result_dealloc()
+	 * will clean it up when the time is right. XXX This might result in a leak
+	 * if an error happens and the result doesn't get dereferenced.
+	 */
+	MemoryContextSwitchTo(TopMemoryContext);
+	result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+	MemoryContextSwitchTo(old_mctx);
+	PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+	CallbackState *myState = (CallbackState *) self;
+	MemoryContext cb_ctx = myState->cb_ctx;
+
+	MemoryContextDelete(cb_ctx);
+	myState->cb_ctx = 0;
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+	CallbackState 	*myState = (CallbackState *) self;
+	TupleDesc		desc = myState->desc;
+	PLyTypeInfo 	*args = myState->args;
+	PLyResultObject *result = (PLyResultObject *) myState->result;
+	PLyExecutionContext *old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+	MemoryContext 	scratch_context = PLy_get_scratch_context(myState->exec_ctx);
+	MemoryContext 	oldcontext = CurrentMemoryContext;
+	int			rv = 1;
+	PyObject   *row;
+
+	/* Verify saved state matches incoming slot */
+	Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+	Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
+	/*
+	 * Do the work in the scratch context to avoid leaking memory from the
+	 * datatype output function calls.
+	 */
+	MemoryContextSwitchTo(scratch_context);
+
+	PG_TRY();
 	{
-		Py_DECREF(result->nrows);
-		result->nrows = (rows > (uint64) LONG_MAX) ?
-			PyFloat_FromDouble((double) rows) :
-			PyInt_FromLong((long) rows);
+		row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
 	}
-	else if (status > 0 && tuptable != NULL)
+	PG_CATCH();
 	{
-		PLyTypeInfo args;
-		MemoryContext cxt;
+		Py_XDECREF(row);
+		MemoryContextSwitchTo(oldcontext);
+		PLy_switch_execution_context(old_exec_ctx);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/*
+	 * If we tried to do this in the PG_CATCH we'd have to mark row
+	 * as volatile, but that won't work with PyList_Append, so just
+	 * test the error code after doing Py_DECREF().
+	 */
+	if (row)
+	{
+		rv = PyList_Append(result->rows, row);
+		Py_DECREF(row);
+	}
 
-		Py_DECREF(result->nrows);
-		result->nrows = (rows > (uint64) LONG_MAX) ?
-			PyFloat_FromDouble((double) rows) :
-			PyInt_FromLong((long) rows);
+	if (rv != 0)
+		ereport(ERROR,
+				(errmsg("unable to append value to list")));
 
-		cxt = AllocSetContextCreate(CurrentMemoryContext,
-									"PL/Python temp context",
-									ALLOCSET_DEFAULT_SIZES);
-		PLy_typeinfo_init(&args, cxt);
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(scratch_context);
+	PLy_switch_execution_context(old_exec_ctx);
 
-		oldcontext = CurrentMemoryContext;
-		PG_TRY();
+	return true;
+}
+
+
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+	PLyResultObject *result = (PLyResultObject *) callback->result;
+
+	/* If status < 0 this stuff would just get thrown away anyway. */
+	if (status > 0)
+	{
+		if (!result)
 		{
-			MemoryContext oldcontext2;
-
-			if (rows)
-			{
-				uint64		i;
-
-				/*
-				 * PyList_New() and PyList_SetItem() use Py_ssize_t for list
-				 * size and list indices; so we cannot support a result larger
-				 * than PY_SSIZE_T_MAX.
-				 */
-				if (rows > (uint64) PY_SSIZE_T_MAX)
-					ereport(ERROR,
-							(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-							 errmsg("query result has too many rows to fit in a Python list")));
-
-				Py_DECREF(result->rows);
-				result->rows = PyList_New(rows);
-
-				PLy_input_tuple_funcs(&args, tuptable->tupdesc);
-				for (i = 0; i < rows; i++)
-				{
-					PyObject   *row = PLyDict_FromTuple(&args,
-														tuptable->vals[i],
-														tuptable->tupdesc);
-
-					PyList_SetItem(result->rows, i, row);
-				}
-			}
-
 			/*
-			 * Save tuple descriptor for later use by result set metadata
-			 * functions.  Save it in TopMemoryContext so that it survives
-			 * outside of an SPI context.  We trust that PLy_result_dealloc()
-			 * will clean it up when the time is right.  (Do this as late as
-			 * possible, to minimize the number of ways the tupdesc could get
-			 * leaked due to errors.)
+			 * This happens if the command returned no results. Create a dummy result set.
 			 */
-			oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
-			result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
-			MemoryContextSwitchTo(oldcontext2);
+			result = PLyCSNewResult(callback);
+			callback->result = (PyObject *) result;
 		}
-		PG_CATCH();
-		{
-			MemoryContextSwitchTo(oldcontext);
-			MemoryContextDelete(cxt);
-			Py_DECREF(result);
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
 
-		MemoryContextDelete(cxt);
-		SPI_freetuptable(tuptable);
+		Py_DECREF(result->status);
+		result->status = PyInt_FromLong(status);
+		Py_DECREF(result->nrows);
+		result->nrows = (rows > (uint64) LONG_MAX) ?
+			PyFloat_FromDouble((double) rows) :
+			PyInt_FromLong((long) rows);
 	}
 
 	return (PyObject *) result;
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to