On 4/5/17 7:44 PM, Jim Nasby wrote:
Updated patches attached, but I still need to update the docs.

Attached is a complete series of patches that includes the docs patch.

Right now, the docs don't include a concrete example, because adding one would be a pretty large if it demonstrated real usage, which presumably means Yet Another Contrib Module strictly for the purpose of demonstrating something. Rather than doing that, ISTM it'd be better to point the user at what plpythonu is doing.

Another option would be to have a very simple example that only uses *receiveSlot, but that seems rather pointless to me.
--
Jim Nasby, Chief Data Architect, Austin TX
OpenSCG                 http://OpenSCG.com
From 0a2ef661f55a047763a43b0eebd7483760e4a427 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 5 Apr 2017 20:52:39 -0500
Subject: [PATCH 1/3] Add SPI_execute_callback

Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
 src/backend/executor/spi.c | 80 ++++++++++++++++++++++++++++++++++++++++------
 src/backend/tcop/dest.c    | 15 +++++++++
 src/include/executor/spi.h |  4 +++
 src/include/tcop/dest.h    |  1 +
 4 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index ca547dc6d9..4f6c3011f9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, 
SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount);
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
                                        Datum *Values, const char *Nulls);
@@ -321,7 +322,35 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
        res = _SPI_execute_plan(&plan, NULL,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+               DestReceiver *callback)
