On 20 May 2016 at 15:35, Craig Ringer <cr...@2ndquadrant.com> wrote:

>
> You can, however, omit Sync from between messages and send a series of
> protocol messages, like
>
> Parse/Bind/Execute/Bind/Execute/Bind/Execute/Sync
>
> to avoid round-trip overheads.
>
>
I implemented what I think is a pretty solid proof of concept of this for
kicks this evening. Attached, including basic test program. Patch attached.
The performance difference over higher latency links is huge, see below.

Demo/test program in src/test/examples/testlibpqbatch.c.

github: https://github.com/2ndQuadrant/postgres/tree/dev/libpq-async-batch



I still need to add the logic for handling an error during a batch by
discarding all input until the next Sync, but otherwise I think it's pretty
reasonable.

The time difference for 10k inserts on the local host over a unix socket
shows a solid improvement:

batch insert elapsed:      0.244293s
sequential insert elapsed: 0.375402s

... but over, say, a connection to a random AWS RDS instance fired up for
the purpose that lives about 320ms away the difference is huge:

batch insert elapsed:      9.029995s
sequential insert elapsed: (I got bored after 10 minutes; it should take a
bit less than an hour based on the latency numbers)

With 500 rows on the remote AWS RDS instance, once the I/O quota is already
saturated:

batch insert elapsed:      1.229024s
sequential insert elapsed: 156.962180s

which is an improvement by a factor of over 120.

I didn't compare vs COPY. I'm sure COPY will be faster, but COPY doesn't
let you do INSERT ... ON CONFLICT, do UPDATE, do DELETE, etc. Not without
temp tables and a bunch of hoop jumping anyway. If COPY solved everything
there'd be no point having pipelining.

No docs yet, but if folks think the interface is reasonable I can add them
easily since the comments on each of the new functions should be easy to
adapt into the SGML docs.

With a bit of polishing I think this can probably go in the next CF, though
I only wrote it as an experiment. Can I get opinions on the API?

The TL;DR API, using the usual async libpq routines, is:


PQbeginBatchMode(conn);

PQsendQueryParams(conn, "BEGIN", 0, NULL, NULL, NULL, NULL, 0);

PQsendPrepare(conn, "my_update", "UPDATE ...");

PQsetnonblocking(conn, 1);

while (!all_responses_received)
{
   select(...)

   if (can-write)
   {
     if (app-has-more-data-to-send)
     {
       PQsendQueryPrepared(conn, "my_update", params-go-here);
     }
     else if (havent-sent-commit-yet)
     {
       PQsendQueryParams(conn, "COMMIT", ...);
     }
     else if (havent-sent-endbatch-yet)
     {
       PqEndBatch(conn);
     }
     PQflush(conn);
   }

   if (can-read)
   {
     PQconsumeInput(conn);
     if (PQisBusy(conn))
       continue;
     res = PQgetResult(conn);
     if (res == NULL)
     {
       PQgetNextQuery(conn);
       continue;
     }
     /* process results in the same order we sent the commands */
     /* client keeps track of that, libpq just supplies the results */
     ...
   }
}

PQendBatch(conn);




Note that:

