I would like to propose the attached patch implementing autonomous
transactions for discussion and review.

This work was mostly inspired by the discussion about pg_background a
while back [0].  It seemed that most people liked the idea of having
something like that, but couldn't perhaps agree on the final interface.
Most if not all of the preliminary patches in that thread were
committed, but the user interface portions were then abandoned in favor
of other work.  (I'm aware that rebased versions of pg_background
existing.  I have one, too.)

The main use case, in a nutshell, is to be able to commit certain things
independently without having it affected by what happens later to the
current transaction, for example for audit logging.

My patch consists of three major pieces.  (I didn't make them three
separate patches because it will be clear where the boundaries are.)

- A API interface to open a "connection" to a background worker, run
queries, get results: AutonomousSessionStart(), AutonomousSessionEnd(),
AutonomousSessionExecute(), etc.  The communication happens using the
client/server protocol.

- Patches to PL/pgSQL to implement Oracle-style autonomous transaction
blocks:

AS $$
DECLARE
  PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
  FOR i IN 0..9 LOOP
    START TRANSACTION;
    INSERT INTO test1 VALUES (i);
    IF i % 2 = 0 THEN
        COMMIT;
    ELSE
        ROLLBACK;
    END IF;
  END LOOP;

  RETURN 42;
END;
$$;

This is very incomplete and has some open technical issues that I will
discuss below.  But those are all issues of PL/pgSQL, not really issues
of how autonomous sessions work.

Basically, a block that is declared with that pragma uses the autonomous
C API instead of SPI to do its things.

- Patches to PL/Python to implement a context manager for autonomous
sessions (similar to how subtransactions work there):

with plpy.autonomous() as a:
    for i in range(0, 10):
        a.execute("BEGIN")
        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
        if i % 2 == 0:
            a.execute("COMMIT")
        else:
            a.execute("ROLLBACK")

This works quite well, except perhaps some tuning with memory management
and some caching and some refactoring.

While the PL/pgSQL work is more of a top-level goal, I added the
PL/Python implementation because it is easier to map the C API straight
out to something more accessible, so testing it out is much easier.


The main technical problem I had with PL/pgSQL is how to parse named
parameters.  If you're in PL/Python, say, you do

    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)",
                     ["int4", "text"])

and that works fine, because it maps straight to the client/server
protocol.  But in PL/pgSQL, you will want something like

    DECLARE
      x, y ...
    BEGIN
      INSERT INTO test1 (a, b) VALUES (x, y)

When running in-process (SPI), we install parser hooks that allow the
parser to check back into PL/pgSQL about whether x, y are variables and
what they mean.  When we run in an autonomous session, we don't have
that available.  So my idea was to extend the protocol Parse message to
allow sending a symbol table instead of parameter types.  So instead of
saying, there are two parameters and here are their types, I would send
a list of symbols and types, and the server would respond to the Parse
message with some kind of information about which symbols it found.  I
think that would work, but I got lost in the weeds and didn't get very
far.  But you can see some of that in the code.  If anyone has other
ideas, I'd be very interested.


Other than that, I think there are also other bits and pieces that are
worth looking at, and perhaps have some overlap with other efforts, such as:

- Refining the internal APIs for running queries, with more flexibility
than SPI.  There have recently been discussions about that.  I just used
whatever was in tcop/postgres.c directly, like pg_background does, and
that seems mostly fine, but if there are other ideas, they would be
useful for this, too.

- An exception to the "mostly fine" is that the desirable handling of
log_statement, log_duration, log_min_duration_statement for
non-top-level execution is unclear.

- The autonomous session API could also be useful for other things, such
as perhaps implementing a variant of pg_background on top of them, or
doing other asynchronous or background execution schemes.  So input on
that is welcome.

- There is some overlap with the protocol handling for parallel query,
including things like error propagation, notify handling, encoding
handling.  I suspect that other background workers will need similar
facilities, so we could simplify some of that.

- Client encoding in particular was recently discussed for parallel
query.  The problem with the existing solution is that it makes
assign_client_encoding() require hardcoded knowledge of all relevant
background worker types.  So I tried a more general solution, with a hook.

- I added new test files in the plpgsql directory.  The main test for
plpgsql runs as part of the main test suite.  Maybe we want to move that
to the plpgsql directory as well.

- More guidance for using some of the background worker and shared
memory queue facilities.  For example, I don't know what a good queue
size would be.

- Both PL/pgSQL and PL/Python expose some details of SPI in ways that
make it difficult to run some things not through SPI.  For example,
return codes are exposed directly by PL/Python.  PL/pgSQL is heavily
tied to the API flow of SPI.  It's fixable, but it will be some work.  I
had originally wanted to hide the autonomous session API inside SPI or
make it fully compatible with SPI, but that was quickly thrown out.
PL/Python now contains some ugly code to make certain things match up so
that existing code can be used.  It's not always pretty.

- The patch "Set log_line_prefix and application name in test drivers"
(https://commitfest.postgresql.org/10/717/) is helpful in testing and
debugging this.


[0]:
https://www.postgresql.org/message-id/flat/CA+Tgmoam66dTzCP8N2cRcS6S6dBMFX+JMba+mDf68H=kakn...@mail.gmail.com

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index cec37ce..892d807 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -114,7 +114,7 @@ PrepareQuery(PrepareStmt *stmt, const char *queryString)
 	 */
 	query = parse_analyze_varparams((Node *) copyObject(stmt->query),
 									queryString,
-									&argtypes, &nargs);
+									&argtypes, &nargs, NULL);
 
 	/*
 	 * Check that all parameter types were determined.
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index defafa5..a522c69 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -674,12 +674,17 @@ show_random_seed(void)
  * SET CLIENT_ENCODING
  */
 
+void (*check_client_encoding_hook)(void);
+
 bool
 check_client_encoding(char **newval, void **extra, GucSource source)
 {
 	int			encoding;
 	const char *canonical_name;
 
+	if (check_client_encoding_hook)
+		check_client_encoding_hook();
+
 	/* Look up the encoding by name */
 	encoding = pg_valid_client_encoding(*newval);
 	if (encoding < 0)
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index bfe66c6..7c7dc92 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -47,6 +47,11 @@ static PQcommMethods PqCommMqMethods = {
 	mq_endcopyout
 };
 
+static PQcommMethods *save_PqCommMethods;
+static CommandDest save_whereToSendOutput;
+static ProtocolVersion save_FrontendProtocol;
+static dsm_segment *save_seg;
+
 /*
  * Arrange to redirect frontend/backend protocol messages to a shared-memory
  * message queue.
@@ -54,12 +59,30 @@ static PQcommMethods PqCommMqMethods = {
 void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
+	save_PqCommMethods = PqCommMethods;
+	save_whereToSendOutput = whereToSendOutput;
+	save_FrontendProtocol = FrontendProtocol;
+
 	PqCommMethods = &PqCommMqMethods;
 	pq_mq = shm_mq_get_queue(mqh);
 	pq_mq_handle = mqh;
 	whereToSendOutput = DestRemote;
 	FrontendProtocol = PG_PROTOCOL_LATEST;
 	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+
+	save_seg = seg;
+}
+
+void
+pq_stop_redirect_to_shm_mq(void)
+{
+	cancel_on_dsm_detach(save_seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+	PqCommMethods = save_PqCommMethods;
+	whereToSendOutput = save_whereToSendOutput;
+	FrontendProtocol = save_FrontendProtocol;
+	pq_mq = NULL;
+	pq_mq_handle = NULL;
+	save_seg = NULL;
 }
 
 /*
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index eac86cc..5b94d85 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -124,7 +124,7 @@ parse_analyze(Node *parseTree, const char *sourceText,
  */
 Query *
 parse_analyze_varparams(Node *parseTree, const char *sourceText,
-						Oid **paramTypes, int *numParams)
+						Oid **paramTypes, int *numParams, const char *paramNames[])
 {
 	ParseState *pstate = make_parsestate(NULL);
 	Query	   *query;
@@ -133,7 +133,7 @@ parse_analyze_varparams(Node *parseTree, const char *sourceText,
 
 	pstate->p_sourcetext = sourceText;
 
-	parse_variable_parameters(pstate, paramTypes, numParams);
+	parse_variable_parameters(pstate, paramTypes, numParams, paramNames);
 
 	query = transformTopLevelStmt(pstate, parseTree);
 
diff --git a/src/backend/parser/parse_param.c b/src/backend/parser/parse_param.c
index b402843..c459c00 100644
--- a/src/backend/parser/parse_param.c
+++ b/src/backend/parser/parse_param.c
@@ -49,8 +49,10 @@ typedef struct VarParamState
 {
 	Oid		  **paramTypes;		/* array of parameter type OIDs */
 	int		   *numParams;		/* number of array entries */
+	const char **paramNames;
 } VarParamState;
 
+static Node *variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var);
 static Node *fixed_paramref_hook(ParseState *pstate, ParamRef *pref);
 static Node *variable_paramref_hook(ParseState *pstate, ParamRef *pref);
 static Node *variable_coerce_param_hook(ParseState *pstate, Param *param,
@@ -81,17 +83,58 @@ parse_fixed_parameters(ParseState *pstate,
  */
 void
 parse_variable_parameters(ParseState *pstate,
-						  Oid **paramTypes, int *numParams)
+						  Oid **paramTypes, int *numParams, const char *paramNames[])
 {
 	VarParamState *parstate = palloc(sizeof(VarParamState));
 
 	parstate->paramTypes = paramTypes;
 	parstate->numParams = numParams;
+	parstate->paramNames = paramNames;
+	pstate->p_post_columnref_hook = variable_post_column_ref_hook;
 	pstate->p_ref_hook_state = (void *) parstate;
 	pstate->p_paramref_hook = variable_paramref_hook;
 	pstate->p_coerce_param_hook = variable_coerce_param_hook;
 }
 
+static Node *
+variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var)
+{
+	VarParamState *parstate = (VarParamState *) pstate->p_ref_hook_state;
+
+	/* already resolved */
+	if (var != NULL)
+		return NULL;
+
+	/* did not supply parameter names */
+	if (!parstate->paramNames)
+		return NULL;
+
+	if (list_length(cref->fields) == 1)
+	{
+		Node	   *field1 = (Node *) linitial(cref->fields);
+		char	   *name1;
+		int			i;
+		Param	   *param;
+
+		Assert(IsA(field1, String));
+		name1 = strVal(field1);
+		for (i = 0; i < *parstate->numParams; i++)
+			if (strcmp(name1, parstate->paramNames[i]) == 0)
+			{
+				param = makeNode(Param);
+				param->paramkind = PARAM_EXTERN;
+				param->paramid = i + 1;
+				param->paramtype = *parstate->paramTypes[i];
+				param->paramtypmod = -1;
+				param->paramcollid = InvalidOid;
+				param->location = -1;
+				return (Node *) param;
+			}
+	}
+
+	return NULL;
+}
+
 /*
  * Transform a ParamRef using fixed parameter types.
  */
diff --git a/src/backend/tcop/Makefile b/src/backend/tcop/Makefile
index 674302f..c3b337e 100644
--- a/src/backend/tcop/Makefile
+++ b/src/backend/tcop/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/tcop
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS= dest.o fastpath.o postgres.o pquery.o utility.o
+OBJS= autonomous.o dest.o fastpath.o postgres.o pquery.o utility.o
 
 ifneq (,$(filter $(PORTNAME),cygwin win32))
 override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
diff --git a/src/backend/tcop/autonomous.c b/src/backend/tcop/autonomous.c
new file mode 100644
index 0000000..bf78382
--- /dev/null
+++ b/src/backend/tcop/autonomous.c
@@ -0,0 +1,882 @@
+/*--------------------------------------------------------------------------
+ *
+ * autonomous.c
+ *		Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/tcop/autonomous.c
+ *
+ *
+ * This implements a C API to open an autonomous session and run SQL queries
+ * in it.  The session looks much like a normal database connection, but it is
+ * always to the same database, and there is no authentication needed.  The
+ * "backend" for that connection is a background worker.  The normal backend
+ * and the autonomous session worker communicate over the normal FE/BE
+ * protocol.
+ *
+ * Types:
+ *
+ * AutonomousSession -- opaque connection handle
+ * AutonomousPreparedStatement -- opaque prepared statement handle
+ * AutonomousResult -- query result
+ *
+ * Functions:
+ *
+ * AutonomousSessionStart() -- start a session (launches background worker)
+ * and return a handle
+ *
+ * AutonomousSessionEnd() -- close session and free resources
+ *
+ * AutonomousSessionExecute() -- run SQL string and return result (rows or
+ * status)
+ *
+ * AutonomousSessionPrepare() -- prepare an SQL string for subsequent
+ * execution
+ *
+ * AutonomousSessionExecutePrepared() -- run prepared statement
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/tupdesc.h"
+#include "access/xact.h"
+#include "commands/async.h"
+#include "commands/variable.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/pg_list.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/autonomous.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define AUTONOMOUS_MAGIC				0x50674267
+
+#define AUTONOMOUS_KEY_FIXED_DATA		0
+#define AUTONOMOUS_KEY_GUC				1
+#define AUTONOMOUS_KEY_COMMAND_QUEUE	2
+#define AUTONOMOUS_KEY_RESPONSE_QUEUE	3
+#define AUTONOMOUS_NKEYS				4
+
+#define AUTONOMOUS_QUEUE_SIZE			16384
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct autonomous_session_fixed_data
+{
+	Oid database_id;
+	Oid authenticated_user_id;
+	Oid current_user_id;
+	int sec_context;
+} autonomous_session_fixed_data;
+
+struct AutonomousSession
+{
+	dsm_segment *seg;
+	BackgroundWorkerHandle *worker_handle;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	int		transaction_status;
+};
+
+struct AutonomousPreparedStatement
+{
+	AutonomousSession *session;
+	Oid		   *argtypes;
+	TupleDesc	tupdesc;
+};
+
+static void autonomous_worker_main(Datum main_arg);
+static void shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg);
+static void autonomous_check_client_encoding_hook(void);
+static TupleDesc TupleDesc_from_RowDescription(StringInfo msg);
+static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg);
+static void forward_NotifyResponse(StringInfo msg);
+static void rethrow_errornotice(StringInfo msg);
+static void invalid_protocol_message(char msgtype) pg_attribute_noreturn();
+
+
+AutonomousSession *
+AutonomousSessionStart(void)
+{
+	BackgroundWorker worker;
+	pid_t		pid;
+	AutonomousSession *session;
+	shm_toc_estimator e;
+	Size		segsize;
+	Size		guc_len;
+	char	   *gucstate;
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	autonomous_session_fixed_data *fdata;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	BgwHandleStatus bgwstatus;
+	StringInfoData msg;
+	char		msgtype;
+
+	session = palloc(sizeof(*session));
+
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(autonomous_session_fixed_data));
+	shm_toc_estimate_chunk(&e, AUTONOMOUS_QUEUE_SIZE);
+	shm_toc_estimate_chunk(&e, AUTONOMOUS_QUEUE_SIZE);
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&e, guc_len);
+	shm_toc_estimate_keys(&e, AUTONOMOUS_NKEYS);
+	segsize = shm_toc_estimate(&e);
+	seg = dsm_create(segsize, 0);
+
+	session->seg = seg;
+
+	toc = shm_toc_create(AUTONOMOUS_MAGIC, dsm_segment_address(seg), segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(*fdata));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+	shm_toc_insert(toc, AUTONOMOUS_KEY_FIXED_DATA, fdata);
+
+	/* Store GUC state in dynamic shared memory. */
+	gucstate = shm_toc_allocate(toc, guc_len);
+	SerializeGUCState(guc_len, gucstate);
+	shm_toc_insert(toc, AUTONOMOUS_KEY_GUC, gucstate);
+
+	command_mq = shm_mq_create(shm_toc_allocate(toc, AUTONOMOUS_QUEUE_SIZE),
+							   AUTONOMOUS_QUEUE_SIZE);
+	shm_toc_insert(toc, AUTONOMOUS_KEY_COMMAND_QUEUE, command_mq);
+	shm_mq_set_sender(command_mq, MyProc);
+
+	response_mq = shm_mq_create(shm_toc_allocate(toc, AUTONOMOUS_QUEUE_SIZE),
+								AUTONOMOUS_QUEUE_SIZE);
+	shm_toc_insert(toc, AUTONOMOUS_KEY_RESPONSE_QUEUE, response_mq);
+	shm_mq_set_receiver(response_mq, MyProc);
+
+	session->command_qh = shm_mq_attach(command_mq, seg, NULL);
+	session->response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = autonomous_worker_main;
+	snprintf(worker.bgw_name, BGW_MAXLEN, "autonomous session by PID %d", MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+				 errhint("You might need to increase max_worker_processes.")));
+
+	shm_mq_set_handle(session->command_qh, session->worker_handle);
+	shm_mq_set_handle(session->response_qh, session->worker_handle);
+
+	bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+	if (bgwstatus != BGWH_STARTED)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not start background worker")));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return session;
+}
+
+
+void
+AutonomousSessionEnd(AutonomousSession *session)
+{
+	StringInfoData msg;
+
+	if (session->transaction_status == 'T')
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("autonomous session ended with transaction block open")));
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'X');
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	pfree(session->worker_handle);
+	dsm_detach(session->seg);
+	pfree(session);
+}
+
+
+AutonomousResult *
+AutonomousSessionExecute(AutonomousSession *session, const char *sql)
+{
+	StringInfoData msg;
+	char		msgtype;
+	AutonomousResult *result;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'Q');
+	pq_sendstring(&msg, sql);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!result->tupdesc)
+					elog(ERROR, "no T before D");
+				result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(result->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return result;
+}
+
+
+AutonomousPreparedStatement *
+AutonomousSessionPrepare(AutonomousSession *session, const char *sql, int16 nargs,
+						 Oid argtypes[], const char *argnames[])
+{
+	AutonomousPreparedStatement *result;
+	StringInfoData msg;
+	int16		i;
+	char		msgtype;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'P');
+	pq_sendstring(&msg, "");
+	pq_sendstring(&msg, sql);
+	pq_sendint(&msg, nargs, 2);
+	for (i = 0; i < nargs; i++)
+		pq_sendint(&msg, argtypes[i], 4);
+	if (argnames)
+		for (i = 0; i < nargs; i++)
+			pq_sendstring(&msg, argnames[i]);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->session = session;
+	result->argtypes = palloc(nargs * sizeof(*result->argtypes));
+	memcpy(result->argtypes, argtypes, nargs * sizeof(*result->argtypes));
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '1':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'D');
+	pq_sendbyte(&msg, 'S');
+	pq_sendstring(&msg, "");
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'E':
+				rethrow_errornotice(&msg);
+				break;
+			case 'n':
+				break;
+			case 't':
+				/* ignore for now */
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'n' && msgtype != 'T');
+
+	return result;
+}
+
+
+AutonomousResult *
+AutonomousSessionExecutePrepared(AutonomousPreparedStatement *stmt, int16 nargs, Datum *values, bool *nulls)
+{
+	AutonomousSession *session;
+	StringInfoData msg;
+	AutonomousResult *result;
+	char		msgtype;
+	int16		i;
+
+	session = stmt->session;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'B');
+	pq_sendstring(&msg, "");
+	pq_sendstring(&msg, "");
+	pq_sendint(&msg, 1, 2);  /* number of parameter format codes */
+	pq_sendint(&msg, 1, 2);
+	pq_sendint(&msg, nargs, 2);  /* number of parameter values */
+	for (i = 0; i < nargs; i++)
+	{
+		if (nulls[i])
+			pq_sendint(&msg, -1, 4);
+		else
+		{
+			Oid			typsend;
+			bool		typisvarlena;
+			bytea	   *outputbytes;
+
+			getTypeBinaryOutputInfo(stmt->argtypes[i], &typsend, &typisvarlena);
+			outputbytes = OidSendFunctionCall(typsend, values[i]);
+			pq_sendint(&msg, VARSIZE(outputbytes) - VARHDRSZ, 4);
+			pq_sendbytes(&msg, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ);
+			pfree(outputbytes);
+		}
+	}
+	pq_sendint(&msg, 1, 2);  /* number of result column format codes */
+	pq_sendint(&msg, 1, 2);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '2':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'E');
+	pq_sendstring(&msg, "");
+	pq_sendint(&msg, 0, 4);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->tupdesc = stmt->tupdesc;
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!stmt->tupdesc)
+					elog(ERROR, "did not expect any rows");
+				result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(stmt->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'C');
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_putemptymessage('S');
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case 'A':
+			forward_NotifyResponse(&msg);
+			break;
+		case 'Z':
+			session->transaction_status = pq_getmsgbyte(&msg);
+			pq_getmsgend(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	return result;
+}
+
+
+static void
+autonomous_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	autonomous_session_fixed_data *fdata;
+	char	   *gucstate;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	StringInfoData msg;
+	char		msgtype;
+
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "autonomous");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "autonomous session",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(AUTONOMOUS_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find data structures in dynamic shared memory. */
+	fdata = shm_toc_lookup(toc, AUTONOMOUS_KEY_FIXED_DATA);
+
+	gucstate = shm_toc_lookup(toc, AUTONOMOUS_KEY_GUC);
+
+	command_mq = shm_toc_lookup(toc, AUTONOMOUS_KEY_COMMAND_QUEUE);
+	shm_mq_set_receiver(command_mq, MyProc);
+	command_qh = shm_mq_attach(command_mq, seg, NULL);
+
+	response_mq = shm_toc_lookup(toc, AUTONOMOUS_KEY_RESPONSE_QUEUE);
+	shm_mq_set_sender(response_mq, MyProc);
+	response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	pq_redirect_to_shm_mq(seg, response_qh);
+	BackgroundWorkerInitializeConnectionByOid(fdata->database_id,
+											  fdata->authenticated_user_id);
+
+	SetClientEncoding(GetDatabaseEncoding());
+
+	StartTransactionCommand();
+	RestoreGUCState(gucstate);
+	CommitTransactionCommand();
+
+	process_session_preload_libraries();
+
+	SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+	whereToSendOutput = DestRemote;
+	ReadyForQuery(whereToSendOutput);
+
+	MessageContext = AllocSetContextCreate(TopMemoryContext,
+										   "MessageContext",
+										   ALLOCSET_DEFAULT_MINSIZE,
+										   ALLOCSET_DEFAULT_INITSIZE,
+										   ALLOCSET_DEFAULT_MAXSIZE);
+
+	do
+	{
+		MemoryContextSwitchTo(MessageContext);
+		MemoryContextResetAndDeleteChildren(MessageContext);
+
+		ProcessCompletedNotifies();
+		pgstat_report_stat(false);
+		pgstat_report_activity(STATE_IDLE, NULL);
+
+		shm_mq_receive_stringinfo(command_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'B':
+				{
+					SetCurrentStatementStartTimestamp();
+					exec_bind_message(&msg);
+					break;
+				}
+			case 'D':
+				{
+					int         describe_type;
+					const char *describe_target;
+
+					SetCurrentStatementStartTimestamp();
+
+					describe_type = pq_getmsgbyte(&msg);
+					describe_target = pq_getmsgstring(&msg);
+					pq_getmsgend(&msg);
+
+					switch (describe_type)
+					{
+						case 'S':
+							exec_describe_statement_message(describe_target);
+							break;
+#ifdef TODO
+						case 'P':
+							exec_describe_portal_message(describe_target);
+							break;
+#endif
+						default:
+							ereport(ERROR,
+									(errcode(ERRCODE_PROTOCOL_VIOLATION),
+									 errmsg("invalid DESCRIBE message subtype %d",
+											describe_type)));
+							break;
+					}
+				}
+				break;
+			case 'E':
+				{
+					const char *portal_name;
+					int			max_rows;
+
+					SetCurrentStatementStartTimestamp();
+
+					portal_name = pq_getmsgstring(&msg);
+					max_rows = pq_getmsgint(&msg, 4);
+					pq_getmsgend(&msg);
+
+					exec_execute_message(portal_name, max_rows);
+				}
+				break;
+
+			case 'P':
+				{
+					const char *stmt_name;
+					const char *query_string;
+					int			numParams;
+					Oid		   *paramTypes = NULL;
+					const char **paramNames = NULL;
+
+					SetCurrentStatementStartTimestamp();
+
+					stmt_name = pq_getmsgstring(&msg);
+					query_string = pq_getmsgstring(&msg);
+					numParams = pq_getmsgint(&msg, 2);
+					if (numParams > 0)
+					{
+						int			i;
+
+						paramTypes = palloc(numParams * sizeof(Oid));
+						for (i = 0; i < numParams; i++)
+							paramTypes[i] = pq_getmsgint(&msg, 4);
+					}
+					/* If data left in message, read parameter names. */
+					if (msg.cursor != msg.len)
+					{
+						int			i;
+
+						paramNames = palloc(numParams * sizeof(char *));
+						for (i = 0; i < numParams; i++)
+							paramNames[i] = pq_getmsgstring(&msg);
+					}
+					pq_getmsgend(&msg);
+
+					exec_parse_message(query_string, stmt_name,
+									   paramTypes, numParams, paramNames);
+					break;
+				}
+			case 'Q':
+				{
+					const char *sql;
+					int save_log_statement;
+					bool save_log_duration;
+					int save_log_min_duration_statement;
+
+					sql = pq_getmsgstring(&msg);
+					pq_getmsgend(&msg);
+
+					/* XXX room for improvement */
+					save_log_statement = log_statement;
+					save_log_duration = log_duration;
+					save_log_min_duration_statement = log_min_duration_statement;
+
+					check_client_encoding_hook = autonomous_check_client_encoding_hook;
+					log_statement = LOGSTMT_NONE;
+					log_duration = false;
+					log_min_duration_statement = -1;
+
+					SetCurrentStatementStartTimestamp();
+					exec_simple_query(sql, 1);
+
+					log_statement = save_log_statement;
+					log_duration = save_log_duration;
+					log_min_duration_statement = save_log_min_duration_statement;
+					check_client_encoding_hook = NULL;
+
+					ReadyForQuery(whereToSendOutput);
+					break;
+				}
+			case 'S':
+				{
+					pq_getmsgend(&msg);
+					finish_xact_command();
+					ReadyForQuery(whereToSendOutput);
+					break;
+				}
+			case 'X':
+				break;
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("invalid protocol message type from autonomous session leader: %c",
+								msgtype)));
+				break;
+		}
+	}
+	while (msgtype != 'X');
+}
+
+
+static void
+shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg)
+{
+	shm_mq_result res;
+	Size		nbytes;
+	void	   *data;
+
+	res = shm_mq_receive(qh, &nbytes, &data, false);
+	if (res != SHM_MQ_SUCCESS)
+		elog(ERROR, "shm_mq_receive failed: %d", res);
+
+	initStringInfo(msg);
+	appendBinaryStringInfo(msg, data, nbytes);
+}
+
+
+static void
+autonomous_check_client_encoding_hook(void)
+{
+	elog(ERROR, "cannot set client encoding in autonomous session");
+}
+
+
+static TupleDesc
+TupleDesc_from_RowDescription(StringInfo msg)
+{
+	TupleDesc	tupdesc;
+	int16		natts = pq_getmsgint(msg, 2);
+	int16		i;
+
+	tupdesc = CreateTemplateTupleDesc(natts, false);
+	for (i = 0; i < natts; i++)
+	{
+		const char *colname;
+		Oid     type_oid;
+		int32	typmod;
+		int16	format;
+
+		colname = pq_getmsgstring(msg);
+		(void) pq_getmsgint(msg, 4);   /* table OID */
+		(void) pq_getmsgint(msg, 2);   /* table attnum */
+		type_oid = pq_getmsgint(msg, 4);
+		(void) pq_getmsgint(msg, 2);   /* type length */
+		typmod = pq_getmsgint(msg, 4);
+		format = pq_getmsgint(msg, 2);
+		(void) format;
+#ifdef TODO
+		/* XXX The protocol sometimes sends 0 (text) if the format is not
+		 * determined yet.  We always use binary, so this check is probably
+		 * not useful. */
+		if (format != 1)
+			elog(ERROR, "format must be binary");
+#endif
+
+		TupleDescInitEntry(tupdesc, i + 1, colname, type_oid, typmod, 0);
+	}
+	return tupdesc;
+}
+
+
+static HeapTuple
+HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg)
+{
+	int16		natts = pq_getmsgint(msg, 2);
+	int16		i;
+	Datum	   *values;
+	bool	   *nulls;
+	StringInfoData buf;
+
+	Assert(tupdesc);
+
+	if (natts != tupdesc->natts)
+		elog(ERROR, "malformed DataRow");
+
+	values = palloc(natts * sizeof(*values));
+	nulls = palloc(natts * sizeof(*nulls));
+	initStringInfo(&buf);
+
+	for (i = 0; i < natts; i++)
+	{
+		int32 len = pq_getmsgint(msg, 4);
+
+		if (len < 0)
+			nulls[i] = true;
+		else
+		{
+			Oid recvid;
+			Oid typioparams;
+
+			nulls[i] = false;
+
+			getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid,
+								   &recvid,
+								   &typioparams);
+			resetStringInfo(&buf);
+			appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, len), len);
+			values[i] = OidReceiveFunctionCall(recvid, &buf, typioparams,
+											   tupdesc->attrs[i]->atttypmod);
+		}
+	}
+
+	return heap_form_tuple(tupdesc, values, nulls);
+}
+
+
+static void
+forward_NotifyResponse(StringInfo msg)
+{
+	int32	pid;
+	const char *channel;
+	const char *payload;
+
+	pid = pq_getmsgint(msg, 4);
+	channel = pq_getmsgrawstring(msg);
+	payload = pq_getmsgrawstring(msg);
+	pq_endmessage(msg);
+
+	NotifyMyFrontEnd(channel, payload, pid);
+}
+
+
+static void
+rethrow_errornotice(StringInfo msg)
+{
+	ErrorData   edata;
+
+	pq_parse_errornotice(msg, &edata);
+	edata.elevel = Min(edata.elevel, ERROR);
+	ThrowErrorData(&edata);
+}
+
+
+static void
+invalid_protocol_message(char msgtype)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("invalid protocol message type from autonomous session: %c",
+					msgtype)));
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 98ccbbb..2a7d4d7 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -180,8 +180,6 @@ static int	errdetail_execute(List *raw_parsetree_list);
 static int	errdetail_params(ParamListInfo params);
 static int	errdetail_abort(void);
 static int	errdetail_recovery_conflict(void);
-static void start_xact_command(void);
-static void finish_xact_command(void);
 static bool IsTransactionExitStmt(Node *parsetree);
 static bool IsTransactionExitStmtList(List *parseTrees);
 static bool IsTransactionStmtList(List *parseTrees);
@@ -869,8 +867,8 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)
  *
  * Execute a "simple Query" protocol message.
  */
-static void
-exec_simple_query(const char *query_string)
+void
+exec_simple_query(const char *query_string, int16 format)
 {
 	CommandDest dest = whereToSendOutput;
 	MemoryContext oldcontext;
@@ -963,7 +961,6 @@ exec_simple_query(const char *query_string)
 				   *plantree_list;
 		Portal		portal;
 		DestReceiver *receiver;
-		int16		format;
 
 		/*
 		 * Get the command name for use in status display (it also becomes the
@@ -1054,6 +1051,8 @@ exec_simple_query(const char *query_string)
 		 */
 		PortalStart(portal, NULL, 0, InvalidSnapshot);
 
+		if (format < 0)
+		{
 		/*
 		 * Select the appropriate output format: text unless we are doing a
 		 * FETCH from a binary cursor.  (Pretty grotty to have to do this here
@@ -1074,6 +1073,7 @@ exec_simple_query(const char *query_string)
 					format = 1; /* BINARY */
 			}
 		}
