On 2/28/17 9:42 PM, Jim Nasby wrote:

I'll post a plpython patch that doesn't add the output format control.

I've attached the results of that. Unfortunately the speed improvement
is only 27% at this point (with 9999999 tuples). Presumably that's
because it's constructing a brand new dictionary from scratch for each
tuple.

I found a couple of bugs. New patches are attached.
--
Jim Nasby, Chief Data Architect, OpenSCG
From 116b6a45b0146e42f1faa130d78e9362950c18c3 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 1 Mar 2017 15:45:51 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
 DestReceiver.

Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
 src/backend/executor/spi.c | 79 ++++++++++++++++++++++++++++++++++++++++------
 src/backend/tcop/dest.c    | 11 +++++++
 src/include/executor/spi.h |  4 +++
 src/include/tcop/dest.h    |  1 +
 4 files changed, 85 insertions(+), 10 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 55f97b14e6..ffeba679da 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, 
SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount);
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
                                        Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
        res = _SPI_execute_plan(&plan, NULL,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+               DestReceiver *callback)
+{
+       _SPI_plan       plan;
+       int                     res;
+
+       if (src == NULL || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       memset(&plan, 0, sizeof(_SPI_plan));
+       plan.magic = _SPI_PLAN_MAGIC;
+       plan.cursor_options = 0;
+
+       _SPI_prepare_oneshot_plan(src, &plan);
+
+       res = _SPI_execute_plan(&plan, NULL,
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback)
+{
+       int                     res;
+
+       if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       if (plan->nargs > 0 && Values == NULL)
+               return SPI_ERROR_PARAM;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       res = _SPI_execute_plan(plan,
+                                                       
_SPI_convert_params(plan->nargs, plan->argtypes,
+                                                                               
                Values, Nulls),
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, 
ParamListInfo params,
 
        res = _SPI_execute_plan(plan, params,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        snapshot, 
crosscheck_snapshot,
-                                                       read_only, 
fire_triggers, tcount);
+                                                       read_only, 
fire_triggers, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
 
        res = _SPI_execute_plan(&plan, paramLI,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr 
plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount)
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback)
 {
        int                     my_res = 0;
        uint64          my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
+       DestReceiver *dest = callback;
 
        /*
         * Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                {
                        PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
                        bool            canSetTag = stmt->canSetTag;
-                       DestReceiver *dest;
 
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
@@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                UpdateActiveSnapshotCommandId();
                        }
 
-                       dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
+                       if (!callback)
+                               dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
 
                        if (stmt->utilityStmt == NULL)
                        {
@@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                        {
                                char            
completionTag[COMPLETION_TAG_BUFSIZE];
 
+                               // XXX throw error if callback is set
                                ProcessUtility(stmt,
                                                           
plansource->query_string,
                                                           
PROCESS_UTILITY_QUERY,
@@ -2262,7 +2320,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, 
uint64 tcount)
        switch (operation)
        {
                case CMD_SELECT:
-                       if (queryDesc->dest->mydest != DestSPI)
+                       if (queryDesc->dest->mydest != DestSPI &&
+                                       queryDesc->dest->mydest != 
DestSPICallback)
                        {
                                /* Don't return SPI_OK_SELECT if we're 
discarding result */
                                res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..bd671e0b26 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
        DestSPI
 };
 
+static DestReceiver spi_callbackDR = {
+       donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+       DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
                case DestSPI:
                        return &spi_printtupDR;
 
+               case DestSPICallback:
+                       return &spi_callbackDR;
+
                case DestTuplestore:
                        return CreateTuplestoreDestReceiver();
 
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index a18ae63245..d779511130 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int     SPI_connect(void);
 extern int     SPI_finish(void);
 extern int     SPI_execute(const char *src, bool read_only, long tcount);
+extern int     SPI_execute_callback(const char *src, bool read_only, long 
tcount,
+                                                                       
DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                                 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
                                                                ParamListInfo 
params,
                                                                bool read_only, 
long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback);
 extern int     SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
        DestRemoteExecute,                      /* sent to frontend, in Execute 
command */
        DestRemoteSimple,                       /* sent to frontend, w/no 
catalog access */
        DestSPI,                                        /* results sent to SPI 
manager */
+       DestSPICallback,                        /* results sent to 
user-specified callback function */
        DestTuplestore,                         /* results sent to Tuplestore */
        DestIntoRel,                            /* results sent to relation 
(SELECT INTO) */
        DestCopyOut,                            /* results sent to COPY TO code 
*/
-- 
2.11.1

From f6a623637870f67036e375b2928ac1adc9c7184b Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 1 Mar 2017 15:46:53 -0600
Subject: [PATCH 2/2] Modify plpython to use SPI callbacks

This is a bare-minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Basic testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
 src/pl/plpython/plpy_main.c |  13 ++
 src/pl/plpython/plpy_main.h |   3 +
 src/pl/plpython/plpy_spi.c  | 299 ++++++++++++++++++++++++++++++++------------
 3 files changed, 234 insertions(+), 81 deletions(-)

diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
        return PLy_execution_contexts;
 }
 
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+       PLyExecutionContext *last = PLy_execution_contexts;
+
+       if (PLy_execution_contexts == NULL)
+               elog(ERROR, "no Python function is currently executing");
+
+       PLy_execution_contexts = new;
+
+       return last;
+}
+
 MemoryContext
 PLy_get_scratch_context(PLyExecutionContext *context)
 {
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
 /* Get the current execution context */
 extern PLyExecutionContext *PLy_current_execution_context(void);
 
+/* Switch to the given execution context; returns the previous one */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext 
*new);
+
 /* Get the scratch memory context for specified execution context */
 extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
 
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..031b2ed8a9 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,10 +28,30 @@
 #include "plpy_procedure.h"
 #include "plpy_resultobject.h"
 
+typedef struct
+{
+       DestReceiver pub;
+       PLyExecutionContext *exec_ctx;
+       MemoryContext parent_ctx;
+       MemoryContext cb_ctx;
+       TupleDesc       desc;
+       PLyTypeInfo *args;
+
+       PyObject        *result;
+} CallbackState;
+
 
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
+ 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState    *callback,
                                                         uint64 rows, int 
status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
@@ -196,6 +216,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
 static PyObject *
 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 {
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       CallbackState   *callback;
        volatile int nargs;
        int                     i,
                                rv;
@@ -238,12 +260,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
 
        oldcontext = CurrentMemoryContext;
        oldowner = CurrentResourceOwner;
+       callback = PLy_Callback_New(exec_ctx);
 
        PLy_spi_subtransaction_begin(oldcontext, oldowner);
 
        PG_TRY();
        {
-               PLyExecutionContext *exec_ctx = PLy_current_execution_context();
                char       *volatile nulls;
                volatile int j;
 
@@ -289,9 +311,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
                        }
                }
 
-               rv = SPI_execute_plan(plan->plan, plan->values, nulls,
-                                                         
exec_ctx->curr_proc->fn_readonly, limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, 
rv);
+               rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+                                                       
exec_ctx->curr_proc->fn_readonly, limit,
+                                                       (DestReceiver *) 
callback);
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                if (nargs > 0)
                        pfree(nulls);
@@ -316,9 +339,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
                }
 
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        for (i = 0; i < nargs; i++)
        {
@@ -344,9 +369,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
 static PyObject *
 PLy_spi_execute_query(char *query, long limit)
 {
-       int                     rv;
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       CallbackState   *callback = PLy_Callback_New(exec_ctx);
        volatile MemoryContext oldcontext;
        volatile ResourceOwner oldowner;
+       int                     rv;
        PyObject   *ret = NULL;
 
        oldcontext = CurrentMemoryContext;
@@ -356,20 +383,22 @@ PLy_spi_execute_query(char *query, long limit)
 
        PG_TRY();
        {
-               PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
                pg_verifymbstr(query, strlen(query), false);
-               rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, 
limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, 
rv);
+               rv = SPI_execute_callback(query, 
exec_ctx->curr_proc->fn_readonly, limit,
+                               (DestReceiver *) callback);
+
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                PLy_spi_subtransaction_commit(oldcontext, oldowner);
        }
        PG_CATCH();
        {
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        if (rv < 0)
        {
@@ -383,94 +412,202 @@ PLy_spi_execute_query(char *query, long limit)
        return ret;
 }
 
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+       MemoryContext oldcontext, cb_ctx;
+       CallbackState *callback;
+
+       callback = palloc0(sizeof(CallbackState));
+
+       /*
+        * Use a new context to make cleanup easier. Allocate it in the current
+        * context so we don't have to worry about cleaning it up if there's an
+        * error.
+        */
+       cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+                                                               "PL/Python 
callback context",
+                                                               
ALLOCSET_DEFAULT_SIZES);
+
+       oldcontext = MemoryContextSwitchTo(cb_ctx);
+       callback->parent_ctx = oldcontext;
+       callback->cb_ctx = cb_ctx;
+       memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), 
sizeof(DestReceiver));
+       callback->pub.receiveSlot = PLy_CSreceive;
+       callback->pub.rStartup = PLy_CSStartup;
+       callback->pub.rDestroy = PLy_CSDestroy;
+       callback->exec_ctx = exec_ctx;
+       
+       MemoryContextSwitchTo(oldcontext);
+
+       return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+       if (callback)
+       {
+               if (callback->cb_ctx)
+                       (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+               pfree(callback);
+       }
+
+       return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
 {
+       MemoryContext oldctx;
+
+       /* The result info needs to be in the parent context */
+       oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+       myState->result = PLy_result_new();
+       if (myState->result == NULL)
+               PLy_elog(ERROR, "could not create new result object");
+
+       MemoryContextSwitchTo(oldctx);
+       return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+       PLyExecutionContext *old_exec_ctx;
+       CallbackState *myState = (CallbackState *) self;
        PLyResultObject *result;
-       volatile MemoryContext oldcontext;
+       PLyTypeInfo *args;
+       MemoryContext mctx, old_mctx;
+
+       /*
+        * We may be in a different execution context when we're called, so 
switch
+        * back to our original one.
+        */
+       mctx = myState->cb_ctx;
+       old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+       old_mctx = MemoryContextSwitchTo(mctx);
+       
+       /* We need to store this because the TupleDesc the receive function 
gets has no names. */
+       myState->desc = typeinfo;
+
+       /* Setup type conversion info */
+       myState->args = args = palloc0(sizeof(PLyTypeInfo));
+       PLy_typeinfo_init(args, mctx);
+       PLy_input_tuple_funcs(args, typeinfo);
+
+       result = PLyCSNewResult(myState);
+       
+       /*
+        * Save tuple descriptor for later use by result set metadata
+        * functions.  Save it in TopMemoryContext so that it survives
+        * outside of an SPI context.  We trust that PLy_result_dealloc()
+        * will clean it up when the time is right. XXX This might result in a 
leak
+        * if an error happens and the result doesn't get dereferenced. 
+        */
+       MemoryContextSwitchTo(TopMemoryContext);
+       result->tupdesc = CreateTupleDescCopy(typeinfo);
 
-       result = (PLyResultObject *) PLy_result_new();
-       Py_DECREF(result->status);
-       result->status = PyInt_FromLong(status);
+       MemoryContextSwitchTo(old_mctx);
+       PLy_switch_execution_context(old_exec_ctx);
+}
 
-       if (status > 0 && tuptable == NULL)
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+       CallbackState *myState = (CallbackState *) self;
+       MemoryContext cb_ctx = myState->cb_ctx;
+
+       MemoryContextDelete(cb_ctx);
+       myState->cb_ctx = 0;
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+       CallbackState   *myState = (CallbackState *) self;
+       TupleDesc               desc = myState->desc;
+       PLyTypeInfo     *args = myState->args;
+       PLyResultObject *result = (PLyResultObject *) myState->result;
+       PLyExecutionContext *old_exec_ctx = 
PLy_switch_execution_context(myState->exec_ctx);
+       MemoryContext   scratch_context = 
PLy_get_scratch_context(myState->exec_ctx);
+       MemoryContext   oldcontext = CurrentMemoryContext;
+       int                     rv = 1;
+       PyObject   *row;
+
+       /* Verify saved state matches incoming slot */
+       Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+       Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
+       /*
+        * Do the work in the scratch context to avoid leaking memory from the
+        * datatype output function calls.
+        */
+       MemoryContextSwitchTo(scratch_context);
+
+       PG_TRY();
        {
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+               row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
        }
-       else if (status > 0 && tuptable != NULL)
+       PG_CATCH();
        {
-               PLyTypeInfo args;
-               MemoryContext cxt;
+               Py_XDECREF(row);
+               MemoryContextSwitchTo(oldcontext);
+               PLy_switch_execution_context(old_exec_ctx);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
 
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+       /*
+        * If we tried to do this in the PG_CATCH we'd have to mark row
+        * as volatile, but that won't work with PyList_Append, so just
+        * test the error code after doing Py_DECREF().
+        */
+       if (row)
+       {
+               rv = PyList_Append(result->rows, row);
+               Py_DECREF(row);
+       }
+       
+       if (rv != 0)
+               ereport(ERROR,
+                               (errmsg("unable to append value to list")));
 
-               cxt = AllocSetContextCreate(CurrentMemoryContext,
-                                                                       
"PL/Python temp context",
-                                                                       
ALLOCSET_DEFAULT_SIZES);
-               PLy_typeinfo_init(&args, cxt);
+       MemoryContextSwitchTo(oldcontext);
+       MemoryContextReset(scratch_context);
+       PLy_switch_execution_context(old_exec_ctx);
 
-               oldcontext = CurrentMemoryContext;
-               PG_TRY();
-               {
-                       MemoryContext oldcontext2;
+       return true;
+}
 
-                       if (rows)
-                       {
-                               uint64          i;
-
-                               /*
-                                * PyList_New() and PyList_SetItem() use 
Py_ssize_t for list
-                                * size and list indices; so we cannot support 
a result larger
-                                * than PY_SSIZE_T_MAX.
-                                */
-                               if (rows > (uint64) PY_SSIZE_T_MAX)
-                                       ereport(ERROR,
-                                                       
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                                        errmsg("query result 
has too many rows to fit in a Python list")));
-
-                               Py_DECREF(result->rows);
-                               result->rows = PyList_New(rows);
-
-                               PLy_input_tuple_funcs(&args, tuptable->tupdesc);
-                               for (i = 0; i < rows; i++)
-                               {
-                                       PyObject   *row = 
PLyDict_FromTuple(&args,
-                                                                               
                                tuptable->vals[i],
-                                                                               
                                tuptable->tupdesc);
 
-                                       PyList_SetItem(result->rows, i, row);
-                               }
-                       }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+       PLyResultObject *result = (PLyResultObject *) callback->result;
 
+       /* If status < 0 this stuff would just get thrown away anyway. */
+       if (status > 0)
+       {
+               if (!result)
+               {
                        /*
-                        * Save tuple descriptor for later use by result set 
metadata
-                        * functions.  Save it in TopMemoryContext so that it 
survives
-                        * outside of an SPI context.  We trust that 
PLy_result_dealloc()
-                        * will clean it up when the time is right.  (Do this 
as late as
-                        * possible, to minimize the number of ways the tupdesc 
could get
-                        * leaked due to errors.)
+                        * This happens if the command returned no results. 
Create a dummy result set.
                         */
-                       oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
-                       result->tupdesc = 
CreateTupleDescCopy(tuptable->tupdesc);
-                       MemoryContextSwitchTo(oldcontext2);
-               }
-               PG_CATCH();
-               {
-                       MemoryContextSwitchTo(oldcontext);
-                       MemoryContextDelete(cxt);
-                       Py_DECREF(result);
-                       PG_RE_THROW();
+                       result = PLyCSNewResult(callback);
+                       callback->result = (PyObject *) result;
                }
-               PG_END_TRY();
 
-               MemoryContextDelete(cxt);
-               SPI_freetuptable(tuptable);
+               Py_DECREF(result->status);
+               result->status = PyInt_FromLong(status);
+               Py_DECREF(result->nrows);
+               result->nrows = (rows > (uint64) LONG_MAX) ?
+                       PyFloat_FromDouble((double) rows) :
+                       PyInt_FromLong((long) rows);
        }
 
        return (PyObject *) result;
-- 
2.11.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to