* PQsendQuery cannot be used as it uses simple query protocol, use
PQsendQueryParams instead;
* Batch supports PQsendQueryParams, PQsendPrepare, PQsendQueryPrepared,
PQsendDescribePrepared, PQsendDescribePortal;
* You don't call PQgetResult after dispatching each query
* Multiple batches may be pipelined, you don't have to wait for one to end
to start another (an advantage over JDBC's API)
* non-blocking mode isn't required, but is strongly advised

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From f0ca25bdc2bacf65530e4f180fdfc7c219866541 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Fri, 20 May 2016 12:45:18 +0800
Subject: [PATCH] Draft of libpq async pipelining support

Now with test in src/test/examples/testlibpqbatch.c
---
 src/interfaces/libpq/exports.txt    |   6 +
 src/interfaces/libpq/fe-connect.c   |  16 +
 src/interfaces/libpq/fe-exec.c      | 540 ++++++++++++++++++++++++++--
 src/interfaces/libpq/fe-protocol2.c |   6 +
 src/interfaces/libpq/fe-protocol3.c |  13 +-
 src/interfaces/libpq/libpq-fe.h     |  12 +-
 src/interfaces/libpq/libpq-int.h    |  36 +-
 src/test/examples/Makefile          |   2 +-
 src/test/examples/testlibpqbatch.c  | 690 ++++++++++++++++++++++++++++++++++++
 9 files changed, 1278 insertions(+), 43 deletions(-)
 create mode 100644 src/test/examples/testlibpqbatch.c

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 21dd772..e297c4b 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -171,3 +171,9 @@ PQsslAttributeNames       168
 PQsslAttribute            169
 PQsetErrorContextVisibility 170
 PQresultVerboseErrorMessage 171
+PQisInBatchMode           172
+PQqueriesInBatch          173
+PQbeginBatchMode          174
+PQendBatchMode            175
+PQendBatch                176
+PQgetNextQuery            177
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 9b2839b..78154e2 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2949,6 +2949,7 @@ static void
 closePGconn(PGconn *conn)
 {
 	PGnotify   *notify;
+	PGcommandQueueEntry *queue;
 	pgParameterStatus *pstatus;
 
 	/*
@@ -2995,6 +2996,21 @@ closePGconn(PGconn *conn)
 		free(prev);
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
+	queue = conn->cmd_queue_head;
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *prev = queue;
+		queue = queue->next;
+		free(prev);
+	}
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+	queue = conn->cmd_queue_recycle;
+	{
+		PGcommandQueueEntry *prev = queue;
+		queue = queue->next;
+		free(prev);
+	}
+	conn->cmd_queue_recycle = NULL;
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
 	{
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 2621767..c12cb2c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_BATCH_OK",
+	"PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -69,6 +71,9 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry* PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
 
 
 /* ----------------
@@ -1090,7 +1095,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1113,6 +1118,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+	if (conn->in_batch)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+				  libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+		return false;
+	}
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1211,9 +1223,29 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char 	   **last_query;
+	PGQueryClass *queryclass;
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0; /* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
 	/* check the arguments */
 	if (!stmtName)
 	{
@@ -1269,18 +1301,21 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	*queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	if (*last_query)
+		free(*last_query);
+	*last_query = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1290,10 +1325,14 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1340,6 +1379,81 @@ PQsendQueryPrepared(PGconn *conn,
 						   resultFormat);
 }
 
+/* Get a new command queue entry, allocating it if required. Doesn't add it to
+ * the tail of the queue yet, use PQappendPipelinedCommand once the command has
+ * been written for that. If a command fails once it's called this, it should
+ * use PQrecyclePipelinedCommand to put it on the freelist or release it.
+ *
+ * If allocation fails sets the error message and returns null.
+ */
+static PGcommandQueueEntry*
+PQmakePipelinedCommand(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry*) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/* Append a precreated command queue entry to the queue after it's been
+ * sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+	conn->cmd_queue_tail = entry;
+}
+
+/* Push a command queue entry onto the freelist. It must be a dangling entry
+ * with null next pointer and not referenced by any other entry's next pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (entry == NULL)
+		return;
+	if (entry->next != NULL)
+	{
+		fprintf(stderr, "tried to recycle non-dangling command queue entry");
+		abort();
+	}
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
+/* Set up for processing a new query's results */
+static void
+PQstartProcessingNewQuery(PGconn *conn)
+{
+	/* initialize async result-accumulation state */
+	conn->result = NULL;
+	conn->next_result = NULL;
+
+	/* reset single-row processing mode */
+	conn->singleRowMode = false;
+}
+
 /*
  * Common startup code for PQsendQuery and sibling routines
  */
@@ -1359,20 +1473,52 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE && !conn->in_batch)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
-
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+	if (conn->in_batch)
+	{
+		/* When enqueuing a message we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when PQgetNextQuery(...) advances
+		 * to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_QUEUED:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+			case PGASYNC_IDLE:
+				fprintf(stderr, "internal error, idle state in batch mode");
+				abort();
+				break;
+		}
+	}
+	else
+	{
+		/* This command's results will come in immediately */
+		PQstartProcessingNewQuery(conn);
+	}
 
 	/* ready to send command message */
 	return true;
@@ -1397,6 +1543,10 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char 	   **last_query;
+	PGQueryClass *queryclass;
+
 
 	/* This isn't gonna work on a 2.0 server */
 	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1406,6 +1556,23 @@ PQsendQueryGuts(PGconn *conn,
 		return 0;
 	}
 
+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0; /* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
+
 	/*
 	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
 	 * using specified statement name and the unnamed portal.
@@ -1518,22 +1685,25 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	*queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	if (*last_query)
+		free(*last_query);
 	if (command)
-		conn->last_query = strdup(command);
+		*last_query = strdup(command);
 	else
-		conn->last_query = NULL;
+		*last_query = NULL;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1543,10 +1713,15 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1578,6 +1753,9 @@ pqHandleSendFailure(PGconn *conn)
 	 * but it prevents buffer bloat if there's a lot of data available.)
 	 */
 	parseInput(conn);
+
+	/* TODO: handle pipelined queries by sending a sync and flushing until
+	 * the next sync?? */
 }
 
 /*
@@ -1673,6 +1851,245 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY;
 }
 
+/* PQisInBatchMode
+ *   Return true if currently in batch mode
+ */
+int
+PQisInBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return FALSE;
+	
+	return conn->in_batch;
+}
+
+/* PQqueriesInBatch
+ *   Return true if there are queries currently pending in batch mode
+ */
+int
+PQqueriesInBatch(PGconn *conn)
+{
+	if (!PQisInBatchMode(conn))
+		return false;
+	
+	return conn->cmd_queue_head != NULL;
+}
+
+/* Put an idle connection in batch mode. Commands submitted after this
+ * can be pipelined on the connection, there's no requirement to wait for
+ * one to finish before the next is dispatched.
+ *
+ * COPY is not permitted in batch mode.
+ *
+ * A set of commands is terminated by a PQendBatch. Multiple sets of batched
+ * commands may be sent while in batch mode. Batch mode can be exited by
+ * calling PQendBatchMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQbeginBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	if (conn->in_batch)
+		return true;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+		return false;
+
+	conn->in_batch = true;
+	conn->asyncStatus = PGASYNC_QUEUED;
+
+	return true;
+}
+
+/* End batch mode and return to normal command mode.
+ *
+ * Has no effect unless the client has processed all results
+ * from all outstanding batches and the connection is idle,
+ * i.e. PQisInBatchMode(conn) returns true and PQqueriesInBatch(conn) returns false.
+ *
+ * Returns true if batch mode ended.
+ */
+int
+PQendBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	if (!conn->in_batch)
+		return true;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, IDLE in batch mode");
+			abort();
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* can't end batch while busy */
+			return false;
+		case PGASYNC_QUEUED:
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+		return false;
+
+	conn->in_batch = false;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	return true;
+}
+
+/* End a batch submission by sending a protocol sync. The connection will
+ * remain in batch mode and unavailable for new non-batch commands until all
+ * results from the batch are processed by the client.
+ *
+ * It's legal to start submitting another batch immediately, without waiting
+ * for the results of the current batch. There's no need to end batch mode
+ * and start it again.
+ *
+ * If a command in a batch fails, every subsequent command up to and including
+ * the PQendBatch command result gets set to PGRES_BATCH_ABORTED state. If the
+ * whole batch is processed without error, a PGresult with PGRES_BATCH_OK is
+ * produced.
+ */
+int
+PQendBatch(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return false;
+
+	if (!conn->in_batch)
+		return false;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, IDLE in batch mode");
+			abort();
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_QUEUED:
+			/* can send sync to end this batch of cmds */
+			break;
+	}
+
+	entry = PQmakePipelinedCommand(conn);
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	PQappendPipelinedCommand(conn, entry);
+
+	return true;
+
+sendFailed:
+	PQrecyclePipelinedCommand(conn, entry);
+	pqHandleSendFailure(conn);
+	return false;
+}
+
+/* PQgetNextQuery
+ *   In batch mode, start processing the next query in the queue.
+ *
+ * Returns true if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns false if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+int
+PQgetNextQuery(PGconn *conn)
+{
+	PGcommandQueueEntry *next_query;
+
+	if (!conn)
+		return FALSE;
+
+	if (!conn->in_batch)
+		return false;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return false;
+			break;
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, idle in batch mode");
+			abort();
+			break;
+		case PGASYNC_QUEUED:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/* In batch mode but nothing left on the queue; caller can submit
+		 * more work or PQendBatchMode() now. */
+		return false;
+	}
+
+	/* Pop the next query from the queue and set up the connection state
+	 * as if it'd just been dispatched from a non-batched call */
+	next_query = conn->cmd_queue_head;
+	conn->cmd_queue_head = next_query->next;
+	next_query->next = NULL;
+
+	PQstartProcessingNewQuery(conn);
+
+	conn->last_query = next_query->query;
+	next_query->query = NULL;
+	conn->queryclass = next_query->queryclass;
+
+	PQrecyclePipelinedCommand(conn, next_query);
+
+	/* Allow parsing of the query */
+	conn->asyncStatus = PGASYNC_BUSY;
+
+	/* Parse any available data */
+	parseInput(conn);
+
+	return true;
+}
+
 
 /*
  * PQgetResult
@@ -1718,6 +2135,8 @@ PQgetResult(PGconn *conn)
 			/*
 			 * conn->errorMessage has been set by pqWait or pqReadData. We
 			 * want to append it to any already-received error message.
+			 *
+			 * TODO: handle purging the queue here
 			 */
 			pqSaveErrorResult(conn);
 			conn->asyncStatus = PGASYNC_IDLE;
@@ -1732,13 +2151,33 @@ PQgetResult(PGconn *conn)
 	switch (conn->asyncStatus)
 	{
 		case PGASYNC_IDLE:
+		case PGASYNC_QUEUED:
 			res = NULL;			/* query is complete */
 			break;
 		case PGASYNC_READY:
 			res = pqPrepareAsyncResult(conn);
+			if (conn->in_batch)
+			{
+				/* batched queries aren't followed by a Sync to put us back in
+				 * PGASYNC_IDLE state, and when we do get a sync we could still
+				 * have another batch coming after this one.
+				 *
+				 * The connection isn't idle since we can't submit new
+				 * nonbatched commands. It isn't also busy since the current
+				 * command is done and we need to process a new one.
+				 */
+				conn->asyncStatus = PGASYNC_QUEUED;
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
+			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
-			break;
 		case PGASYNC_COPY_IN:
 			res = getCopyResult(conn, PGRES_COPY_IN);
 			break;
@@ -1915,6 +2354,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+		  libpq_gettext("cannot PQexec in batch mode\n"));
+		return false;
+	}
+
 	/*
 	 * Silently discard any prior query result that application didn't eat.
 	 * This is probably poor design, but it's here for backward compatibility.
@@ -2109,6 +2555,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	PGQueryClass *queryclass;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2124,6 +2573,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		return 0;
 	}
 
+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0; /* error msg already set */
+
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		queryclass = &conn->queryclass;
+	}
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', false, conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2132,15 +2595,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
+	*queryclass = PGQUERY_DESCRIBE;
 
 	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
+	if (conn->last_query && !conn->in_batch)
 	{
 		free(conn->last_query);
 		conn->last_query = NULL;
@@ -2154,10 +2620,14 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index f1b90f3..e824f36 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -412,6 +412,12 @@ pqParseInput2(PGconn *conn)
 {
 	char		id;
 
+	if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+	{
+		fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode");
+		abort();
+	}
+
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 0b8c62f..30c8dfd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -220,10 +220,17 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->in_batch)
+					{
+						conn->result = PQmakeEmptyPGresult(conn,
+												PGRES_BATCH_OK);
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -305,7 +312,7 @@ pqParseInput3(PGconn *conn)
 						 * parsing until the application accepts the current
 						 * result.
 						 */
-						conn->asyncStatus = PGASYNC_READY;
+						conn->asyncStatus = PGASYNC_READY_MORE;
 						return;
 					}
 					break;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 9ca0756..a210e3e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -91,7 +91,9 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_BATCH_OK,				/* successful end of a batch of commands */
+	PGRES_BATCH_ABORTED,		/* Command didn't run because of an abort earlier in a batch */
 } ExecStatusType;
 
 typedef enum
@@ -421,6 +423,14 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int PQisInBatchMode(PGconn *conn);
+extern int PQqueriesInBatch(PGconn *conn);
+extern int PQbeginBatchMode(PGconn *conn);
+extern int PQendBatchMode(PGconn *conn);
+extern int PQendBatch(PGconn *conn);
+extern int PQgetNextQuery(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1183323..6caed9f 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,13 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch result,
+								   More results expected from this query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_QUEUED				/* Current query done, more in queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +232,8 @@ typedef enum
 	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+    PGQUERY_SYNC				/* A protocol sync to end a batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the PQSetenv state machine */
@@ -292,6 +296,22 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+  PGQueryClass	queryclass;	/* Query type; PGQUERY_SYNC for sync msg */
+  char		   *query;		/* SQL command, or NULL if unknown */
+  struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -356,6 +376,7 @@ struct pg_conn
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	bool		in_batch;		/* connection is in batch (pipelined) mode */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
@@ -363,6 +384,15 @@ struct pg_conn
 	PGnotify   *notifyHead;		/* oldest unreported Notify msg */
 	PGnotify   *notifyTail;		/* newest unreported Notify msg */
 
+	/* The command queue
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+    PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	/* See PQconnectPoll() for how we use 'int' and not 'pgsocket'. */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index 31da210..92a6faf 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqbatch
 
 all: $(PROGS)
 
diff --git a/src/test/examples/testlibpqbatch.c b/src/test/examples/testlibpqbatch.c
new file mode 100644
index 0000000..f82bce2
--- /dev/null
+++ b/src/test/examples/testlibpqbatch.c
@@ -0,0 +1,690 @@
+/*
+ * src/test/examples/testlibpqbatch.c
+ *
+ *
+ * testlibpqbatch.c
+ *		Test of batch execution funtionality
+ */
+
+#ifdef WIN32
+#include <windows.h>
+#endif
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include "libpq-fe.h"
+
+static void exit_nicely(PGconn *conn);
+static void simple_batch(PGconn *conn);
+static void test_disallowed_in_batch(PGconn *conn);
+static void batch_insert_pipelined(PGconn *conn, int n_rows);
+static void batch_insert_sequential(PGconn *conn, int n_rows);
+
+#ifndef VERBOSE
+#define VERBOSE 0
+#endif
+
+static const Oid INT4OID = 23;
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+static void
+simple_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	/*
+	 * Enter batch mode and dispatch a set of operations, which we'll then process
+	 * the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim processing
+	 * of results since our out buffer will give us enough slush to work with
+	 * and we won't block on sending. So blocking mode is fine.
+	 */
+	if (PQisnonblocking(conn))
+	{
+		fprintf(stderr, "Expected blocking connection mode\n");
+		goto fail;
+	}
+
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+			dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQendBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode with work in progress should fail, but succeeded\n");
+		goto fail;
+	}
+
+	if (!PQendBatch(conn))
+	{
+		fprintf(stderr, "Ending a batch failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	/* in batch mode we have to ask for the first result to be processed; until we
+	 * do PQgetResult will return null: */
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something in a batch before first PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s from first batch item\n",
+			PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+
+	PQclear(res);
+
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something extra after first result before PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	/* Even though we've processed the result there's still a sync to come and we
+	 * can't exit batch mode yet */
+	if (PQendBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode after query but before sync succeeded incorrectly\n");
+		goto fail;
+	}
+
+	/* should now get an explicit sync result */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at sync after first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_BATCH_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s instead of sync result, error: %s\n",
+			PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+		goto fail;
+	}
+
+	PQclear(res);
+
+	/* We're still in a batch... */
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Fell out of batch mode somehow\n");
+		goto fail;
+	}
+
+	/* until we end it, which we can safely do now */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode didn't seem to work\n");
+		goto fail;
+	}
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	if (PQisnonblocking(conn))
+	{
+		fprintf(stderr, "Expected blocking connection mode: %u\n", __LINE__);
+		goto fail;
+	}
+
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "Unable to enter batch mode\n");
+		goto fail;
+	}
+
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Batch mode not activated properly\n");
+		goto fail;
+	}
+
+	/* PQexec should fail in batch mode */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+	{
+		fprintf(stderr, "PQexec should fail in batch mode but succeeded\n");
+		goto fail;
+	}
+
+	/* So should PQsendQuery */
+	if (PQsendQuery(conn, "SELECT 1") != 0)
+	{
+		fprintf(stderr, "PQsendQuery should fail in batch mode but succeeded\n");
+		goto fail;
+	}
+
+	/* Entering batch mode when already in batch mode is OK */
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "re-entering batch mode should be a no-op but failed\n");
+		goto fail;
+	}
+
+	if (PQisBusy(conn))
+	{
+		fprintf(stderr, "PQisBusy should return false when idle in batch, returned true\n");
+		goto fail;
+	}
+
+	/* ok, back to normal command mode */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "couldn't exit idle empty batch mode\n");
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Batch mode not terminated properly\n");
+		goto fail;
+	}
+
+	/* exiting batch mode when not in batch mode should be a no-op */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "batch mode exit when not in batch mode should succeed but failed\n");
+		goto fail;
+	}
+
+	/* can now PQexec again */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "PQexec should succeed after exiting batch mode but failed with: %s\n",
+			PQerrorMessage(conn));
+		goto fail;
+	}
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
/* Widest decimal representation of an int32, including sign and NUL */
#define MAXINTLEN 12

/*
 * Steps of the pipelined-insert state machine.  Two cursors walk this
 * sequence independently: one tracking what has been dispatched, one
 * tracking whose results have been consumed.
 */
typedef enum BatchInsertStep
{
	BI_BEGIN_TX,
	BI_DROP_TABLE,
	BI_CREATE_TABLE,
	BI_PREPARE,
	BI_INSERT_ROWS,
	BI_COMMIT_TX,
	BI_SYNC,
	BI_DONE
} BatchInsertStep;

/* SQL shared by the pipelined and sequential insert tests */
static const char *const drop_table_sql =
	"DROP TABLE IF EXISTS batch_demo";
static const char *const create_table_sql =
	"CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
static const char *const insert_sql =
	"INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+static void
+batch_insert_pipelined(PGconn *conn, int n_rows)
+{
+	PGresult   *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+	BatchInsertStep send_step = BI_BEGIN_TX, recv_step = BI_BEGIN_TX;
+	int			rows_to_send, rows_to_receive;
+
+	insert_params[0] = &insert_param_0[0];
+
+	rows_to_send = rows_to_receive = n_rows;
+
+	/*
+	 * Do a batched insert into a table created at the start of the batch
+	 */
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "BEGIN",
+		0, NULL, NULL, NULL, NULL, 0))
+	{
+		fprintf(stderr, "xact start failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	#if VERBOSE
+	fprintf(stdout, "sent BEGIN\n");
+	#endif
+	send_step = BI_DROP_TABLE;
+
+	if (!PQsendQueryParams(conn, drop_table_sql,
+		0, NULL, NULL, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	#if VERBOSE
+	fprintf(stdout, "sent DROP\n");
+	#endif
+	send_step = BI_CREATE_TABLE;
+
+	if (!PQsendQueryParams(conn, create_table_sql,
+		0, NULL, NULL, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	#if VERBOSE
+	fprintf(stdout, "sent CREATE\n");
+	#endif
+	send_step = BI_PREPARE;
+
+	if (!PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids))
+	{
+		fprintf(stderr, "dispatching PREPARE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	#if VERBOSE
+	fprintf(stdout, "sent PREPARE\n");
+	#endif
+	send_step = BI_INSERT_ROWS;
+
+	/* Now we start inserting. We'll be sending enough data that we could fill
+	 * our out buffer, so to avoid deadlocking we need to enter nonblocking
+	 * mode and consume input while we send more output. As results of each query are
+	 * processed we should pop them to allow processing of the next query. There's
+	 * no need to finish the batch before processing results.
+	 */
+	if (PQsetnonblocking(conn, 1) != 0)
+	{
+		fprintf(stderr, "failed to set nonblocking mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	while (recv_step != BI_DONE)
+	{
+		int			sock;
+		fd_set		input_mask;
+		fd_set		output_mask;
+
+		sock = PQsocket(conn);
+
+		if (sock < 0)
+			break;				/* shouldn't happen */
+
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		FD_ZERO(&output_mask);
+		FD_SET(sock, &output_mask);
+
+		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+
+		/* Process any results, so we keep the server's out buffer free flowing
+		 * and it can continue to process input */
+		if (FD_ISSET(sock, &input_mask))
+		{
+			PQconsumeInput(conn);
+
+			/* Read until we'd block if we tried to read */
+			while (!PQisBusy(conn) && recv_step < BI_DONE)
+			{
+				const char * cmdtag;
+				const char * description = NULL;
+				int status;
+				BatchInsertStep next_step;
+
+
+				res = PQgetResult(conn);
+
+				if (res == NULL)
+				{
+					/* No more results from this query, advance to the next result */
+					if (!PQgetNextQuery(conn))
+					{
+						fprintf(stderr, "Expected next query result but unable to dequeue: %s\n",
+								PQerrorMessage(conn));
+						goto fail;
+					}
+					#if VERBOSE
+					fprintf(stdout, "next query!\n");
+					#endif
+					continue;
+				}
+
+				status = PGRES_COMMAND_OK;
+				next_step = recv_step + 1;
+				switch (recv_step)
+				{
+					case BI_BEGIN_TX:
+						cmdtag = "BEGIN";
+						break;
+					case BI_DROP_TABLE:
+						cmdtag = "DROP TABLE";
+						break;
+					case BI_CREATE_TABLE:
+						cmdtag = "CREATE TABLE";
+						break;
+					case BI_PREPARE:
+						cmdtag = "";
+						description = "PREPARE";
+						break;
+					case BI_INSERT_ROWS:
+						cmdtag = "INSERT";
+						rows_to_receive --;
+						if (rows_to_receive > 0)
+							next_step = BI_INSERT_ROWS;
+						break;
+					case BI_COMMIT_TX:
+						cmdtag = "COMMIT";
+						break;
+					case BI_SYNC:
+						cmdtag = "";
+						description = "SYNC";
+						status = PGRES_BATCH_OK;
+						break;
+					case BI_DONE:
+						/* unreachable */
+						abort();
+				}
+				if (description == NULL)
+					description = cmdtag;
+
+				#if VERBOSE
+				fprintf(stderr, "At state %d (%s) expect tag '%s', result code %s, expect %d more rows, transition to %d\n",
+					recv_step, description, cmdtag, PQresStatus(status), rows_to_receive, next_step);
+				#endif
+
+				if (PQresultStatus(res) != status)
+				{
+					fprintf(stderr, "%s reported status %s, expected %s. Error msg is [%s]\n",
+						description, PQresStatus(PQresultStatus(res)), PQresStatus(status), PQerrorMessage(conn));
+					goto fail;
+				}
+				if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+				{
+					fprintf(stderr, "%s expected command tag '%s', got '%s'\n",
+						description, cmdtag, PQcmdStatus(res));
+					goto fail;
+				}
+				#if VERBOSE
+				fprintf(stdout, "Got %s OK\n", cmdtag);
+				#endif
+				recv_step = next_step;
+
+				PQclear(res);
+			}
+		}
+
+		/* Write more rows and/or the end batch message, if needed */
+		if (FD_ISSET(sock, &output_mask))
+		{
+			PQflush(conn);
+
+			if (send_step == BI_INSERT_ROWS)
+			{
+				snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+				insert_param_0[MAXINTLEN-1] = '\0';	
+				
+				if (PQsendQueryPrepared(conn, "my_insert",
+					1, insert_params, NULL, NULL, 0))
+				{
+					#if VERBOSE
+					fprintf(stdout, "sent row %d\n", rows_to_send);
+					#endif
+					rows_to_send --;
+					if (rows_to_send == 0)
+						send_step = BI_COMMIT_TX;
+				}
+				else
+				{
+					/* in nonblocking mode, so it's OK for an insert to fail to send */
+					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+						rows_to_send, PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_COMMIT_TX)
+			{
+				if (PQsendQueryParams(conn, "COMMIT",
+					0, NULL, NULL, NULL, NULL, 0))
+				{
+					#if VERBOSE
+					fprintf(stdout, "sent COMMIT\n");
+					#endif
+					send_step = BI_SYNC;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: failed to send commit: %s\n",
+						PQerrorMessage(conn));
+				}
+			} else if (send_step == BI_SYNC)
+			{
+				if (PQendBatch(conn))
+				{
+					#if VERBOSE
+					fprintf(stdout, "Dispatched end batch message\n");
+					#endif
+					send_step = BI_DONE;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: Ending a batch failed: %s\n",
+						PQerrorMessage(conn));
+				}
+			}
+		}
+
+	}
+
+	/* We've got the sync message and the batch should be done */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQsetnonblocking(conn, 0) != 0)
+	{
+		fprintf(stderr, "failed to clear nonblocking mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+
+static void
+batch_insert_sequential(PGconn *conn, int nrows)
+{
+	PGresult *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+
+	insert_params[0] = &insert_param_0[0];
+
+	res = PQexec(conn, "BEGIN");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "BEGIN failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	res = PQexec(conn, drop_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "DROP TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	res =PQexec(conn, create_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	res = PQprepare(conn, "my_insert2", insert_sql, 1, insert_param_oids);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "prepare failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	while (nrows > 0)
+	{
+		snprintf(&insert_param_0[0], MAXINTLEN, "%d", nrows);
+		insert_param_0[MAXINTLEN-1] = '\0';	
+		
+		res = PQexecPrepared(conn, "my_insert2",
+			1, insert_params, NULL, NULL, 0);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			fprintf(stderr, "INSERT failed: %s\n", PQerrorMessage(conn));
+			goto fail;
+		}
+		PQclear(res);
+		nrows --;
+	}
+
+	res = PQexec(conn, "COMMIT");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "COMMIT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+	PQclear(res);
+
+	return;
+
+fail:
+	PQclear(res);
+}
+
+
+int
+main(int argc, char **argv)
+{
+	const char *conninfo;
+	PGconn	   *conn;
+	struct timeval	start_time, end_time, elapsed_time;
+
+	/*
+	 * If the user supplies a parameter on the command line, use it as the
+	 * conninfo string; otherwise default to setting dbname=postgres and using
+	 * environment variables or defaults for all other connection parameters.
+	 */
+	if (argc > 1)
+		conninfo = argv[1];
+	else
+		conninfo = "dbname = postgres";
+
+	/* Make a connection to the database */
+	conn = PQconnectdb(conninfo);
+
+	/* Check to see that the backend connection was successfully made */
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Connection to database failed: %s\n",
+				PQerrorMessage(conn));
+		exit_nicely(conn);
+	}
+
+	test_disallowed_in_batch(conn);
+	simple_batch(conn);
+
+	gettimeofday(&start_time, NULL);
+	batch_insert_pipelined(conn, 10000);
+	gettimeofday(&end_time, NULL);
+	timersub(&end_time, &start_time, &elapsed_time);
+	printf("batch insert elapsed:      %ld.%06lds\n", elapsed_time.tv_sec, elapsed_time.tv_usec);
+
+	gettimeofday(&start_time, NULL);
+	batch_insert_sequential(conn, 10000);
+	gettimeofday(&end_time, NULL);
+	timersub(&end_time, &start_time, &elapsed_time);
+	printf("sequential insert elapsed: %ld.%06lds\n", elapsed_time.tv_sec, elapsed_time.tv_usec);
+
+	fprintf(stderr, "Done.\n");
+
+
+	/* close the connection to the database and cleanup */
+	PQfinish(conn);
+
+	return 0;
+}
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to