+{
+       _SPI_plan       plan;
+       int                     res;
+
+       if (src == NULL || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       memset(&plan, 0, sizeof(_SPI_plan));
+       plan.magic = _SPI_PLAN_MAGIC;
+       plan.cursor_options = 0;
+
+       _SPI_prepare_oneshot_plan(src, &plan);
+
+       res = _SPI_execute_plan(&plan, NULL,
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -355,7 +384,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+/* Execute a previously prepared plan with a callback */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback)
+{
+       int                     res;
+
+       if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       if (plan->nargs > 0 && Values == NULL)
+               return SPI_ERROR_PARAM;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       res = _SPI_execute_plan(plan,
+                                                       
_SPI_convert_params(plan->nargs, plan->argtypes,
+                                                                               
                Values, Nulls),
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -384,7 +440,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, 
ParamListInfo params,
 
        res = _SPI_execute_plan(plan, params,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -425,7 +481,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        snapshot, 
crosscheck_snapshot,
-                                                       read_only, 
fire_triggers, tcount);
+                                                       read_only, 
fire_triggers, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -472,7 +528,7 @@ SPI_execute_with_args(const char *src,
 
        res = _SPI_execute_plan(&plan, paramLI,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -1907,7 +1963,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr 
plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount)
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback)
 {
        int                     my_res = 0;
        uint64          my_processed = 0;
@@ -1918,6 +1975,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
+       DestReceiver *dest = callback;
 
        /*
         * Setup error traceback support for ereport()
@@ -2037,7 +2095,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                {
                        PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
                        bool            canSetTag = stmt->canSetTag;
-                       DestReceiver *dest;
 
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
@@ -2082,7 +2139,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                UpdateActiveSnapshotCommandId();
                        }
 
-                       dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
+                       if (!callback)
+                               dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
 
                        if (stmt->utilityStmt == NULL)
                        {
@@ -2108,6 +2166,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                        {
                                char            
completionTag[COMPLETION_TAG_BUFSIZE];
 
+                               Assert(!callback);
                                ProcessUtility(stmt,
                                                           
plansource->query_string,
                                                           
PROCESS_UTILITY_QUERY,
@@ -2281,7 +2340,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, 
uint64 tcount)
        switch (operation)
        {
                case CMD_SELECT:
-                       if (queryDesc->dest->mydest != DestSPI)
+                       if (queryDesc->dest->mydest != DestSPI &&
+                                       queryDesc->dest->mydest != 
DestSPICallback)
                        {
                                /* Don't return SPI_OK_SELECT if we're 
discarding result */
                                res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..f68b6e1b51 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,15 @@ static DestReceiver spi_printtupDR = {
        DestSPI
 };
 
+/*
+ * This is strictly a starting point for creating a callback. It should not
+ * actually be used.
+ */
+static DestReceiver spi_callbackDR = {
+       donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+       DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -126,6 +135,9 @@ CreateDestReceiver(CommandDest dest)
                case DestSPI:
                        return &spi_printtupDR;
 
+               case DestSPICallback:
+                       return &spi_callbackDR;
+
                case DestTuplestore:
                        return CreateTuplestoreDestReceiver();
 
@@ -172,6 +184,7 @@ EndCommand(const char *commandTag, CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -216,6 +229,7 @@ NullCommand(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -262,6 +276,7 @@ ReadyForQuery(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 94a805d477..13719e1df5 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -80,11 +80,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int     SPI_connect(void);
 extern int     SPI_finish(void);
 extern int     SPI_execute(const char *src, bool read_only, long tcount);
+extern int     SPI_execute_callback(const char *src, bool read_only, long 
tcount,
+                                                                       
DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                                 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
                                                                ParamListInfo 
params,
                                                                bool read_only, 
long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback);
 extern int     SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
        DestRemoteExecute,                      /* sent to frontend, in Execute 
command */
        DestRemoteSimple,                       /* sent to frontend, w/no 
catalog access */
        DestSPI,                                        /* results sent to SPI 
manager */
+       DestSPICallback,                        /* results sent to 
user-specified callback function */
        DestTuplestore,                         /* results sent to Tuplestore */
        DestIntoRel,                            /* results sent to relation 
(SELECT INTO) */
        DestCopyOut,                            /* results sent to COPY TO code 
*/
-- 
2.11.1

From 093cbf519296ff262a534bbe912cafac477f5692 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 5 Apr 2017 22:42:13 -0500
Subject: [PATCH 2/3] Add documentation for SPI_execute_callback

---
 doc/src/sgml/spi.sgml | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 118 insertions(+)

diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml
index 86be87c0fd..901bb9cbb1 100644
--- a/doc/src/sgml/spi.sgml
+++ b/doc/src/sgml/spi.sgml
@@ -600,6 +600,124 @@ int SPI_exec(const char * <parameter>command</parameter>, 
long <parameter>count<
 
 <!-- *********************************************** -->
 
+<refentry id="spi-spi-execute-callback">
+ <indexterm><primary>SPI_exececute_callback</primary></indexterm>
+
+ <refmeta>
+  <refentrytitle>SPI_execute_callback</refentrytitle>
+  <manvolnum>3</manvolnum>
+ </refmeta>
+
+ <refnamediv>
+  <refname>SPI_execute_callback</refname>
+  <refpurpose>execute a read/write command </refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+int SPI_execute_callback(const char * <parameter>command</parameter>, bool 
<parameter>read_only</parameter>,
+                                                long 
<parameter>count</parameter>, DestReceiver * <parameter>callback</parameter>)
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <function>SPI_execute_callback</function> is the same as
+   <function>SPI_execute</function>, except that instead of returning results
+   via <structname>SPITupleTable</structname>, the user-supplied 
<parameter>callback</parameter>
+   is used. Unlike 
+   <function>SPI_execute</function>,
+   <function>SPI_execute_callback</function>
+   will run the callback for every SQL command passed in to 
<parameter>command</parameter>.
+  </para>
+
+  <para>
+   The structure <structname>DestReceiver</structname> is defined
+   as:
+<programlisting>
+typedef struct _DestReceiver DestReceiver;
+
+struct _DestReceiver
+{
+       /* Called for each tuple to be output: */
+       bool            (*receiveSlot) (TupleTableSlot *slot,
+                                                                               
        DestReceiver *self);
+       /* Per-executor-run initialization and shutdown: */
+       void            (*rStartup) (DestReceiver *self,
+                                                                               
 int operation,
+                                                                               
 TupleDesc typeinfo);
+       void            (*rShutdown) (DestReceiver *self);
+       /* Destroy the receiver object itself (if dynamically allocated) */
+       void            (*rDestroy) (DestReceiver *self);
+       /* CommandDest code for this receiver */
+       CommandDest mydest;
+       /* Private fields might appear beyond this point... */
+};
+</programlisting>
+
+   <structfield>(*receiveSlot)</structfield> is a function that is called for
+   every tuple generated.
+
+   <structfield>(*rStartup)</structfield> and
+   <structfield>(*rShutdown)</structfield> are called when query execution
+   starts and finishes. Their use is optional.
+
+   <structfield>(*rDestroy)</structfield> is called when the query execution
+   context is freed. If your <structname>DestReceiver</structname> dynamically
+   allocates memory, you can use <structfield>(*rDestroy)</structfield> to free
+   that memory. You should do this if <structname>DestReceiver</structname> is
+   created in a long-lived memory context.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Arguments</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><literal>const char * <parameter>command</parameter></literal></term>
+    <listitem>
+     <para>
+      string containing command to execute
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>long <parameter>count</parameter></literal></term>
+    <listitem>
+     <para>
+      maximum number of rows to return,
+      or <literal>0</> for no limit
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>DestReceiver 
<parameter>callback</parameter></literal></term>
+    <listitem>
+     <para>
+         callback to be executed for each tuple generated by
+         <parameter>command</parameter>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Return Value</title>
+
+  <para>
+   See <function>SPI_execute</function>.
+  </para>
+ </refsect1>
+</refentry>
+
+<!-- *********************************************** -->
+
 <refentry id="spi-spi-execute-with-args">
  <indexterm><primary>SPI_execute_with_args</primary></indexterm>
 
-- 
2.11.1

From 0ce5131c8bb03117d356b555431877baeb149175 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 5 Apr 2017 21:30:39 -0500
Subject: [PATCH 3/3] Switch plpython to using SPI_execute_callback

This is a bare minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Simple testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
 src/pl/plpython/plpy_main.c |  13 ++
 src/pl/plpython/plpy_main.h |   3 +
 src/pl/plpython/plpy_spi.c  | 313 ++++++++++++++++++++++++++++++++------------
 3 files changed, 248 insertions(+), 81 deletions(-)

diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
        return PLy_execution_contexts;
 }
 
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+       PLyExecutionContext *last = PLy_execution_contexts;
+
+       if (PLy_execution_contexts == NULL)
+               elog(ERROR, "no Python function is currently executing");
+
+       PLy_execution_contexts = new;
+
+       return last;
+}
+
 MemoryContext
 PLy_get_scratch_context(PLyExecutionContext *context)
 {
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..fe30dbc14b 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
 /* Get the current execution context */
 extern PLyExecutionContext *PLy_current_execution_context(void);
 
+/* Switch execution context (similar to MemoryContextSwitchTo */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext 
*new);
+
 /* Get the scratch memory context for specified execution context */
 extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
 
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index c6856ccbac..236cc6d998 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,9 +28,27 @@
 #include "plpy_procedure.h"
 #include "plpy_resultobject.h"
 
+typedef struct
+{
+       DestReceiver pub;
+       PLyExecutionContext *exec_ctx;
+       MemoryContext parent_ctx;
+       MemoryContext cb_ctx;
+       TupleDesc       desc;
+       PLyTypeInfo *args;
+
+       PyObject        *result;
+} CallbackState;
+
+static void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc 
typeinfo);
+static void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
                                                         uint64 rows, int 
status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
@@ -195,6 +213,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
 PyObject *
 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 {
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       CallbackState   *callback;
        volatile int nargs;
        int                     i,
                                rv;
@@ -237,12 +257,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
 
        oldcontext = CurrentMemoryContext;
        oldowner = CurrentResourceOwner;
+       callback = PLy_Callback_New(exec_ctx);
 
        PLy_spi_subtransaction_begin(oldcontext, oldowner);
 
        PG_TRY();
        {
-               PLyExecutionContext *exec_ctx = PLy_current_execution_context();
                char       *volatile nulls;
                volatile int j;
 
@@ -288,9 +308,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
                        }
                }
 
-               rv = SPI_execute_plan(plan->plan, plan->values, nulls,
-                                                         
exec_ctx->curr_proc->fn_readonly, limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, 
rv);
+               rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+                                                       
exec_ctx->curr_proc->fn_readonly, limit,
+                                                       (DestReceiver *) 
callback);
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                if (nargs > 0)
                        pfree(nulls);
@@ -315,9 +336,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
                }
 
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        for (i = 0; i < nargs; i++)
        {
@@ -343,9 +366,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long 
limit)
 static PyObject *
 PLy_spi_execute_query(char *query, long limit)
 {
-       int                     rv;
+       PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+       CallbackState   *callback = PLy_Callback_New(exec_ctx);
        volatile MemoryContext oldcontext;
        volatile ResourceOwner oldowner;
+       int                     rv;
        PyObject   *ret = NULL;
 
        oldcontext = CurrentMemoryContext;
@@ -355,20 +380,22 @@ PLy_spi_execute_query(char *query, long limit)
 
        PG_TRY();
        {
-               PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
                pg_verifymbstr(query, strlen(query), false);
-               rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, 
limit);
-               ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, 
rv);
+               rv = SPI_execute_callback(query, 
exec_ctx->curr_proc->fn_readonly, limit,
+                               (DestReceiver *) callback);
+
+               ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
 
                PLy_spi_subtransaction_commit(oldcontext, oldowner);
        }
        PG_CATCH();
        {
                PLy_spi_subtransaction_abort(oldcontext, oldowner);
+               PLy_Callback_Free(callback);
                return NULL;
        }
        PG_END_TRY();
+       callback = PLy_Callback_Free(callback);
 
        if (rv < 0)
        {
@@ -382,94 +409,218 @@ PLy_spi_execute_query(char *query, long limit)
        return ret;
 }
 
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+       MemoryContext oldcontext, cb_ctx;
+       CallbackState *callback;
+
+       /* XXX does this really need palloc0? */
+       callback = palloc0(sizeof(CallbackState));
+
+       /*
+        * Use a new context to make cleanup easier. Allocate it in the current
+        * context so we don't have to worry about cleaning it up if there's an
+        * error.
+        */
+       cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+                                                               "PL/Python 
callback context",
+                                                               
ALLOCSET_DEFAULT_SIZES);
+
+       oldcontext = MemoryContextSwitchTo(cb_ctx);
+       callback->parent_ctx = oldcontext;
+       callback->cb_ctx = cb_ctx;
+       memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback), 
sizeof(DestReceiver));
+       callback->pub.receiveSlot = PLy_CSreceive;
+       callback->pub.rStartup = PLy_CSStartup;
+       callback->pub.rDestroy = PLy_CSDestroy;
+       callback->exec_ctx = exec_ctx;
+
+       MemoryContextSwitchTo(oldcontext);
+
+       return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+       if (callback)
+       {
+               if (callback->cb_ctx)
+                       (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+               pfree(callback);
+       }
+
+       return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
 {
+       MemoryContext oldctx;
+
+       /* The result info needs to be in the parent context */
+       oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+       myState->result = PLy_result_new();
+       if (myState->result == NULL)
+               PLy_elog(ERROR, "could not create new result object");
+
+       MemoryContextSwitchTo(oldctx);
+       return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+       PLyExecutionContext *old_exec_ctx;
+       CallbackState *myState = (CallbackState *) self;
        PLyResultObject *result;
-       volatile MemoryContext oldcontext;
+       PLyTypeInfo *args;
+       MemoryContext mctx, old_mctx;
+
+       /*
+        * We may be in a different execution context when we're called, so 
switch
+        * back to our original one.
+        */
+       mctx = myState->cb_ctx;
+       old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+       old_mctx = MemoryContextSwitchTo(mctx);
+
+       /*
+        * We need to store this because the TupleDesc that the receive function
+        * gets has no names.
+        */
+       myState->desc = typeinfo;
+
+       /* Setup type conversion info */
+       myState->args = args = palloc0(sizeof(PLyTypeInfo));
+       PLy_typeinfo_init(args, mctx);
+       PLy_input_tuple_funcs(args, typeinfo);
+
+       /* result is actually myState.result */
+       result = PLyCSNewResult(myState);
+
+       /*
+        * Save tuple descriptor for later use by result set metadata
+        * functions.  Save it in TopMemoryContext so that it survives outside 
of
+        * an SPI context.  We trust that PLy_result_dealloc() will clean it up
+        * when the time is right. The difference between result and everything
+        * else is that result needs to survive after the portal is destroyed,
+        * because result is what's handed back to the plpython function. While
+        * it's tempting to use something other than TopMemoryContext, that 
won't
+        * work: the user could potentially put result into the global 
dictionary,
+        * which means it could survive as long as the session does.  This might
+        * result in a leak if an error happens and the result doesn't get
+        * dereferenced, but if that happens it means the python GC has failed 
us,
+        * at which point we probably have bigger problems.
+        *
+        * This still isn't perfect though; if something the result tupledesc
+        * references has it's OID changed then the tupledesc will be invalid. 
I'm
+        * not sure it's worth worrying about that though.
+        */
+       MemoryContextSwitchTo(TopMemoryContext);
+       result->tupdesc = CreateTupleDescCopy(typeinfo);
+
+       MemoryContextSwitchTo(old_mctx);
+       PLy_switch_execution_context(old_exec_ctx);
+}
+
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+       CallbackState *myState = (CallbackState *) self;
+       MemoryContext cb_ctx = myState->cb_ctx;
 
-       result = (PLyResultObject *) PLy_result_new();
-       Py_DECREF(result->status);
-       result->status = PyInt_FromLong(status);
+       MemoryContextDelete(cb_ctx);
+       myState->cb_ctx = 0;
+}
 
-       if (status > 0 && tuptable == NULL)
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+       CallbackState   *myState = (CallbackState *) self;
+       TupleDesc               desc = myState->desc;
+       PLyTypeInfo     *args = myState->args;
+       PLyResultObject *result = (PLyResultObject *) myState->result;
+       PLyExecutionContext *old_exec_ctx = 
PLy_switch_execution_context(myState->exec_ctx);
+       MemoryContext   scratch_context = 
PLy_get_scratch_context(myState->exec_ctx);
+       MemoryContext   oldcontext = CurrentMemoryContext;
+       int                     rv = 1;
+       PyObject   *row;
+
+       /* Verify saved state matches incoming slot */
+       Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+       Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+       /* Make sure the tuple is fully deconstructed */
+       slot_getallattrs(slot);
+
+       /*
+        * Do the work in the scratch context to avoid leaking memory from the
+        * datatype output function calls.
+        */
+       MemoryContextSwitchTo(scratch_context);
+
+       PG_TRY();
        {
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+               row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
        }
-       else if (status > 0 && tuptable != NULL)
+       PG_CATCH();
        {
-               PLyTypeInfo args;
-               MemoryContext cxt;
+               Py_XDECREF(row);
+               MemoryContextSwitchTo(oldcontext);
+               PLy_switch_execution_context(old_exec_ctx);
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
 
-               Py_DECREF(result->nrows);
-               result->nrows = (rows > (uint64) LONG_MAX) ?
-                       PyFloat_FromDouble((double) rows) :
-                       PyInt_FromLong((long) rows);
+       /*
+        * If we tried to do this in the PG_CATCH we'd have to mark row
+        * as volatile, but that won't work with PyList_Append, so just
+        * test the error code after doing Py_DECREF().
+        */
+       if (row)
+       {
+               rv = PyList_Append(result->rows, row);
+               Py_DECREF(row);
+       }
 
-               cxt = AllocSetContextCreate(CurrentMemoryContext,
-                                                                       
"PL/Python temp context",
-                                                                       
ALLOCSET_DEFAULT_SIZES);
-               PLy_typeinfo_init(&args, cxt);
+       if (rv != 0)
+               ereport(ERROR,
+                               (errmsg("unable to append value to list")));
 
-               oldcontext = CurrentMemoryContext;
-               PG_TRY();
-               {
-                       MemoryContext oldcontext2;
+       MemoryContextSwitchTo(oldcontext);
+       MemoryContextReset(scratch_context);
+       PLy_switch_execution_context(old_exec_ctx);
 
-                       if (rows)
-                       {
-                               uint64          i;
-
-                               /*
-                                * PyList_New() and PyList_SetItem() use 
Py_ssize_t for list
-                                * size and list indices; so we cannot support 
a result larger
-                                * than PY_SSIZE_T_MAX.
-                                */
-                               if (rows > (uint64) PY_SSIZE_T_MAX)
-                                       ereport(ERROR,
-                                                       
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-                                                        errmsg("query result 
has too many rows to fit in a Python list")));
-
-                               Py_DECREF(result->rows);
-                               result->rows = PyList_New(rows);
-
-                               PLy_input_tuple_funcs(&args, tuptable->tupdesc);
-                               for (i = 0; i < rows; i++)
-                               {
-                                       PyObject   *row = 
PLyDict_FromTuple(&args,
-                                                                               
                                tuptable->vals[i],
-                                                                               
                                tuptable->tupdesc);
+       return true;
+}
 
-                                       PyList_SetItem(result->rows, i, row);
-                               }
-                       }
 
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+       PLyResultObject *result = (PLyResultObject *) callback->result;
+
+       /* If status < 0 this stuff would just get thrown away anyway. */
+       if (status > 0)
+       {
+               if (!result)
+               {
                        /*
-                        * Save tuple descriptor for later use by result set 
metadata
-                        * functions.  Save it in TopMemoryContext so that it 
survives
-                        * outside of an SPI context.  We trust that 
PLy_result_dealloc()
-                        * will clean it up when the time is right.  (Do this 
as late as
-                        * possible, to minimize the number of ways the tupdesc 
could get
-                        * leaked due to errors.)
+                        * This happens if the command returned no results. 
Create a dummy result set.
                         */
-                       oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
-                       result->tupdesc = 
CreateTupleDescCopy(tuptable->tupdesc);
-                       MemoryContextSwitchTo(oldcontext2);
+                       result = PLyCSNewResult(callback);
+                       callback->result = (PyObject *) result;
                }
-               PG_CATCH();
-               {
-                       MemoryContextSwitchTo(oldcontext);
-                       MemoryContextDelete(cxt);
-                       Py_DECREF(result);
-                       PG_RE_THROW();
-               }
-               PG_END_TRY();
 
-               MemoryContextDelete(cxt);
-               SPI_freetuptable(tuptable);
+               Py_DECREF(result->status);
+               result->status = PyInt_FromLong(status);
+               Py_DECREF(result->nrows);
+               result->nrows = (rows > (uint64) LONG_MAX) ?
+                       PyFloat_FromDouble((double) rows) :
+                       PyInt_FromLong((long) rows);
        }
 
        return (PyObject *) result;
-- 
2.11.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to