+		}
 		PortalSetResultFormat(portal, 1, &format);
 
 		/*
@@ -1185,11 +1185,12 @@ exec_simple_query(const char *query_string)
  *
  * Execute a "Parse" protocol message.
  */
-static void
+void
 exec_parse_message(const char *query_string,	/* string to execute */
 				   const char *stmt_name,		/* name for prepared stmt */
-				   Oid *paramTypes,		/* parameter types */
-				   int numParams)		/* number of parameters */
+				   Oid paramTypes[],		/* parameter types */
+				   int numParams,		/* number of parameters */
+				   const char *paramNames[])
 {
 	MemoryContext unnamed_stmt_context = NULL;
 	MemoryContext oldcontext;
@@ -1328,7 +1329,8 @@ exec_parse_message(const char *query_string,	/* string to execute */
 		query = parse_analyze_varparams(raw_parse_tree,
 										query_string,
 										&paramTypes,
-										&numParams);
+										&numParams,
+										paramNames);
 
 		/*
 		 * Check all parameter types got determined.
@@ -1447,7 +1449,7 @@ exec_parse_message(const char *query_string,	/* string to execute */
  *
  * Process a "Bind" message to create a portal from a prepared statement
  */
-static void
+void
 exec_bind_message(StringInfo input_message)
 {
 	const char *portal_name;
@@ -1829,7 +1831,7 @@ exec_bind_message(StringInfo input_message)
  *
  * Process an "Execute" message for a portal
  */
-static void
+void
 exec_execute_message(const char *portal_name, long max_rows)
 {
 	CommandDest dest;
@@ -2278,7 +2280,7 @@ errdetail_recovery_conflict(void)
  *
  * Process a "Describe" message for a prepared statement
  */
-static void
+void
 exec_describe_statement_message(const char *stmt_name)
 {
 	CachedPlanSource *psrc;
@@ -2422,7 +2424,7 @@ exec_describe_portal_message(const char *portal_name)
 /*
  * Convenience routines for starting/committing a single command.
  */
-static void
+void
 start_xact_command(void)
 {
 	if (!xact_started)
@@ -2442,7 +2444,7 @@ start_xact_command(void)
 	}
 }
 
-static void
+void
 finish_xact_command(void)
 {
 	if (xact_started)
@@ -4067,7 +4069,7 @@ PostgresMain(int argc, char *argv[],
 					if (am_walsender)
 						exec_replication_command(query_string);
 					else
-						exec_simple_query(query_string);
+						exec_simple_query(query_string, -1);
 
 					send_ready_for_query = true;
 				}
@@ -4099,7 +4101,7 @@ PostgresMain(int argc, char *argv[],
 					pq_getmsgend(&input_message);
 
 					exec_parse_message(query_string, stmt_name,
-									   paramTypes, numParams);
+									   paramTypes, numParams, NULL);
 				}
 				break;
 
diff --git a/src/include/commands/variable.h b/src/include/commands/variable.h
index 8105951..73faff7 100644
--- a/src/include/commands/variable.h
+++ b/src/include/commands/variable.h
@@ -29,6 +29,7 @@ extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource s
 extern bool check_random_seed(double *newval, void **extra, GucSource source);
 extern void assign_random_seed(double newval, void *extra);
 extern const char *show_random_seed(void);
+extern void (*check_client_encoding_hook)(void);
 extern bool check_client_encoding(char **newval, void **extra, GucSource source);
 extern void assign_client_encoding(const char *newval, void *extra);
 extern bool check_session_authorization(char **newval, void **extra, GucSource source);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index 8c03acb..6cc0090 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh);
+extern void pq_stop_redirect_to_shm_mq(void);
 extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
 
 extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h
index 5ba322a..2168c00 100644
--- a/src/include/parser/analyze.h
+++ b/src/include/parser/analyze.h
@@ -25,7 +25,7 @@ extern PGDLLIMPORT post_parse_analyze_hook_type post_parse_analyze_hook;
 extern Query *parse_analyze(Node *parseTree, const char *sourceText,
 			  Oid *paramTypes, int numParams);
 extern Query *parse_analyze_varparams(Node *parseTree, const char *sourceText,
-						Oid **paramTypes, int *numParams);
+									  Oid **paramTypes, int *numParams, const char *paramNames[]);
 
 extern Query *parse_sub_analyze(Node *parseTree, ParseState *parentParseState,
 				  CommonTableExpr *parentCTE,
diff --git a/src/include/parser/parse_param.h b/src/include/parser/parse_param.h
index bbf608a..5f7e9fa 100644
--- a/src/include/parser/parse_param.h
+++ b/src/include/parser/parse_param.h
@@ -18,7 +18,7 @@
 extern void parse_fixed_parameters(ParseState *pstate,
 					   Oid *paramTypes, int numParams);
 extern void parse_variable_parameters(ParseState *pstate,
-						  Oid **paramTypes, int *numParams);
+									  Oid **paramTypes, int *numParams, const char *paramNames[]);
 extern void check_variable_parameters(ParseState *pstate, Query *query);
 extern bool query_contains_extern_params(Query *query);
 
diff --git a/src/include/tcop/autonomous.h b/src/include/tcop/autonomous.h
new file mode 100644
index 0000000..9f833d7
--- /dev/null
+++ b/src/include/tcop/autonomous.h
@@ -0,0 +1,27 @@
+#ifndef AUTONOMOUS_H
+#define AUTONOMOUS_H
+
+#include "access/tupdesc.h"
+#include "nodes/pg_list.h"
+
+struct AutonomousSession;
+typedef struct AutonomousSession AutonomousSession;
+
+struct AutonomousPreparedStatement;
+typedef struct AutonomousPreparedStatement AutonomousPreparedStatement;
+
+typedef struct AutonomousResult
+{
+	TupleDesc	tupdesc;
+	List	   *tuples;
+	char	   *command;
+} AutonomousResult;
+
+AutonomousSession *AutonomousSessionStart(void);
+void AutonomousSessionEnd(AutonomousSession *session);
+AutonomousResult *AutonomousSessionExecute(AutonomousSession *session, const char *sql);
+AutonomousPreparedStatement *AutonomousSessionPrepare(AutonomousSession *session, const char *sql, int16 nargs,
+							  Oid argtypes[], const char *argnames[]);
+AutonomousResult *AutonomousSessionExecutePrepared(AutonomousPreparedStatement *stmt, int16 nargs, Datum *values, bool *nulls);
+
+#endif /* AUTONOMOUS_H */
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 7254355..150e972 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -57,6 +57,12 @@ extern PlannedStmt *pg_plan_query(Query *querytree, int cursorOptions,
 			  ParamListInfo boundParams);
 extern List *pg_plan_queries(List *querytrees, int cursorOptions,
 				ParamListInfo boundParams);
+extern void exec_simple_query(const char *query_string, int16 format);
+extern void exec_parse_message(const char *query_string, const char *stmt_name,
+							   Oid paramTypes[], int numParams, const char *paramNames[]);
+extern void exec_bind_message(StringInfo input_message);
+extern void exec_execute_message(const char *portal_name, long max_rows);
+extern void exec_describe_statement_message(const char *stmt_name);
 
 extern bool check_max_stack_depth(int *newval, void **extra, GucSource source);
 extern void assign_max_stack_depth(int newval, void *extra);
@@ -70,6 +76,9 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S
 extern void ProcessClientReadInterrupt(bool blocked);
 extern void ProcessClientWriteInterrupt(bool blocked);
 
+extern void start_xact_command(void);
+extern void finish_xact_command(void);
+
 extern void process_postgres_switches(int argc, char *argv[],
 						  GucContext ctx, const char **dbname);
 extern void PostgresMain(int argc, char *argv[],
diff --git a/src/pl/plpgsql/src/.gitignore b/src/pl/plpgsql/src/.gitignore
index 92387fa..ff6ac96 100644
--- a/src/pl/plpgsql/src/.gitignore
+++ b/src/pl/plpgsql/src/.gitignore
@@ -1,3 +1,6 @@
 /pl_gram.c
 /pl_gram.h
 /plerrcodes.h
+/log/
+/results/
+/tmp_check/
diff --git a/src/pl/plpgsql/src/Makefile b/src/pl/plpgsql/src/Makefile
index e073b2a..97998ba 100644
--- a/src/pl/plpgsql/src/Makefile
+++ b/src/pl/plpgsql/src/Makefile
@@ -24,6 +24,8 @@ OBJS = pl_gram.o pl_handler.o pl_comp.o pl_exec.o \
 
 DATA = plpgsql.control plpgsql--1.0.sql plpgsql--unpackaged--1.0.sql
 
+REGRESS = plpgsql_autonomous
+
 all: all-lib
 
 # Shared library stuff
@@ -65,6 +67,19 @@ pl_gram.c: BISONFLAGS += -d
 plerrcodes.h: $(top_srcdir)/src/backend/utils/errcodes.txt generate-plerrcodes.pl
 	$(PERL) $(srcdir)/generate-plerrcodes.pl $< > $@
 
+
+check: submake
+	$(pg_regress_check) $(REGRESS_OPTS) $(REGRESS)
+
+installcheck: submake
+	$(pg_regress_installcheck) $(REGRESS_OPTS) $(REGRESS)
+
+
+.PHONY: submake
+submake:
+	$(MAKE) -C $(top_builddir)/src/test/regress pg_regress$(X)
+
+
 distprep: pl_gram.h pl_gram.c plerrcodes.h
 
 # pl_gram.c, pl_gram.h and plerrcodes.h are in the distribution tarball,
diff --git a/src/pl/plpgsql/src/expected/plpgsql_autonomous.out b/src/pl/plpgsql/src/expected/plpgsql_autonomous.out
new file mode 100644
index 0000000..3822c7a
--- /dev/null
+++ b/src/pl/plpgsql/src/expected/plpgsql_autonomous.out
@@ -0,0 +1,72 @@
+CREATE TABLE test1 (a int);
+CREATE FUNCTION autonomous_test() RETURNS integer
+LANGUAGE plpgsql
+AS $$
+DECLARE
+  PRAGMA AUTONOMOUS_TRANSACTION;
+BEGIN
+  FOR i IN 0..9 LOOP
+    START TRANSACTION;
+    EXECUTE 'INSERT INTO test1 VALUES (' || i::text || ')';
+    IF i % 2 = 0 THEN
+        COMMIT;
+    ELSE
+        ROLLBACK;
+    END IF;
+  END LOOP;
+
+  RETURN 42;
+END;
+$$;
+SELECT autonomous_test();
+ autonomous_test 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a 
+---
+ 0
+ 2
+ 4
+ 6
+ 8
+(5 rows)
+
+TRUNCATE test1;
+CREATE FUNCTION autonomous_test2() RETURNS integer
+LANGUAGE plpgsql
+AS $$
+DECLARE
+  PRAGMA AUTONOMOUS_TRANSACTION;
+BEGIN
+  FOR i IN 0..9 LOOP
+    START TRANSACTION;
+    INSERT INTO test1 VALUES (i);
+    IF i % 2 = 0 THEN
+        COMMIT;
+    ELSE
+        ROLLBACK;
+    END IF;
+  END LOOP;
+
+  RETURN 42;
+END;
+$$;
+SELECT autonomous_test2();
+ autonomous_test2 
+------------------
+               42
+(1 row)
+
+SELECT * FROM test1;
+ a
+---
+  
+  
+  
+  
+  
+(5 rows)
+
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index 2f8b6ff..9f29a17 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -30,6 +30,7 @@
 #include "parser/parse_coerce.h"
 #include "parser/scansup.h"
 #include "storage/proc.h"
+#include "tcop/autonomous.h"
 #include "tcop/tcopprot.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -1176,6 +1177,9 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block)
 	int			i;
 	int			n;
 
+	if (block->autonomous)
+		estate->autonomous_session = AutonomousSessionStart();
+
 	/*
 	 * First initialize all variables declared in this block
 	 */
@@ -1470,6 +1474,9 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block)
 
 	estate->err_text = NULL;
 
+	if (block->autonomous)
+		AutonomousSessionEnd(estate->autonomous_session);
+
 	/*
 	 * Handle the return code.
 	 */
@@ -3437,6 +3444,8 @@ plpgsql_estate_setup(PLpgSQL_execstate *estate,
 	}
 	estate->rsi = rsi;
 
+	estate->autonomous_session = NULL;
+
 	estate->found_varno = func->found_varno;
 	estate->ndatums = func->ndatums;
 	estate->datums = palloc(sizeof(PLpgSQL_datum *) * estate->ndatums);
@@ -3614,6 +3623,66 @@ exec_prepare_plan(PLpgSQL_execstate *estate,
 }
 
 
+static
+void build_symbol_table(PLpgSQL_execstate *estate,
+					   PLpgSQL_nsitem *ns_start,
+					   int *ret_nitems,
+					   const char ***ret_names,
+					   Oid **ret_types)
+{
+	PLpgSQL_nsitem *nsitem;
+	List *names = NIL;
+	List *types = NIL;
+	ListCell *lc1, *lc2;
+	int i, nitems;
+	const char **names_vector;
+	Oid *types_vector;
+
+	for (nsitem = ns_start;
+		 nsitem;
+		 nsitem = nsitem->prev)
+	{
+		if (nsitem->itemtype == PLPGSQL_NSTYPE_VAR)
+		{
+			PLpgSQL_datum *datum;
+			PLpgSQL_var *var;
+			Oid		typoid;
+			Value  *name;
+
+			if (strcmp(nsitem->name, "found") == 0)
+				continue;  // XXX
+			elog(LOG, "namespace item variable itemno %d, name %s",
+				 nsitem->itemno, nsitem->name);
+			datum = estate->datums[nsitem->itemno];
+			Assert(datum->dtype == PLPGSQL_DTYPE_VAR);
+			var = (PLpgSQL_var *) datum;
+			name = makeString(nsitem->name);
+			typoid = var->datatype->typoid;
+			if (!list_member(names, name))
+			{
+				names = lappend(names, name);
+				types = lappend_oid(types, typoid);
+			}
+		}
+	}
+
+	Assert(list_length(names) == list_length(types));
+	nitems = list_length(names);
+	names_vector = palloc(nitems * sizeof(char *));
+	types_vector = palloc(nitems * sizeof(Oid));
+	i = 0;
+	forboth(lc1, names, lc2, types)
+	{
+		names_vector[i] = pstrdup(strVal(lfirst(lc1)));
+		types_vector[i] = lfirst_oid(lc2);
+		i++;
+	}
+
+	*ret_nitems = nitems;
+	*ret_names = names_vector;
+	*ret_types = types_vector;
+}
+
 /* ----------
  * exec_stmt_execsql			Execute an SQL statement (possibly with INTO).
  *
@@ -3630,6 +3699,32 @@ exec_stmt_execsql(PLpgSQL_execstate *estate,
 	int			rc;
 	PLpgSQL_expr *expr = stmt->sqlstmt;
 
+	if (estate->autonomous_session)
+	{
+		int		nparams = 0;
+		int		i;
+		const char **param_names = NULL;
+		Oid	   *param_types = NULL;
+		AutonomousPreparedStatement *astmt;
+		Datum  *values;
+		bool   *nulls;
+		AutonomousResult *aresult;
+
+		build_symbol_table(estate, stmt->sqlstmt->ns, &nparams, &param_names, &param_types);
+		astmt = AutonomousSessionPrepare(estate->autonomous_session, stmt->sqlstmt->query, nparams, param_types, param_names);
+
+		values = palloc(nparams * sizeof(*values));
+		nulls = palloc(nparams * sizeof(*nulls));
+		for (i = 0; i < nparams; i++)
+		{
+			nulls[i] = true;
+			//values[i] = TODO;
+		}
+		aresult = AutonomousSessionExecutePrepared(astmt, nparams, values, nulls);
+		exec_set_found(estate, (list_length(aresult->tuples) != 0));
+		return PLPGSQL_RC_OK;
+	}
+
 	/*
 	 * On the first call for this statement generate the plan, and detect
 	 * whether the statement is INSERT/UPDATE/DELETE
@@ -3871,6 +3966,12 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate,
 
 	exec_eval_cleanup(estate);
 
+	if (estate->autonomous_session)
+	{
+		AutonomousSessionExecute(estate->autonomous_session, querystr);
+		return PLPGSQL_RC_OK;
+	}
+
 	/*
 	 * Execute the query without preparing a saved plan.
 	 */
diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y
index 0b41e3a..c017757 100644
--- a/src/pl/plpgsql/src/pl_gram.y
+++ b/src/pl/plpgsql/src/pl_gram.y
@@ -108,6 +108,8 @@ static	PLpgSQL_expr	*read_cursor_args(PLpgSQL_var *cursor,
 static	List			*read_raise_options(void);
 static	void			check_raise_parameters(PLpgSQL_stmt_raise *stmt);
 
+static bool last_pragma;
+
 %}
 
 %expect 0
@@ -144,6 +146,7 @@ static	void			check_raise_parameters(PLpgSQL_stmt_raise *stmt);
 			char *label;
 			int  n_initvars;
 			int  *initvarnos;
+			bool	autonomous;
 		}						declhdr;
 		struct
 		{
@@ -313,6 +316,7 @@ static	void			check_raise_parameters(PLpgSQL_stmt_raise *stmt);
 %token <keyword>	K_PG_EXCEPTION_CONTEXT
 %token <keyword>	K_PG_EXCEPTION_DETAIL
 %token <keyword>	K_PG_EXCEPTION_HINT
+%token <keyword>	K_PRAGMA
 %token <keyword>	K_PRINT_STRICT_PARAMS
 %token <keyword>	K_PRIOR
 %token <keyword>	K_QUERY
@@ -405,6 +409,7 @@ pl_block		: decl_sect K_BEGIN proc_sect exception_sect K_END opt_label
 						new->cmd_type	= PLPGSQL_STMT_BLOCK;
 						new->lineno		= plpgsql_location_to_lineno(@2);
 						new->label		= $1.label;
+						new->autonomous = $1.autonomous;
 						new->n_initvars = $1.n_initvars;
 						new->initvarnos = $1.initvarnos;
 						new->body		= $3;
@@ -425,6 +430,7 @@ decl_sect		: opt_block_label
 						$$.label	  = $1;
 						$$.n_initvars = 0;
 						$$.initvarnos = NULL;
+						$$.autonomous = false;
 					}
 				| opt_block_label decl_start
 					{
@@ -432,6 +438,7 @@ decl_sect		: opt_block_label
 						$$.label	  = $1;
 						$$.n_initvars = 0;
 						$$.initvarnos = NULL;
+						$$.autonomous = false;
 					}
 				| opt_block_label decl_start decl_stmts
 					{
@@ -439,6 +446,8 @@ decl_sect		: opt_block_label
 						$$.label	  = $1;
 						/* Remember variables declared in decl_stmts */
 						$$.n_initvars = plpgsql_add_initdatums(&($$.initvarnos));
+						$$.autonomous = last_pragma;
+						last_pragma = false;
 					}
 				;
 
@@ -446,6 +455,7 @@ decl_start		: K_DECLARE
 					{
 						/* Forget any variables created before block */
 						plpgsql_add_initdatums(NULL);
+						last_pragma = false;
 						/*
 						 * Disable scanner lookup of identifiers while
 						 * we process the decl_stmts
@@ -586,6 +596,13 @@ decl_statement	: decl_varname decl_const decl_datatype decl_collate decl_notnull
 							new->cursor_explicit_argrow = $5->dno;
 						new->cursor_options = CURSOR_OPT_FAST_PLAN | $2;
 					}
+				| K_PRAGMA any_identifier ';'
+					{
+						if (pg_strcasecmp($2, "autonomous_transaction") == 0)
+							last_pragma = true;
+						else
+							elog(ERROR, "invalid pragma");
+					}
 				;
 
 opt_scrollable :
diff --git a/src/pl/plpgsql/src/pl_scanner.c b/src/pl/plpgsql/src/pl_scanner.c
index 2737fff..ff26126 100644
--- a/src/pl/plpgsql/src/pl_scanner.c
+++ b/src/pl/plpgsql/src/pl_scanner.c
@@ -147,6 +147,7 @@ static const ScanKeyword unreserved_keywords[] = {
 	PG_KEYWORD("pg_exception_context", K_PG_EXCEPTION_CONTEXT, UNRESERVED_KEYWORD)
 	PG_KEYWORD("pg_exception_detail", K_PG_EXCEPTION_DETAIL, UNRESERVED_KEYWORD)
 	PG_KEYWORD("pg_exception_hint", K_PG_EXCEPTION_HINT, UNRESERVED_KEYWORD)
+	PG_KEYWORD("pragma", K_PRAGMA, UNRESERVED_KEYWORD)
 	PG_KEYWORD("print_strict_params", K_PRINT_STRICT_PARAMS, UNRESERVED_KEYWORD)
 	PG_KEYWORD("prior", K_PRIOR, UNRESERVED_KEYWORD)
 	PG_KEYWORD("query", K_QUERY, UNRESERVED_KEYWORD)
diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h
index b416e50..55216fc 100644
--- a/src/pl/plpgsql/src/plpgsql.h
+++ b/src/pl/plpgsql/src/plpgsql.h
@@ -22,6 +22,7 @@
 #include "commands/event_trigger.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
+#include "tcop/autonomous.h"
 
 /**********************************************************************
  * Definitions
@@ -407,6 +408,7 @@ typedef struct PLpgSQL_stmt_block
 	int			cmd_type;
 	int			lineno;
 	char	   *label;
+	bool		autonomous;
 	List	   *body;			/* List of statements */
 	int			n_initvars;
 	int		   *initvarnos;
@@ -903,6 +905,8 @@ typedef struct PLpgSQL_execstate
 	ResourceOwner tuple_store_owner;
 	ReturnSetInfo *rsi;
 
+	AutonomousSession *autonomous_session;
+
 	/* the datums representing the function's local variables */
 	int			found_varno;
 	int			ndatums;
diff --git a/src/pl/plpgsql/src/sql/plpgsql_autonomous.sql b/src/pl/plpgsql/src/sql/plpgsql_autonomous.sql
new file mode 100644
index 0000000..35e7d15
--- /dev/null
+++ b/src/pl/plpgsql/src/sql/plpgsql_autonomous.sql
@@ -0,0 +1,54 @@
+CREATE TABLE test1 (a int);
+
+CREATE FUNCTION autonomous_test() RETURNS integer
+LANGUAGE plpgsql
+AS $$
+DECLARE
+  PRAGMA AUTONOMOUS_TRANSACTION;
+BEGIN
+  FOR i IN 0..9 LOOP
+    START TRANSACTION;
+    EXECUTE 'INSERT INTO test1 VALUES (' || i::text || ')';
+    IF i % 2 = 0 THEN
+        COMMIT;
+    ELSE
+        ROLLBACK;
+    END IF;
+  END LOOP;
+
+  RETURN 42;
+END;
+$$;
+
+
+SELECT autonomous_test();
+
+SELECT * FROM test1;
+
+TRUNCATE test1;
+
+
+CREATE FUNCTION autonomous_test2() RETURNS integer
+LANGUAGE plpgsql
+AS $$
+DECLARE
+  PRAGMA AUTONOMOUS_TRANSACTION;
+BEGIN
+  FOR i IN 0..9 LOOP
+    START TRANSACTION;
+    INSERT INTO test1 VALUES (i);
+    IF i % 2 = 0 THEN
+        COMMIT;
+    ELSE
+        ROLLBACK;
+    END IF;
+  END LOOP;
+
+  RETURN 42;
+END;
+$$;
+
+
+SELECT autonomous_test2();
+
+SELECT * FROM test1;
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index 647b4b1..2617bae 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -20,6 +20,7 @@ PGFILEDESC = "PL/Python - procedural language"
 NAME = plpython$(python_majorversion)
 
 OBJS = \
+	plpy_autonomousobject.o \
 	plpy_cursorobject.o \
 	plpy_elog.o \
 	plpy_exec.o \
@@ -89,6 +90,7 @@ REGRESS = \
 	plpython_quote \
 	plpython_composite \
 	plpython_subtransaction \
+	plpython_autonomous \
 	plpython_drop
 
 REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_autonomous.out b/src/pl/plpython/expected/plpython_autonomous.out
new file mode 100644
index 0000000..7c23720
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_autonomous.out
@@ -0,0 +1,172 @@
+CREATE TABLE test1 (a int, b text);
+CREATE FUNCTION autonomous_test() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT autonomous_test();
+ autonomous_test 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+CREATE FUNCTION autonomous_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT autonomous_test2();
+INFO:  <PLyResult status=5 nrows=6 rows=[{'a': 0, 'b': None}, {'a': 2, 'b': None}, {'a': 4, 'b': None}, {'a': 6, 'b': None}, {'a': 8, 'b': None}, {'a': 11, 'b': None}]>
+ autonomous_test2 
+------------------
+               42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+CREATE FUNCTION autonomous_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+
+return 42
+$$;
+SELECT autonomous_test3();
+NOTICE:  notice
+ERROR:  error
+CONTEXT:  PL/pgSQL function inline_code_block line 1 at RAISE
+PL/Python function "autonomous_test3"
+CREATE FUNCTION autonomous_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    a.execute("SET client_encoding TO SJIS")
+
+return 42
+$$;
+SELECT autonomous_test4();
+ERROR:  cannot set client encoding in autonomous session
+CONTEXT:  PL/Python function "autonomous_test4"
+TRUNCATE test1;
+CREATE FUNCTION autonomous_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+
+return 42
+$$;
+SELECT autonomous_test5();
+ autonomous_test5 
+------------------
+               42
+(1 row)
+
+SELECT * FROM test1;
+ a |  b  
+---+-----
+ 1 | one
+ 2 | two
+(2 rows)
+
+TRUNCATE test1;
+CREATE FUNCTION autonomous_test6() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    plan = a.prepare("INSERT INTO test1 (a) VALUES (i)", {"i": "int4"})
+    a.execute_prepared(plan, [1])
+    a.execute_prepared(plan, [2])
+
+return 42
+$$;
+SELECT autonomous_test6();
+ autonomous_test6 
+------------------
+               42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 1 | 
+ 2 | 
+(2 rows)
+
+TRUNCATE test1;
+CREATE FUNCTION autonomous_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT autonomous_test7();
+INFO:  <PLyResult status=5 nrows=1 rows=[{'a': 11, 'b': None}]>
+ autonomous_test7 
+------------------
+               42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+CREATE FUNCTION autonomous_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+        a.execute("BEGIN")
+
+return 42
+$$;
+SELECT autonomous_test8();
+ERROR:  autonomous session ended with transaction block open
+CONTEXT:  PL/Python function "autonomous_test8"
+DROP TABLE test1;
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index adb82a8..fa89c60 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,9 +43,9 @@ contents.sort()
 return ", ".join(contents)
 $$ LANGUAGE plpythonu;
 select module_contents();
-                                                                               module_contents                                                                                
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
+                                                                                     module_contents                                                                                      
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Error, Fatal, SPIError, autonomous, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
 (1 row)
 
 CREATE FUNCTION elog_test_basic() RETURNS void
diff --git a/src/pl/plpython/plpy_autonomousobject.c b/src/pl/plpython/plpy_autonomousobject.c
new file mode 100644
index 0000000..2125452
--- /dev/null
+++ b/src/pl/plpython/plpy_autonomousobject.c
@@ -0,0 +1,459 @@
+/*
+ * the PLyAutonomous class
+ *
+ * src/pl/plpython/plpy_autonomousobject.c
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "executor/spi.h"
+#include "parser/parse_type.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+#include "plpython.h"
+
+#include "plpy_autonomousobject.h"
+
+#include "plpy_elog.h"
+#include "plpy_main.h"
+#include "plpy_planobject.h"
+#include "plpy_spi.h"
+
+
+static void PLy_autonomous_dealloc(PyObject *subxact);
+static PyObject *PLy_autonomous_enter(PyObject *self, PyObject *unused);
+static PyObject *PLy_autonomous_exit(PyObject *self, PyObject *args);
+static PyObject *PLy_autonomous_execute(PyObject *self, PyObject *args);
+static PyObject *PLy_autonomous_prepare(PyObject *self, PyObject *args);
+static PyObject *PLy_autonomous_execute_prepared(PyObject *self, PyObject *args);
+
+static char PLy_autonomous_doc[] = {
+	"PostgreSQL autonomous session context manager"
+};
+
+static PyMethodDef PLy_autonomous_methods[] = {
+	{"__enter__", PLy_autonomous_enter, METH_VARARGS, NULL},
+	{"__exit__", PLy_autonomous_exit, METH_VARARGS, NULL},
+	/* user-friendly names for Python <2.6 */
+	{"enter", PLy_autonomous_enter, METH_VARARGS, NULL},
+	{"exit", PLy_autonomous_exit, METH_VARARGS, NULL},
+	{"execute", PLy_autonomous_execute, METH_VARARGS, NULL},
+	{"prepare", PLy_autonomous_prepare, METH_VARARGS, NULL},
+	{"execute_prepared", PLy_autonomous_execute_prepared, METH_VARARGS, NULL},
+	{NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_AutonomousType = {
+	PyVarObject_HEAD_INIT(NULL, 0)
+	"PLyAutonomous",			/* tp_name */
+	sizeof(PLyAutonomousObject),	/* tp_size */
+	0,							/* tp_itemsize */
+
+	/*
+	 * methods
+	 */
+	PLy_autonomous_dealloc,		/* tp_dealloc */
+	0,							/* tp_print */
+	0,							/* tp_getattr */
+	0,							/* tp_setattr */
+	0,							/* tp_compare */
+	0,							/* tp_repr */
+	0,							/* tp_as_number */
+	0,							/* tp_as_sequence */
+	0,							/* tp_as_mapping */
+	0,							/* tp_hash */
+	0,							/* tp_call */
+	0,							/* tp_str */
+	0,							/* tp_getattro */
+	0,							/* tp_setattro */
+	0,							/* tp_as_buffer */
+	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,	/* tp_flags */
+	PLy_autonomous_doc,			/* tp_doc */
+	0,							/* tp_traverse */
+	0,							/* tp_clear */
+	0,							/* tp_richcompare */
+	0,							/* tp_weaklistoffset */
+	0,							/* tp_iter */
+	0,							/* tp_iternext */
+	PLy_autonomous_methods,		/* tp_tpmethods */
+};
+
+
+void
+PLy_autonomous_init_type(void)
+{
+	if (PyType_Ready(&PLy_AutonomousType) < 0)
+		elog(ERROR, "could not initialize PLy_AutonomousType");
+}
+
+/* s = plpy.autonomous() */
+PyObject *
+PLy_autonomous_new(PyObject *self, PyObject *unused)
+{
+	PLyAutonomousObject *ob;
+
+	ob = PyObject_New(PLyAutonomousObject, &PLy_AutonomousType);
+
+	if (ob == NULL)
+		return NULL;
+
+	ob->started = false;
+	ob->exited = false;
+
+	return (PyObject *) ob;
+}
+
+/* Python requires a dealloc function to be defined */
+static void
+PLy_autonomous_dealloc(PyObject *auton)
+{
+}
+
+/*
+ * autonomous.__enter__() or autonomous.enter()
+ */
+static PyObject *
+PLy_autonomous_enter(PyObject *self, PyObject *unused)
+{
+	PLyAutonomousObject *auton = (PLyAutonomousObject *) self;
+
+	if (auton->started)
+	{
+		PLy_exception_set(PyExc_ValueError, "this autonomous session has already been entered");
+		return NULL;
+	}
+
+	if (auton->exited)
+	{
+		PLy_exception_set(PyExc_ValueError, "this autonomous session has already been exited");
+		return NULL;
+	}
+
+	auton->started = true;
+	auton->asession = AutonomousSessionStart();
+
+	Py_INCREF(self);
+	return self;
+}
+
+/*
+ * autonomous.__exit__(exc_type, exc, tb) or autonomous.exit(exc_type, exc, tb)
+ *
+ * Exit an explicit subtransaction. exc_type is an exception type, exc
+ * is the exception object, tb is the traceback.
+ *
+ * The method signature is chosen to allow subtransaction objects to
+ * be used as context managers as described in
+ * <http://www.python.org/dev/peps/pep-0343/>.
+ */
+static PyObject *
+PLy_autonomous_exit(PyObject *self, PyObject *args)
+{
+	PyObject   *type;
+	PyObject   *value;
+	PyObject   *traceback;
+	PLyAutonomousObject *auton = (PLyAutonomousObject *) self;
+
+	if (!PyArg_ParseTuple(args, "OOO", &type, &value, &traceback))
+		return NULL;
+
+	if (!auton->started)
+	{
+		PLy_exception_set(PyExc_ValueError, "this autonomous session has not been entered");
+		return NULL;
+	}
+
+	if (auton->exited)
+	{
+		PLy_exception_set(PyExc_ValueError, "this autonomous session has already been exited");
+		return NULL;
+	}
+
+	auton->exited = true;
+	AutonomousSessionEnd(auton->asession);
+
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
+static PyObject *
+PLy_autonomous_execute(PyObject *self, PyObject *args)
+{
+	PLyAutonomousObject *auton = (PLyAutonomousObject *) self;
+	char	   *query;
+
+	if (PyArg_ParseTuple(args, "s", &query))
+	{
+		AutonomousResult *result;
+		HeapTuple  *tuples;
+		ListCell   *lc;
+		int			i;
+		SPITupleTable faketupletable;
+
+		result = AutonomousSessionExecute(auton->asession, query);
+		if (result->tupdesc)
+		{
+			tuples = palloc(list_length(result->tuples) * sizeof(*tuples));
+			i = 0;
+			foreach (lc, result->tuples)
+			{
+				HeapTuple tuple = (HeapTuple) lfirst(lc);
+				tuples[i++] = tuple;
+			}
+			faketupletable.tupdesc = result->tupdesc;
+			faketupletable.vals = tuples;
+			return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT);
+		}
+		else
+			return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY);
+	}
+	else
+		PLy_exception_set(PLy_exc_error, "autonomous execute expected a query");
+	return NULL;
+}
+
+// XXX lots of overlap with PLy_spi_prepare
+static PyObject *
+PLy_autonomous_prepare(PyObject *self, PyObject *args)
+{
+	PLyAutonomousObject *auton = (PLyAutonomousObject *) self;
+	char	   *query;
+	PyObject   *paraminfo = NULL;
+	AutonomousPreparedStatement *astmt;
+	int			nargs = 0;
+	const char **argnames = NULL;
+	PLyPlanObject *plan;
+	PyObject   *volatile optr = NULL;
+	volatile MemoryContext oldcontext;
+	int			i;
+	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+	PyObject *keys;
+
+	if (!PyArg_ParseTuple(args, "s|O:prepare", &query, &paraminfo))
+		return NULL;
+
+	if (paraminfo &&
+		!PySequence_Check(paraminfo) && !PyMapping_Check(paraminfo))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of prepare must be a sequence or mapping");
+		return NULL;
+	}
+
+	if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL)
+		return NULL;
+
+	plan->mcxt = AllocSetContextCreate(TopMemoryContext,
+									   "PL/Python autonomous plan context",
+									   ALLOCSET_DEFAULT_MINSIZE,
+									   ALLOCSET_DEFAULT_INITSIZE,
+									   ALLOCSET_DEFAULT_MAXSIZE);
+
+	oldcontext = MemoryContextSwitchTo(plan->mcxt);
+
+	if (!paraminfo)
+		nargs = 0;
+	else if (PySequence_Check(paraminfo))
+		nargs = PySequence_Length(paraminfo);
+	else
+		nargs = PyMapping_Length(paraminfo);
+
+	plan->nargs = nargs;
+	plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL;
+	plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL;
+	plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (PyMapping_Check(paraminfo))
+	{
+		argnames = palloc(nargs * sizeof(char *));
+		keys = PyMapping_Keys(paraminfo);
+	}
+	else
+	{
+		argnames = NULL;
+		keys = NULL;
+	}
+
+	for (i = 0; i < nargs; i++)
+	{
+		PLy_typeinfo_init(&plan->args[i], plan->mcxt);
+		plan->values[i] = PointerGetDatum(NULL);
+	}
+
+	for (i = 0; i < nargs; i++)
+	{
+		char	   *sptr;
+		HeapTuple	typeTup;
+		Oid			typeId;
+		int32		typmod;
+
+		if (keys)
+		{
+			PyObject *key;
+			char *keystr;
+
+			key = PySequence_GetItem(keys, i);
+			argnames[i] = keystr = PyString_AsString(key);
+			optr = PyMapping_GetItemString(paraminfo, keystr);
+			Py_DECREF(key);
+		}
+		else
+			optr = PySequence_GetItem(paraminfo, i);
+
+		if (PyString_Check(optr))
+			sptr = PyString_AsString(optr);
+		else if (PyUnicode_Check(optr))
+			sptr = PLyUnicode_AsString(optr);
+		else
+		{
+			ereport(ERROR,
+					(errmsg("autonomous prepare: type name at ordinal position %d is not a string", i)));
+			sptr = NULL;	/* keep compiler quiet */
+		}
+
+		/********************************************************
+		 * Resolve argument type names and then look them up by
+		 * oid in the system cache, and remember the required
+		 *information for input conversion.
+		 ********************************************************/
+
+		parseTypeString(sptr, &typeId, &typmod, false);
+
+		typeTup = SearchSysCache1(TYPEOID,
+								  ObjectIdGetDatum(typeId));
+		if (!HeapTupleIsValid(typeTup))
+			elog(ERROR, "cache lookup failed for type %u", typeId);
+
+		Py_DECREF(optr);
+
+		/*
+		 * set optr to NULL, so we won't try to unref it again in case of
+		 * an error
+		 */
+		optr = NULL;
+
+		plan->types[i] = typeId;
+		PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes);
+		ReleaseSysCache(typeTup);
+	}
+
+	astmt = AutonomousSessionPrepare(auton->asession, query, nargs, plan->types, argnames);
+
+	plan->astmt = astmt;
+
+	return (PyObject *) plan;
+}
+
+static PyObject *
+PLy_autonomous_execute_prepared(PyObject *self, PyObject *args)
+{
+	PLyAutonomousObject *auton pg_attribute_unused() = (PLyAutonomousObject *) self;
+	PyObject   *ob;
+	PLyPlanObject *plan;
+	PyObject   *list = NULL;
+	int			nargs;
+	bool	   *nulls;
+	AutonomousResult *result;
+	HeapTuple  *tuples;
+	ListCell   *lc;
+	int			i;
+	SPITupleTable faketupletable;
+
+	if (!PyArg_ParseTuple(args, "O|O:execute_prepared", &ob, &list))
+		return NULL;
+
+	if (!is_PLyPlanObject(ob))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "first argument of execute_prepared must be a plan");
+		return NULL;
+	}
+
+	plan = (PLyPlanObject *) ob;
+
+	if (list && (!PySequence_Check(list)))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of execute_prepared must be a sequence");
+		return NULL;
+	}
+
+	nargs = list ? PySequence_Length(list) : 0;
+
+	if (nargs != plan->nargs)
+	{
+		char	   *sv;
+		PyObject   *so = PyObject_Str(list);
+
+		if (!so)
+			PLy_elog(ERROR, "could not execute plan");
+		sv = PyString_AsString(so);
+		PLy_exception_set_plural(PyExc_TypeError,
+							  "Expected sequence of %d argument, got %d: %s",
+							 "Expected sequence of %d arguments, got %d: %s",
+								 plan->nargs,
+								 plan->nargs, nargs, sv);
+		Py_DECREF(so);
+
+		return NULL;
+	}
+
+	nulls = palloc(nargs * sizeof(*nulls));
+
+	for (i = 0; i < nargs; i++)
+	{
+		PyObject   *elem;
+
+		elem = PySequence_GetItem(list, i);
+		if (elem != Py_None)
+		{
+			PG_TRY();
+			{
+				plan->values[i] =
+					plan->args[i].out.d.func(&(plan->args[i].out.d),
+											 -1,
+											 elem);
+			}
+			PG_CATCH();
+			{
+				Py_DECREF(elem);
+				PG_RE_THROW();
+			}
+			PG_END_TRY();
+
+			Py_DECREF(elem);
+			nulls[i] = false;
+		}
+		else
+		{
+			Py_DECREF(elem);
+			plan->values[i] =
+				InputFunctionCall(&(plan->args[i].out.d.typfunc),
+								  NULL,
+								  plan->args[i].out.d.typioparam,
+								  -1);
+			nulls[i] = true;
+		}
+	}
+
+	result = AutonomousSessionExecutePrepared(plan->astmt, nargs, plan->values, nulls);
+	if (result->tupdesc)
+	{
+		tuples = palloc(list_length(result->tuples) * sizeof(*tuples));
+		i = 0;
+		foreach (lc, result->tuples)
+		{
+			HeapTuple tuple = (HeapTuple) lfirst(lc);
+			tuples[i++] = tuple;
+		}
+		faketupletable.tupdesc = result->tupdesc;
+		faketupletable.vals = tuples;
+		return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT);
+	}
+	else
+		return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY);
+}
diff --git a/src/pl/plpython/plpy_autonomousobject.h b/src/pl/plpython/plpy_autonomousobject.h
new file mode 100644
index 0000000..f5fdaff
--- /dev/null
+++ b/src/pl/plpython/plpy_autonomousobject.h
@@ -0,0 +1,21 @@
+/*
+ * src/pl/plpython/plpy_autonomousobject.h
+ */
+
+#ifndef PLPY_AUTONOMOUSOBJECT
+#define PLPY_AUTONOMOUSOBJECT
+
+#include "tcop/autonomous.h"
+
+typedef struct PLyAutonomousObject
+{
+	PyObject_HEAD
+	bool		started;
+	bool		exited;
+	AutonomousSession *asession;
+} PLyAutonomousObject;
+
+extern void PLy_autonomous_init_type(void);
+extern PyObject *PLy_autonomous_new(PyObject *self, PyObject *unused);
+
+#endif   /* PLPY_AUTONOMOUSOBJECT */
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4..690506a 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -7,6 +7,8 @@
 
 #include "plpy_procedure.h"
 
+#include "tcop/autonomous.h"
+
 /* the interpreter's globals dict */
 extern PyObject *PLy_interp_globals;
 
@@ -19,6 +21,7 @@ typedef struct PLyExecutionContext
 {
 	PLyProcedure *curr_proc;	/* the currently executing procedure */
 	MemoryContext scratch_ctx;	/* a context for things like type I/O */
+	AutonomousSession *asession;
 	struct PLyExecutionContext *next;	/* previous stack level */
 } PLyExecutionContext;
 
diff --git a/src/pl/plpython/plpy_planobject.c b/src/pl/plpython/plpy_planobject.c
index a9040ef..bd44245 100644
--- a/src/pl/plpython/plpy_planobject.c
+++ b/src/pl/plpython/plpy_planobject.c
@@ -77,6 +77,7 @@ PLy_plan_new(void)
 		return NULL;
 
 	ob->plan = NULL;
+	ob->astmt = NULL;
 	ob->nargs = 0;
 	ob->types = NULL;
 	ob->values = NULL;
diff --git a/src/pl/plpython/plpy_planobject.h b/src/pl/plpython/plpy_planobject.h
index c675592..934aa3c 100644
--- a/src/pl/plpython/plpy_planobject.h
+++ b/src/pl/plpython/plpy_planobject.h
@@ -6,6 +6,7 @@
 #define PLPY_PLANOBJECT_H
 
 #include "executor/spi.h"
+#include "tcop/autonomous.h"
 #include "plpy_typeio.h"
 
 
@@ -13,6 +14,7 @@ typedef struct PLyPlanObject
 {
 	PyObject_HEAD
 	SPIPlanPtr	plan;
+	AutonomousPreparedStatement *astmt;
 	int			nargs;
 	Oid		   *types;
 	Datum	   *values;
diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c
index f520e77..c29acf1 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -13,6 +13,7 @@
 
 #include "plpy_plpymodule.h"
 
+#include "plpy_autonomousobject.h"
 #include "plpy_cursorobject.h"
 #include "plpy_elog.h"
 #include "plpy_planobject.h"
@@ -88,6 +89,11 @@ static PyMethodDef PLy_methods[] = {
 	{"subtransaction", PLy_subtransaction_new, METH_NOARGS, NULL},
 
 	/*
+	 * create autonomous session context manager
+	 */
+	{"autonomous", PLy_autonomous_new, METH_NOARGS, NULL},
+
+	/*
 	 * create a cursor
 	 */
 	{"cursor", PLy_cursor, METH_VARARGS, NULL},
@@ -156,6 +162,7 @@ PLy_init_plpy(void)
 	PLy_plan_init_type();
 	PLy_result_init_type();
 	PLy_subtransaction_init_type();
+	PLy_autonomous_init_type();
 	PLy_cursor_init_type();
 
 #if PY_MAJOR_VERSION >= 3
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 09ee06d..995166e 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -31,8 +31,6 @@
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
-							 uint64 rows, int status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
 
@@ -291,6 +289,7 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 		rv = SPI_execute_plan(plan->plan, plan->values, nulls,
 							  exec_ctx->curr_proc->fn_readonly, limit);
 		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		if (nargs > 0)
 			pfree(nulls);
@@ -360,6 +359,7 @@ PLy_spi_execute_query(char *query, long limit)
 		pg_verifymbstr(query, strlen(query), false);
 		rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
 		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
 	}
@@ -382,7 +382,7 @@ PLy_spi_execute_query(char *query, long limit)
 	return ret;
 }
 
-static PyObject *
+PyObject *
 PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 {
 	PLyResultObject *result;
@@ -469,7 +469,6 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 		PG_END_TRY();
 
 		MemoryContextDelete(cxt);
-		SPI_freetuptable(tuptable);
 	}
 
 	return (PyObject *) result;
diff --git a/src/pl/plpython/plpy_spi.h b/src/pl/plpython/plpy_spi.h
index b042794..9ed37e5 100644
--- a/src/pl/plpython/plpy_spi.h
+++ b/src/pl/plpython/plpy_spi.h
@@ -5,12 +5,15 @@
 #ifndef PLPY_SPI_H
 #define PLPY_SPI_H
 
+#include "executor/spi.h"
 #include "utils/palloc.h"
 #include "utils/resowner.h"
 
 extern PyObject *PLy_spi_prepare(PyObject *self, PyObject *args);
 extern PyObject *PLy_spi_execute(PyObject *self, PyObject *args);
 
+extern PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status);
+
 typedef struct PLyExceptionEntry
 {
 	int			sqlstate;		/* hash key, must be first */
diff --git a/src/pl/plpython/sql/plpython_autonomous.sql b/src/pl/plpython/sql/plpython_autonomous.sql
new file mode 100644
index 0000000..d7bec05
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_autonomous.sql
@@ -0,0 +1,136 @@
+CREATE TABLE test1 (a int, b text);
+
+CREATE FUNCTION autonomous_test() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT autonomous_test();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION autonomous_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT autonomous_test2();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION autonomous_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+
+return 42
+$$;
+
+SELECT autonomous_test3();
+
+
+CREATE FUNCTION autonomous_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    a.execute("SET client_encoding TO SJIS")
+
+return 42
+$$;
+
+SELECT autonomous_test4();
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION autonomous_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+
+return 42
+$$;
+
+SELECT autonomous_test5();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION autonomous_test6() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+    plan = a.prepare("INSERT INTO test1 (a) VALUES (i)", {"i": "int4"})
+    a.execute_prepared(plan, [1])
+    a.execute_prepared(plan, [2])
+
+return 42
+$$;
+
+SELECT autonomous_test6();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION autonomous_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT autonomous_test7();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION autonomous_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.autonomous() as a:
+        a.execute("BEGIN")
+
+return 42
+$$;
+
+SELECT autonomous_test8();
+
+
+DROP TABLE test1;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to