On 2/28/17 9:42 PM, Jim Nasby wrote:
I'll post a plpython patch that doesn't add the output format control.
I've attached the results of that. Unfortunately the speed improvement
is only 27% at this point (with 9999999 tuples). Presumably that's
because it's constructing a brand new dictionary from scratch for each
tuple.
I found a couple of bugs; new patches are attached.
--
Jim Nasby, Chief Data Architect, OpenSCG
From 116b6a45b0146e42f1faa130d78e9362950c18c3 Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 1 Mar 2017 15:45:51 -0600
Subject: [PATCH 1/2] Add SPI_execute_callback() and callback-based
DestReceiver.
Instead of placing results in a tuplestore, this method of execution
uses the supplied callback when creating the Portal for a query.
---
src/backend/executor/spi.c | 79 ++++++++++++++++++++++++++++++++++++++++------
src/backend/tcop/dest.c | 11 +++++++
src/include/executor/spi.h | 4 +++
src/include/tcop/dest.h | 1 +
4 files changed, 85 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 55f97b14e6..ffeba679da 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src,
SPIPlanPtr plan);
static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot
crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64
tcount);
+ bool read_only, bool fire_triggers, uint64
tcount,
+ DestReceiver *callback);
static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
res = _SPI_execute_plan(&plan, NULL,
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+ DestReceiver *callback)
+{
+ _SPI_plan plan;
+ int res;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = 0;
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, NULL,
+ InvalidSnapshot,
InvalidSnapshot,
+ read_only, true,
tcount, callback);
_SPI_end_call(true);
return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const
char *Nulls,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/* Execute a previously prepared plan, sending results to a callback DestReceiver */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+ bool read_only, long tcount, DestReceiver
*callback)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (plan->nargs > 0 && Values == NULL)
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan,
+
_SPI_convert_params(plan->nargs, plan->argtypes,
+
Values, Nulls),
+ InvalidSnapshot,
InvalidSnapshot,
+ read_only, true,
tcount, callback);
_SPI_end_call(true);
return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo params,
res = _SPI_execute_plan(plan, params,
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
_SPI_end_call(true);
return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
_SPI_convert_params(plan->nargs, plan->argtypes,
Values, Nulls),
snapshot,
crosscheck_snapshot,
- read_only,
fire_triggers, tcount);
+ read_only,
fire_triggers, tcount, NULL);
_SPI_end_call(true);
return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
res = _SPI_execute_plan(&plan, paramLI,
InvalidSnapshot,
InvalidSnapshot,
- read_only, true,
tcount);
+ read_only, true,
tcount, NULL);
_SPI_end_call(true);
return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr
plan)
static int
_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
Snapshot snapshot, Snapshot
crosscheck_snapshot,
- bool read_only, bool fire_triggers, uint64
tcount)
+ bool read_only, bool fire_triggers, uint64
tcount,
+ DestReceiver *callback)
{
int my_res = 0;
uint64 my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
+ DestReceiver *dest = callback;
/*
* Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
PlannedStmt *stmt = castNode(PlannedStmt, lfirst(lc2));
bool canSetTag = stmt->canSetTag;
- DestReceiver *dest;
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
@@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
UpdateActiveSnapshotCommandId();
}
- dest = CreateDestReceiver(canSetTag ? DestSPI :
DestNone);
+ if (!callback)
+ dest = CreateDestReceiver(canSetTag ? DestSPI :
DestNone);
if (stmt->utilityStmt == NULL)
{
@@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
{
char
completionTag[COMPLETION_TAG_BUFSIZE];
+ // XXX throw error if callback is set
ProcessUtility(stmt,
plansource->query_string,
PROCESS_UTILITY_QUERY,
@@ -2262,7 +2320,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers,
uint64 tcount)
switch (operation)
{
case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest != DestSPI &&
+ queryDesc->dest->mydest !=
DestSPICallback)
{
/* Don't return SPI_OK_SELECT if we're
discarding result */
res = SPI_OK_UTILITY;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 28081c3765..bd671e0b26 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -87,6 +87,11 @@ static DestReceiver spi_printtupDR = {
DestSPI
};
+static DestReceiver spi_callbackDR = {
+ donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+ DestSPICallback
+};
+
/* Globally available receiver for DestNone */
DestReceiver *None_Receiver = &donothingDR;
@@ -126,6 +131,9 @@ CreateDestReceiver(CommandDest dest)
case DestSPI:
return &spi_printtupDR;
+ case DestSPICallback:
+ return &spi_callbackDR;
+
case DestTuplestore:
return CreateTuplestoreDestReceiver();
@@ -172,6 +180,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -216,6 +225,7 @@ NullCommand(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
@@ -262,6 +272,7 @@ ReadyForQuery(CommandDest dest)
case DestNone:
case DestDebug:
case DestSPI:
+ case DestSPICallback:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index a18ae63245..d779511130 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
extern int SPI_connect(void);
extern int SPI_finish(void);
extern int SPI_execute(const char *src, bool read_only, long tcount);
+extern int SPI_execute_callback(const char *src, bool read_only, long
tcount,
+
DestReceiver *callback);
extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
bool read_only, long tcount);
extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
ParamListInfo
params,
bool read_only,
long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const
char *Nulls,
+ bool read_only, long tcount, DestReceiver
*callback);
extern int SPI_exec(const char *src, long tcount);
extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index c459af2e13..1d1d641ae0 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -91,6 +91,7 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute
command */
DestRemoteSimple, /* sent to frontend, w/no
catalog access */
DestSPI, /* results sent to SPI
manager */
+ DestSPICallback, /* results sent to
user-specified callback function */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel, /* results sent to relation
(SELECT INTO) */
DestCopyOut, /* results sent to COPY TO code
*/
--
2.11.1
From f6a623637870f67036e375b2928ac1adc9c7184b Mon Sep 17 00:00:00 2001
From: Jim Nasby <jim.na...@bluetreble.com>
Date: Wed, 1 Mar 2017 15:46:53 -0600
Subject: [PATCH 2/2] Modify plpython to use SPI callbacks
This is a bare-minimum patch to switch plpython to using SPI callbacks
in lieu of a tuplestore. Basic testing shows a ~27% speedup with a
simple generate_series(1,10000000).
---
src/pl/plpython/plpy_main.c | 13 ++
src/pl/plpython/plpy_main.h | 3 +
src/pl/plpython/plpy_spi.c | 299 ++++++++++++++++++++++++++++++++------------
3 files changed, 234 insertions(+), 81 deletions(-)
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 860b804e54..07501f18f2 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -403,6 +403,19 @@ PLy_current_execution_context(void)
return PLy_execution_contexts;
}
+PLyExecutionContext *
+PLy_switch_execution_context(PLyExecutionContext *new)
+{
+ PLyExecutionContext *last = PLy_execution_contexts;
+
+ if (PLy_execution_contexts == NULL)
+ elog(ERROR, "no Python function is currently executing");
+
+ PLy_execution_contexts = new;
+
+ return last;
+}
+
MemoryContext
PLy_get_scratch_context(PLyExecutionContext *context)
{
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..7cbe0a8d35 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -25,6 +25,9 @@ typedef struct PLyExecutionContext
/* Get the current execution context */
extern PLyExecutionContext *PLy_current_execution_context(void);
+/* Switch to the given execution context, returning the previous one */
+extern PLyExecutionContext *PLy_switch_execution_context(PLyExecutionContext
*new);
+
/* Get the scratch memory context for specified execution context */
extern MemoryContext PLy_get_scratch_context(PLyExecutionContext *context);
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..031b2ed8a9 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -28,10 +28,30 @@
#include "plpy_procedure.h"
#include "plpy_resultobject.h"
+typedef struct
+{
+ DestReceiver pub;
+ PLyExecutionContext *exec_ctx;
+ MemoryContext parent_ctx;
+ MemoryContext cb_ctx;
+ TupleDesc desc;
+ PLyTypeInfo *args;
+
+ PyObject *result;
+} CallbackState;
+
+
+void PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo);
+void PLy_CSDestroy(DestReceiver *self);
+static bool PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self);
+static CallbackState *PLy_Callback_New(PLyExecutionContext *exec_ctx);
+static CallbackState *PLy_Callback_Free(CallbackState *callback);
+static PLyResultObject *PLyCSNewResult(CallbackState *myState);
+
static PyObject *PLy_spi_execute_query(char *query, long limit);
static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long
limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
+static PyObject *PLy_spi_execute_fetch_result(CallbackState *callback,
uint64 rows, int
status);
static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
@@ -196,6 +216,8 @@ PLy_spi_execute(PyObject *self, PyObject *args)
static PyObject *
PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback;
volatile int nargs;
int i,
rv;
@@ -238,12 +260,12 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long
limit)
oldcontext = CurrentMemoryContext;
oldowner = CurrentResourceOwner;
+ callback = PLy_Callback_New(exec_ctx);
PLy_spi_subtransaction_begin(oldcontext, oldowner);
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
char *volatile nulls;
volatile int j;
@@ -289,9 +311,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long
limit)
}
}
- rv = SPI_execute_plan(plan->plan, plan->values, nulls,
-
exec_ctx->curr_proc->fn_readonly, limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed,
rv);
+ rv = SPI_execute_plan_callback(plan->plan, plan->values, nulls,
+
exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *)
callback);
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
if (nargs > 0)
pfree(nulls);
@@ -316,9 +339,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long
limit)
}
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
for (i = 0; i < nargs; i++)
{
@@ -344,9 +369,11 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long
limit)
static PyObject *
PLy_spi_execute_query(char *query, long limit)
{
- int rv;
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+ CallbackState *callback = PLy_Callback_New(exec_ctx);
volatile MemoryContext oldcontext;
volatile ResourceOwner oldowner;
+ int rv;
PyObject *ret = NULL;
oldcontext = CurrentMemoryContext;
@@ -356,20 +383,22 @@ PLy_spi_execute_query(char *query, long limit)
PG_TRY();
{
- PLyExecutionContext *exec_ctx = PLy_current_execution_context();
-
pg_verifymbstr(query, strlen(query), false);
- rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly,
limit);
- ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed,
rv);
+ rv = SPI_execute_callback(query,
exec_ctx->curr_proc->fn_readonly, limit,
+ (DestReceiver *) callback);
+
+ ret = PLy_spi_execute_fetch_result(callback, SPI_processed, rv);
PLy_spi_subtransaction_commit(oldcontext, oldowner);
}
PG_CATCH();
{
PLy_spi_subtransaction_abort(oldcontext, oldowner);
+ PLy_Callback_Free(callback);
return NULL;
}
PG_END_TRY();
+ callback = PLy_Callback_Free(callback);
if (rv < 0)
{
@@ -383,94 +412,202 @@ PLy_spi_execute_query(char *query, long limit)
return ret;
}
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+static CallbackState *
+PLy_Callback_New(PLyExecutionContext *exec_ctx)
+{
+ MemoryContext oldcontext, cb_ctx;
+ CallbackState *callback;
+
+ callback = palloc0(sizeof(CallbackState));
+
+ /*
+ * Use a new context to make cleanup easier. Allocate it in the current
+ * context so we don't have to worry about cleaning it up if there's an
+ * error.
+ */
+ cb_ctx = AllocSetContextCreate(CurrentMemoryContext,
+ "PL/Python
callback context",
+
ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cb_ctx);
+ callback->parent_ctx = oldcontext;
+ callback->cb_ctx = cb_ctx;
+ memcpy(&(callback->pub), CreateDestReceiver(DestSPICallback),
sizeof(DestReceiver));
+ callback->pub.receiveSlot = PLy_CSreceive;
+ callback->pub.rStartup = PLy_CSStartup;
+ callback->pub.rDestroy = PLy_CSDestroy;
+ callback->exec_ctx = exec_ctx;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return callback;
+}
+
+static CallbackState *
+PLy_Callback_Free(CallbackState *callback)
+{
+ if (callback)
+ {
+ if (callback->cb_ctx)
+ (callback->pub.rDestroy) ((DestReceiver *) callback);
+
+ pfree(callback);
+ }
+
+ return (CallbackState *) NULL;
+}
+
+static PLyResultObject *
+PLyCSNewResult(CallbackState *myState)
{
+ MemoryContext oldctx;
+
+ /* The result info needs to be in the parent context */
+ oldctx = MemoryContextSwitchTo(myState->parent_ctx);
+ myState->result = PLy_result_new();
+ if (myState->result == NULL)
+ PLy_elog(ERROR, "could not create new result object");
+
+ MemoryContextSwitchTo(oldctx);
+ return (PLyResultObject *) myState->result;
+}
+
+void
+PLy_CSStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ PLyExecutionContext *old_exec_ctx;
+ CallbackState *myState = (CallbackState *) self;
PLyResultObject *result;
- volatile MemoryContext oldcontext;
+ PLyTypeInfo *args;
+ MemoryContext mctx, old_mctx;
+
+ /*
+ * We may be in a different execution context when we're called, so
switch
+ * back to our original one.
+ */
+ mctx = myState->cb_ctx;
+ old_exec_ctx = PLy_switch_execution_context(myState->exec_ctx);
+ old_mctx = MemoryContextSwitchTo(mctx);
+
+ /* We need to store this because the TupleDesc the receive function
gets has no names. */
+ myState->desc = typeinfo;
+
+ /* Setup type conversion info */
+ myState->args = args = palloc0(sizeof(PLyTypeInfo));
+ PLy_typeinfo_init(args, mctx);
+ PLy_input_tuple_funcs(args, typeinfo);
+
+ result = PLyCSNewResult(myState);
+
+ /*
+ * Save tuple descriptor for later use by result set metadata
+ * functions. Save it in TopMemoryContext so that it survives
+ * outside of an SPI context. We trust that PLy_result_dealloc()
+ * will clean it up when the time is right. XXX This might result in a
leak
+ * if an error happens and the result doesn't get dereferenced.
+ */
+ MemoryContextSwitchTo(TopMemoryContext);
+ result->tupdesc = CreateTupleDescCopy(typeinfo);
- result = (PLyResultObject *) PLy_result_new();
- Py_DECREF(result->status);
- result->status = PyInt_FromLong(status);
+ MemoryContextSwitchTo(old_mctx);
+ PLy_switch_execution_context(old_exec_ctx);
+}
- if (status > 0 && tuptable == NULL)
+void
+PLy_CSDestroy(DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ MemoryContext cb_ctx = myState->cb_ctx;
+
+ MemoryContextDelete(cb_ctx);
+ myState->cb_ctx = 0;
+}
+
+static bool
+PLy_CSreceive(TupleTableSlot *slot, DestReceiver *self)
+{
+ CallbackState *myState = (CallbackState *) self;
+ TupleDesc desc = myState->desc;
+ PLyTypeInfo *args = myState->args;
+ PLyResultObject *result = (PLyResultObject *) myState->result;
+ PLyExecutionContext *old_exec_ctx =
PLy_switch_execution_context(myState->exec_ctx);
+ MemoryContext scratch_context =
PLy_get_scratch_context(myState->exec_ctx);
+ MemoryContext oldcontext = CurrentMemoryContext;
+ int rv = 1;
+ PyObject *row;
+
+ /* Verify saved state matches incoming slot */
+ Assert(desc->tdtypeid == slot->tts_tupleDescriptor->tdtypeid);
+ Assert(args->in.r.natts == slot->tts_tupleDescriptor->natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /*
+ * Do the work in the scratch context to avoid leaking memory from the
+ * datatype output function calls.
+ */
+ MemoryContextSwitchTo(scratch_context);
+
+ PG_TRY();
{
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ row = PLyDict_FromTuple(args, ExecFetchSlotTuple(slot), desc);
}
- else if (status > 0 && tuptable != NULL)
+ PG_CATCH();
{
- PLyTypeInfo args;
- MemoryContext cxt;
+ Py_XDECREF(row);
+ MemoryContextSwitchTo(oldcontext);
+ PLy_switch_execution_context(old_exec_ctx);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
- Py_DECREF(result->nrows);
- result->nrows = (rows > (uint64) LONG_MAX) ?
- PyFloat_FromDouble((double) rows) :
- PyInt_FromLong((long) rows);
+ /*
+ * If we tried to do this in the PG_CATCH we'd have to mark row
+ * as volatile, but that won't work with PyList_Append, so just
+ * test the error code after doing Py_DECREF().
+ */
+ if (row)
+ {
+ rv = PyList_Append(result->rows, row);
+ Py_DECREF(row);
+ }
+
+ if (rv != 0)
+ ereport(ERROR,
+ (errmsg("unable to append value to list")));
- cxt = AllocSetContextCreate(CurrentMemoryContext,
-
"PL/Python temp context",
-
ALLOCSET_DEFAULT_SIZES);
- PLy_typeinfo_init(&args, cxt);
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(scratch_context);
+ PLy_switch_execution_context(old_exec_ctx);
- oldcontext = CurrentMemoryContext;
- PG_TRY();
- {
- MemoryContext oldcontext2;
+ return true;
+}
- if (rows)
- {
- uint64 i;
-
- /*
- * PyList_New() and PyList_SetItem() use
Py_ssize_t for list
- * size and list indices; so we cannot support
a result larger
- * than PY_SSIZE_T_MAX.
- */
- if (rows > (uint64) PY_SSIZE_T_MAX)
- ereport(ERROR,
-
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("query result
has too many rows to fit in a Python list")));
-
- Py_DECREF(result->rows);
- result->rows = PyList_New(rows);
-
- PLy_input_tuple_funcs(&args, tuptable->tupdesc);
- for (i = 0; i < rows; i++)
- {
- PyObject *row =
PLyDict_FromTuple(&args,
-
tuptable->vals[i],
-
tuptable->tupdesc);
- PyList_SetItem(result->rows, i, row);
- }
- }
+static PyObject *
+PLy_spi_execute_fetch_result(CallbackState *callback, uint64 rows, int status)
+{
+ PLyResultObject *result = (PLyResultObject *) callback->result;
+ /* If status < 0 this stuff would just get thrown away anyway. */
+ if (status > 0)
+ {
+ if (!result)
+ {
/*
- * Save tuple descriptor for later use by result set
metadata
- * functions. Save it in TopMemoryContext so that it
survives
- * outside of an SPI context. We trust that
PLy_result_dealloc()
- * will clean it up when the time is right. (Do this
as late as
- * possible, to minimize the number of ways the tupdesc
could get
- * leaked due to errors.)
+ * This happens if the command returned no results.
Create a dummy result set.
*/
- oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
- result->tupdesc =
CreateTupleDescCopy(tuptable->tupdesc);
- MemoryContextSwitchTo(oldcontext2);
- }
- PG_CATCH();
- {
- MemoryContextSwitchTo(oldcontext);
- MemoryContextDelete(cxt);
- Py_DECREF(result);
- PG_RE_THROW();
+ result = PLyCSNewResult(callback);
+ callback->result = (PyObject *) result;
}
- PG_END_TRY();
- MemoryContextDelete(cxt);
- SPI_freetuptable(tuptable);
+ Py_DECREF(result->status);
+ result->status = PyInt_FromLong(status);
+ Py_DECREF(result->nrows);
+ result->nrows = (rows > (uint64) LONG_MAX) ?
+ PyFloat_FromDouble((double) rows) :
+ PyInt_FromLong((long) rows);
}
return (PyObject *) result;
--
2.11.1
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers