Here's a v25.

I made a few more changes to the docs per David's suggestions; I also
reordered the sections quite a bit.  It's now like this:
 * Batch Mode
   * Using Batch Mode
     * Issuing Queries
     * Processing Results
     * Error Handling
     * Interleaving Result Processing and Query Dispatch
     * Ending Batch Mode
   * Functions Associated with Batch Mode
   * When to Use Batching

To me as a reader, this makes more sense, but if you disagree, I think
we should discuss further changes.  (For example, maybe we should move
the "Functions" section at the end?)  The original had "When to Use
Batching" at the start, but it seemed to me that the points it's making
are not as critical as understanding what it is.

I reworked the test program to better fit the TAP model; I found that if
one test mecha failed in whatever way, the connection would be in a
weird state and cause the next test to also fail.  I changed so that it
runs one test and exits; then the t/001_libpq_async.pl file (hmm, need
to rename to 001_batch.pl I guess) calls it once for each test.
I adapted the test code to our code style.  I also removed the "timings"
stuff; I think that's something better left to pgbench.

(I haven't looked at Daniel's pgbench stuff yet, but I will do that
next.)

While looking at how the tests worked, I gave a hard stare at the new
libpq code and cleaned it up also.  There's a lot of minor changes, but
nothing truly substantial.  I moved the code around a lot, to keep
things where grouped together they belong.

I'm not 100% clear on things like PGconn->batch_status and how
PGconn->asyncStatus works.  Currently everything seems to work
correctly, but I'm worried that because we add new status values to
asyncStatus, some existing code might not be doing everything correctly
(for example when changing to/from ASYNC_BUSY in some cases, are we 100%
we shouldn't be changing to ASYNC_QUEUED?)


While looking this over I noticed a thread from 2014[1] where Matt
Newell tried to implement this stuff and apparently the main review
comment he got before abandoning the patch was that the user would like
a way to access the query that corresponded to each result.  The current
patch set does not address that need; the approach to this problem is
that it's on the application's head to keep track of this.  Honestly I
don't understand why it would be otherwise ... I'm not sure that it
makes sense to expect that the application is stupid enough that it
doesn't keep track in which order it sent things, but bright enough to
keep pointers to the queries it sent (??).  So this seems okay to me.
But added Heikki and Claudio to CC because of that old thread.


[1] https://postgr.es/m/2059418.jZQvL3lH90@obsidian

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 73c6a6ea4d..60477ca85c 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3061,6 +3061,30 @@ ExecStatusType PQresultStatus(const PGresult *res);
            </para>
           </listitem>
          </varlistentry>
+
+         <varlistentry id="libpq-pgres-batch-end">
+          <term><literal>PGRES_BATCH_END</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents the end of a batch.
+            This status occurs only when batch mode has been selected.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry id="libpq-pgres-batch-aborted">
+          <term><literal>PGRES_BATCH_ABORTED</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents a batch that's
+            received an error from the server.  <function>PQgetResult</function>
+            must be called repeatedly, and it will return this status code,
+            until the end of the current batch, at which point it will return
+            <literal>PGRES_BATCH_END</literal> and normal processing can resume.
+           </para>
+          </listitem>
+         </varlistentry>
+
         </variablelist>
 
         If the result status is <literal>PGRES_TUPLES_OK</literal> or
@@ -4819,6 +4843,482 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-batch-mode">
+  <title>Batch Mode</title>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>batch mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>pipelining</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> batch mode allows applications to
+   send a query without having to read the result of the previously
+   sent query.  Taking advantage of the batch mode, a client will wait
+   less for the server, since multiple queries/results can be sent/
+   received in a single network transaction.
+  </para>
+
+  <para>
+   While batch mode provides a significant performance boost, writing
+   clients using the batch mode is more complex because it involves
+   managing a queue of pending queries and finding which result
+   corresponds to which query in the queue.
+  </para>
+
+  <sect2 id="libpq-batch-using">
+   <title>Using Batch Mode</title>
+
+   <para>
+    To issue batches the application must switch a connection into batch mode.
+    Enter batch mode with <xref linkend="libpq-PQenterBatchMode"/>
+    or test whether batch mode is active with
+    <xref linkend="libpq-PQbatchStatus"/>.
+    In batch mode, only <link linkend="libpq-async">asynchronous operations</link>
+    are permitted, and <literal>COPY</literal> is not recommended as it
+    may trigger failure in batch processing.  Using any synchronous
+    command execution functions such as <function>PQfn</function>,
+    <function>PQexec</function> or one of its sibling functions are error
+    conditions.
+   </para>
+
+   <note>
+    <para>
+     It is best to use batch mode with <application>libpq</application> in
+     <link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
+     in blocking mode it is possible for a client/server deadlock to occur.
+      <footnote>
+       <para>
+        The client will block trying to send queries to the server, but the
+        server will block trying to send results to the client from queries
+        it has already processed. This only occurs when the client sends
+        enough queries to fill its output buffer and the server's receive
+        buffer before switching to processing input from the server,
+        but it's hard to predict exactly when that will happen.
+       </para>
+      </footnote>
+    </para>
+    <para>
+     Batch mode consumes more memory when send/receive is not done as required,
+     even in non-blocking mode.
+    </para>
+   </note>
+
+   <sect3 id="libpq-batch-sending">
+    <title>Issuing Queries</title>
+
+    <para>
+     After entering batch mode the application dispatches requests using
+     normal asynchronous <application>libpq</application> functions such as
+     <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>,
+     <function>PQsendQueryPrepared</function>, <function>PQsendDescribePortal</function>,
+     <function>PQsendDescribePrepared</function>.
+     The asynchronous requests are followed by a
+     <xref linkend="libpq-PQbatchSendQueue"/>
+     call to mark the end of the batch. The client needs not
+     call <function>PQgetResult</function> immediately after
+     dispatching each operation.
+     <link linkend="libpq-batch-results">Result processing</link>
+     is handled separately.
+    </para>
+
+    <para>
+     The server executes statements, and returns results, in the order the
+     client sends them.  The server may begin executing the batch before all
+     commands in the batch are queued and the end of batch command is sent.
+     If any statement encounters an error the server aborts the current
+     transaction and skips processing the rest of the batch.
+     Query processing resumes after the end of the failed batch.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one. One query may define a table that the next query in the same
+     batch uses; similarly, an application may create a named prepared statement
+     then execute it with later statements in the same batch.
+    </para>
+   </sect3>
+
+   <sect3 id="libpq-batch-results">
+    <title>Processing Results</title>
+
+    <para>
+     To process the result of one batch query, the application calls
+     <function>PQgetResult</function> repeatedly and handles each result
+     until <function>PQgetResult</function> returns null.
+     The result from the next batch query may then be retrieved using
+     <function>PQgetResult</function> again and the cycle repeated.
+     The application handles individual statement results as normal.
+     When the results of all the queries in the batch have been
+     returned, <function>PQgetResult</function> returns a result
+     containing the status value <literal>PGRES_BATCH_END</literal>.
+    </para>
+
+    <para>
+     The client may choose to defer result processing until the complete
+     batch has been sent, or interleave that with sending further batch
+     queries; see <xref linkend="libpq-batch-interleave" />.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     This mode selection is effective only for the query currently
+     being processed. For more information on the use of
+     <function>PQsetSingleRowMode</function>,
+     refer to <xref linkend="libpq-single-row-mode"/>.
+    </para>
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal
+     asynchronous processing except that it may contain the new
+     <type>PGresult</type> types <literal>PGRES_BATCH_END</literal>
+     and <literal>PGRES_BATCH_ABORTED</literal>.
+     <literal>PGRES_BATCH_END</literal> is reported exactly once for each
+     <function>PQbatchSendQueue</function> call at the corresponding point in
+     the result stream and at no other time.
+     <literal>PGRES_BATCH_ABORTED</literal> is emitted during error handling;
+     see <xref linkend="libpq-batch-errors"/>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
+     operate as normal when processing batch results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed (except that
+     <function>PQgetResult</function> returns null to indicate that we start
+     returning the results of next query). The application must keep track
+     of the order in which it sent queries and the expected results.
+     Applications will typically use a state machine or a FIFO queue for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-errors">
+    <title>Error Handling</title>
+
+    <para>
+     When a query in a batch causes an <literal>ERROR</literal> the server
+     skips processing all subsequent messages until the end-of-batch message.
+     The open transaction is aborted.
+    </para>
+
+    <para>
+     From the client perspective, after the client gets a
+     <literal>PGRES_FATAL_ERROR</literal> return from
+     <function>PQresultStatus</function> the batch is flagged as aborted.
+     <application>libpq</application> will report
+     <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued
+     operation in an aborted batch. The result for
+     <function>PQbatchSendQueue</function> is reported as
+     <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQgetResult</function> during error recovery.
+    </para>
+
+    <para>
+     If the batch used an implicit transaction then operations that have
+     already executed are rolled back and operations that were queued for after
+     the failed operation are skipped entirely. The same behaviour holds if the
+     batch starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the batch. If a batch contains <emphasis>
+     multiple explicit transactions</emphasis>, all transactions that committed
+     prior to the error remain committed, the currently in-progress transaction
+     is aborted and all subsequent operations in the current and all later
+     transactions in the same batch are skipped completely.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+   </sect3>
+
+   <sect3 id="libpq-batch-interleave">
+    <title>Interleaving Result Processing and Query Dispatch</title>
+
+    <para>
+     To avoid deadlocks on large batches the client should be structured
+     around a non-blocking event loop using operating system facilities
+     such as <function>select</function>, <function>poll</function>,
+     <function>WaitForMultipleObjectEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work
+     still to be dispatched and a queue of work that has been dispatched
+     but not yet had its results processed. When the socket is writable
+     it should dispatch more work. When the socket is readable it should
+     read results and process them, matching them up to the next entry in
+     its expected results queue.  Based on available memory, results from
+     socket should be read frequently and there's no need to wait till the
+     batch end to read the results.  Batches should be scoped to logical
+     units of work, usually (but not necessarily) one transaction per batch.
+     There's no need to exit batch mode and re-enter it between batches
+     or to wait for one batch to finish before sending the next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state
+     machine to track sent and received work is in
+     <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename>
+     in the PostgreSQL source distribution.
+    </para>
+   </sect3>
+
+   <sect3 id="libpq-batch-end">
+    <title>Ending Batch Mode</title>
+
+    <para>
+     Once all dispatched commands have had their results processed and
+     the end batch result has been consumed the application may return
+     to non-batched mode with <xref linkend="libpq-PQexitBatchMode"/>.
+    </para>
+   </sect3>
+  </sect2>
+
+  <sect2 id="libpq-batch-functions">
+   <title>Functions Associated with Batch Mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQbatchStatus">
+     <term>
+      <function>PQbatchStatus</function>
+      <indexterm>
+       <primary>PQbatchStatus</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns current batch mode status of the <application>libpq</application>
+      connection.
+<synopsis>
+int PQbatchStatus(const PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       <function>PQbatchStatus</function> can return one of the following values:
+
+       <variablelist>
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_ON</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in
+           batch mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_OFF</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is
+           <emphasis>not</emphasis> in batch mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_ABORTED</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in aborted
+           batch status. The aborted flag is cleared as soon as the result
+           of the <function>PQbatchSendQueue</function> at the end of the aborted
+           batch is processed. Clients don't usually need this function to
+           verify aborted status, as they can tell that the batch is aborted
+           from the <literal>PGRES_BATCH_ABORTED</literal> result code.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+       </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterBatchMode">
+     <term>
+      <function>PQenterBatchMode</function>
+      <indexterm>
+       <primary>PQenterBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter batch mode if it is currently idle or
+      already in batch mode.
+
+<synopsis>
+int PQenterBatchMode(PGconn *conn);
+</synopsis>
+
+      </para>
+      <para>
+       Returns 1 for success.
+       Returns 0 and has no effect if the connection is not currently
+       idle, i.e., it has a result ready, or it is waiting for more
+       input from the server, etc.
+       This function does not actually send anything to the server,
+       it just changes the <application>libpq</application> connection
+       state.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitBatchMode">
+     <term>
+      <function>PQexitBatchMode</function>
+      <indexterm>
+       <primary>PQexitBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Causes a connection to exit batch mode if it is currently in batch mode
+       with an empty queue and no pending results.
+<synopsis>
+int PQexitBatchMode(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success.  Returns 1 and takes no action if not in
+       batch mode. If the current statement isn't finished processing 
+       or there are results pending for collection with
+       <function>PQgetResult</function>, returns 0 and does nothing.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchSendQueue">
+     <term>
+      <function>PQbatchSendQueue</function>
+      <indexterm>
+       <primary>PQbatchSendQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Delimits the end of a set of a batched commands by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       and flushing the send buffer. The end of a batch serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-batch-errors"/>.
+
+<synopsis>
+int PQbatchSendQueue(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       batch mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       is failed.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2 id="libpq-batch-tips">
+   <title>When to Use Batching</title>
+
+   <para>
+    Much like asynchronous query mode, there is no performance disadvantage to
+    using batching and pipelining. It increases client application complexity
+    and extra caution is required to prevent client/server deadlocks, but
+    pipelining can sometimes offer considerable performance improvements.
+   </para>
+
+   <para>
+    Batching is most useful when the server is distant, i.e., network latency
+    (<quote>ping time</quote>) is high, and also when many small operations
+    are being performed in rapid sequence.  There is usually less benefit
+    in using batches when each query takes many multiples of the client/server
+    round-trip time to execute.  A 100-statement operation run on a server
+    300ms round-trip-time away would take 30 seconds in network latency alone
+    without batching; with batching it may spend as little as 0.3s waiting for
+    results from the server.
+   </para>
+
+   <para>
+    Use batches when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed
+    into operations on sets or into a <literal>COPY</literal> operation.
+   </para>
+
+   <para>
+    Batching is not useful when information from one operation is required by
+    the client to produce the next operation. In such cases, the client
+    must introduce a synchronization point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+BEGIN;
+SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+-- result: x=2
+-- client adds 1 to x:
+UPDATE mytable SET x = 3 WHERE id = 42;
+COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <para>
+    Batching is less useful, and more complex, when a single batch contains
+    multiple transactions (see <xref linkend="libpq-batch-errors"/>).
+   </para>
+
+   <note>
+    <para>
+     The batch API was introduced in PostgreSQL 14.0, but clients using
+     the PostgreSQL 14 version of <application>libpq</application> can use
+     batches on server versions 7.4 and newer. Batching works on any server
+     that supports the v3 extended query protocol.
+    </para>
+   </note>
+  </sect2>
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-by-Row</title>
 
@@ -4859,6 +5359,17 @@ int PQflush(PGconn *conn);
    Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
   </para>
 
+  <!-- XXX Isn't this note merely repeating what was said above?
+       I think it should be removed. -->
+  <note>
+    <para>
+     When using batch mode, activate the single-row mode on the current
+     batch query by calling <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     See <xref linkend="libpq-batch-mode"/> for more information.
+    </para>
+   </note>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-PQsetSingleRowMode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 6329cf0796..49be8b1dbe 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while libpq connection is in batch mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 24f8b3e42e..9ae67387a5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1026,6 +1026,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 			walres->status = WALRCV_ERROR;
 			walres->err = pchomp(PQerrorMessage(conn->streamConn));
 			break;
+		default:
+		/* This is just to keep compiler quiet */
+			break;
 	}
 
 	PQclear(pgres);
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..ca86f55652 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,7 @@ PQgetgssctx               176
 PQsetSSLKeyPassHook_OpenSSL         177
 PQgetSSLKeyPassHook_OpenSSL         178
 PQdefaultSSLKeyPassHook_OpenSSL     179
+PQenterBatchMode          180
+PQexitBatchMode           181
+PQbatchSendQueue          182
+PQbatchStatus             183
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index e7781d010f..c673bc405f 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -536,6 +536,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
 	}
 }
 
+/*
+ * pqFreeCommandQueue
+ * Free all the entries of PGcommandQueueEntry queue passed.
+ */
+static void
+pqFreeCommandQueue(PGcommandQueueEntry *queue)
+{
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *cur = queue;
+
+		queue = cur->next;
+		if (cur->query)
+			free(cur->query);
+		free(cur);
+	}
+}
 
 /*
  *		pqDropServerData
@@ -555,6 +572,7 @@ pqDropServerData(PGconn *conn)
 {
 	PGnotify   *notify;
 	pgParameterStatus *pstatus;
+	PGcommandQueueEntry *queue;
 
 	/* Forget pending notifies */
 	notify = conn->notifyHead;
@@ -567,6 +585,14 @@ pqDropServerData(PGconn *conn)
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
 
+	queue = conn->cmd_queue_head;
+	pqFreeCommandQueue(queue);
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+	queue = conn->cmd_queue_recycle;
+	pqFreeCommandQueue(queue);
+	conn->cmd_queue_recycle = NULL;
+
 	/* Reset ParameterStatus data, as well as variables deduced from it */
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
@@ -6699,6 +6725,15 @@ PQbackendPID(const PGconn *conn)
 	return conn->be_pid;
 }
 
+int
+PQbatchStatus(const PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	return conn->batch_status;
+}
+
 int
 PQconnectionNeedsPassword(const PGconn *conn)
 {
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237c3a..93971857d5 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_BATCH_END",
+	"PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -70,6 +72,11 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int	PQsendDescribe(PGconn *conn, char desc_type,
 						   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry *pqMakePipelineCmd(PGconn *conn);
+static void pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+static void pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+static int pqBatchProcessQueue(PGconn *conn);
+static int	pqBatchFlush(PGconn *conn);
 
 
 /* ----------------
@@ -1210,7 +1217,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1233,6 +1240,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+		return 0;
+	}
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1263,10 +1277,11 @@ PQsendQuery(PGconn *conn, const char *query)
 	conn->last_query = strdup(query);
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in batch mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 	{
 		/* error message should be set up already */
 		return 0;
@@ -1331,6 +1346,10 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1362,6 +1381,15 @@ PQsendPrepare(PGconn *conn,
 		return 0;
 	}
 
+	/* Alloc batch memory before doing anything */
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = pqMakePipelineCmd(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+	}
+
 	/* construct the Parse message */
 	if (pqPutMsgStart('P', false, conn) < 0 ||
 		pqPuts(stmtName, conn) < 0 ||
@@ -1389,31 +1417,47 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+	else
+	{
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	*queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	if (*last_query)
+		free(*last_query);
+	*last_query = strdup(query);
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in batch mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		pqAppendPipelineCmd(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecyclePipelineCmd(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1461,7 +1505,8 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQsendQueryStart
+ *	Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn)
@@ -1479,20 +1524,62 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE &&
+		conn->batch_status == PQBATCH_MODE_OFF)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 						  libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	pqClearAsyncResult(conn);
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		/*
+		 * When enqueuing a message we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when PQbatchQueueProcess(...)
+		 * advances to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_QUEUED:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				printfPQExpBuffer(&conn->errorMessage,
+								  libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+				break;
+			case PGASYNC_IDLE:
+				printfPQExpBuffer(&conn->errorMessage,
+								  libpq_gettext("internal error, idle state in batch mode"));
+				break;
+		}
+	}
+	else
+	{
+		/*
+		 * This command's results will come in immediately. Initialize async
+		 * result-accumulation state
+		 */
+		pqClearAsyncResult(conn);
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+		/* reset single-row processing mode */
+		conn->singleRowMode = false;
 
+	}
 	/* ready to send command message */
 	return true;
 }
@@ -1516,6 +1603,10 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 
 	/* This isn't gonna work on a 2.0 server */
 	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1525,6 +1616,23 @@ PQsendQueryGuts(PGconn *conn,
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = pqMakePipelineCmd(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
+
 	/*
 	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
 	 * using specified statement name and the unnamed portal.
@@ -1637,35 +1745,43 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	*queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	if (*last_query)
+		free(*last_query);
 	if (command)
-		conn->last_query = strdup(command);
+		*last_query = strdup(command);
 	else
-		conn->last_query = NULL;
+		*last_query = NULL;
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in batch mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		pqAppendPipelineCmd(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecyclePipelineCmd(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1766,14 +1882,17 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
 }
 
-
 /*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
  *	  memory).
+ *
+ *	  In batch mode, once all the result of a query have been returned,
+ *	  PQgetResult returns NULL to let the user know that the next batched
+ *	  query is being processed.  At the end of the batch, returns a
+ *	  end-of-batch result with PQresultStatus(result) == PGRES_BATCH_END.
  */
-
 PGresult *
 PQgetResult(PGconn *conn)
 {
@@ -1842,9 +1961,38 @@ PQgetResult(PGconn *conn)
 	switch (conn->asyncStatus)
 	{
 		case PGASYNC_IDLE:
+		case PGASYNC_QUEUED:
 			res = NULL;			/* query is complete */
+			if (conn->batch_status != PQBATCH_MODE_OFF)
+			{
+				/*
+				 * In batch mode, we prepare the processing of the results of
+				 * the next query.
+				 */
+				pqBatchProcessQueue(conn);
+			}
 			break;
 		case PGASYNC_READY:
+			res = pqPrepareAsyncResult(conn);
+			if (conn->batch_status != PQBATCH_MODE_OFF)
+			{
+				/*
+				 * In batch mode, query execution state cannot be IDLE as
+				 * there can be other queries or results waiting in the queue
+				 *
+				 * The connection isn't idle since we can't submit new
+				 * nonbatched commands. It isn't also busy since the current
+				 * command is done and we need to process a new one.
+				 */
+				conn->asyncStatus = PGASYNC_QUEUED;
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
 			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
@@ -2025,6 +2173,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n"));
+		return false;
+	}
+
 	/*
 	 * Silently discard any prior query result that application didn't eat.
 	 * This is probably poor design, but it's here for backward compatibility.
@@ -2219,6 +2374,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	PGQueryClass *queryclass;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2234,6 +2392,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = pqMakePipelineCmd(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+		queryclass = &conn->queryclass;
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', false, conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2242,32 +2412,40 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
+	*queryclass = PGQUERY_DESCRIBE;
 
-	/* reset last_query string (not relevant now) */
-	if (conn->last_query)
+	/* reset last-query string (not relevant now) */
+	if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF)
 	{
 		free(conn->last_query);
 		conn->last_query = NULL;
 	}
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in batch mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		pqAppendPipelineCmd(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecyclePipelineCmd(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -2665,6 +2843,13 @@ PQfn(PGconn *conn,
 	/* clear the error string */
 	resetPQExpBuffer(&conn->errorMessage);
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("PQfn not allowed in batch mode\n"));
+		return NULL;
+	}
+
 	if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
 		conn->result != NULL)
 	{
@@ -2685,6 +2870,377 @@ PQfn(PGconn *conn,
 							   args, nargs);
 }
 
+/* ====== Batch mode support ======== */
+
+/*
+ * PQenterBatchMode
+ *		Put an idle connection in batch mode.
+ *
+ * Returns 1 on success. On failure, errorMessage is set and 0 is returned.
+ *
+ * Commands submitted after this can be pipelined on the connection;
+ * there's no requirement to wait for one to finish before the next is
+ * dispatched.
+ *
+ * Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * A set of commands is terminated by a PQbatchQueueSync. Multiple sets of
+ * batched commands may be sent while in batch mode. Batch mode can be exited
+ * by calling PQexitBatchMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQenterBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	/* succeed with no action if already in batch mode */
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		return 1;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot enter batch mode, connection not idle\n"));
+		return 0;
+	}
+
+	conn->batch_status = PQBATCH_MODE_ON;
+	conn->asyncStatus = PGASYNC_QUEUED;
+
+	return 1;
+}
+
+/*
+ * PQexitBatchMode
+ *		End batch mode and return to normal command mode.
+ *
+ * Returns 1 in success (batch mode was ended, or not in batch mode).
+ *
+ * Returns 0 if in batch mode and cannot be ended yet.
+ * Error message will be set.
+ */
+int
+PQexitBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 1;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+			/* there are some uncollected results */
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("cannot exit batch mode with uncollected results\n"));
+			return 0;
+
+		case PGASYNC_BUSY:
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("cannot exit batch mode while busy\n"));
+			return 0;
+
+		default:
+			/* OK */
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("command queue not clean"));
+		return 0;
+	}
+
+	conn->batch_status = PQBATCH_MODE_OFF;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	/* Flush any pending data in out buffer */
+	if (pqFlush(conn) < 0)
+		return 0;	/* error message is setup already */
+	return 1;
+}
+
+/*
+ * internal function pqBatchProcessQueue
+ *
+ * In batch mode, start processing the next query in the queue.
+ *
+ * Returns 1 if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns 0 if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+static int
+pqBatchProcessQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *next_query;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 0;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			/* should be unreachable */
+			printfPQExpBuffer(&conn->errorMessage,
+							  "internal error, COPY in batch mode");
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return 0;
+			break;
+		case PGASYNC_IDLE:
+			/* should be unreachable */
+			printfPQExpBuffer(&conn->errorMessage,
+							  "internal error, IDLE in batch mode");
+			break;
+		case PGASYNC_QUEUED:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/*
+		 * In batch mode but nothing left on the queue; caller can submit more
+		 * work or PQexitBatchMode() now.
+		 */
+		return 0;
+	}
+
+	/*
+	 * Pop the next query from the queue and set up the connection state as if
+	 * it'd just been dispatched from a non-batched call
+	 */
+	next_query = conn->cmd_queue_head;
+	conn->cmd_queue_head = next_query->next;
+	next_query->next = NULL;
+
+	/* Initialize async result-accumulation state */
+	pqClearAsyncResult(conn);
+
+	/* reset single-row processing mode XXX why?? */
+	conn->singleRowMode = false;
+
+
+	conn->last_query = next_query->query;
+	next_query->query = NULL;
+	conn->queryclass = next_query->queryclass;
+
+	pqRecyclePipelineCmd(conn, next_query);
+
+	if (conn->batch_status == PQBATCH_MODE_ABORTED &&
+		conn->queryclass != PGQUERY_SYNC)
+	{
+		/*
+		 * In an aborted batch we don't get anything from the server for each
+		 * result; we're just discarding input until we get to the next sync
+		 * from the server. The client needs to know its queries got aborted
+		 * so we create a fake PGresult to return immediately from
+		 * PQgetResult.
+		 */
+		conn->result =
+			PQmakeEmptyPGresult(conn, PGRES_BATCH_ABORTED);
+		if (!conn->result)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory"));
+			pqSaveErrorResult(conn);
+			return 0;
+		}
+		conn->asyncStatus = PGASYNC_READY;
+	}
+	else
+	{
+		/* allow parsing to continue */
+		conn->asyncStatus = PGASYNC_BUSY;
+	}
+
+	return 1;
+}
+
+/*
+ * PQbatchSendQueue
+ *		End a batch submission.
+ *
+ * It's legal to start submitting another batch immediately, without
+ * waiting for the results of the current batch. There's no need to end batch
+ * mode and start it again.
+ *
+ * If a command in a batch fails, every subsequent command up to and
+ * including the PQbatchQueueSync command result gets set to PGRES_BATCH_ABORTED
+ * state. If the whole batch is processed without error, a PGresult with
+ * PGRES_BATCH_END is produced.
+ *
+ * Queries can already have been sent before PQbatchSendQueue is called, but
+ * PQbatchSendQueue need to be called before retrieving command results.
+ *
+ * The connection will remain in batch mode and unavailable for new synchronous
+ * command execution functions until all results from the batch are processed
+ * by the client.
+ */
+int
+PQbatchSendQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot send batch when not in batch mode\n"));
+		return 0;
+	}
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			/* should be unreachable */
+			printfPQExpBuffer(&conn->errorMessage,
+							  "internal error: cannot send batch when idle\n");
+			return 0;
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			/* should be unreachable */
+			printfPQExpBuffer(&conn->errorMessage,
+							  "internal error: cannot send batch while in COPY\n");
+			return 0;
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_QUEUED:
+			/* can send sync to end this batch of cmds */
+			break;
+	}
+
+	entry = pqMakePipelineCmd(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	pqAppendPipelineCmd(conn, entry);
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (PQflush(conn) < 0)
+		goto sendFailed;
+
+	/*
+	 * Call pqBatchProcessQueue so the user can call start calling getResult.
+	 */
+	pqBatchProcessQueue(conn);
+
+	return 1;
+
+sendFailed:
+	pqRecyclePipelineCmd(conn, entry);
+	/* error message should be set up already */
+	return 0;
+}
+
+/*
+ * pqMakePipelineCmd
+ *		Get a command queue entry for caller to fill.
+ *
+ * If the recycle queue has a free element, that is returned; if not, a
+ * fresh one is allocated.  Caller is responsible for adding it to the
+ * command queue (pqAppendPipelineCmd) once the struct is filled in, or
+ * releasing the memory (pqRecyclePipelineCmd) if an error occurs.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcommandQueueEntry *
+pqMakePipelineCmd(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/*
+ * pqAppendPipelineCmd
+ *		Append a precreated command queue entry to the queue after it's been
+ *		sent successfully.
+ */
+static void
+pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+	conn->cmd_queue_tail = entry;
+}
+
+/*
+ * pqRecyclePipelineCmd
+ *		Push a command queue entry onto the freelist. It must be an entry
+ *		with null next pointer and not referenced by any other entry's next
+ *		pointer.
+ */
+static void
+pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (entry == NULL)
+		return;
+
+	Assert(entry->next == NULL);
+
+	if (entry->query)
+		free(entry->query);
+
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
 
 /* ====== accessor funcs for PGresult ======== */
 
@@ -3285,6 +3841,23 @@ PQflush(PGconn *conn)
 	return pqFlush(conn);
 }
 
+/*
+ * pqBatchFlush
+ *
+ * In batch mode, data will be flushed only when the out buffer reaches the
+ * threshold value.  In non-batch mode, data will be flushed all the time.
+ *
+ * Returns 0 on success.
+ */
+static int
+pqBatchFlush(PGconn *conn)
+{
+	if ((conn->batch_status == PQBATCH_MODE_OFF) ||
+		(conn->outCount >= OUTBUFFER_THRESHOLD))
+		return (pqFlush(conn));
+	return 0;
+}
+
 
 /*
  *		PQfreemem - safely frees memory allocated
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 9360c541be..2ff3fa4883 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn)
 {
 	char		id;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode");
+		abort();
+	}
+
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525475..da38e6aed1 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -217,10 +217,19 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new
+								 * query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->batch_status != PQBATCH_MODE_OFF)
+					{
+						conn->batch_status = PQBATCH_MODE_ON;
+						conn->result = PQmakeEmptyPGresult(conn,
+														   PGRES_BATCH_END);
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -875,6 +884,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	PQExpBufferData workBuf;
 	char		id;
 
+	if (isError && conn->batch_status != PQBATCH_MODE_OFF)
+		conn->batch_status = PQBATCH_MODE_ABORTED;
+
 	/*
 	 * If this is an error message, pre-emptively clear any incomplete query
 	 * result we may have.  We'd just throw it away below anyway, and
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 3b6a9fbce3..fcdd887958 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -97,7 +97,10 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_BATCH_END,			/* end of a batch of commands */
+	PGRES_BATCH_ABORTED,		/* Command didn't run because of an abort
+								 * earlier in a batch */
 } ExecStatusType;
 
 typedef enum
@@ -137,6 +140,17 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/*
+ * PQBatchStatus - Current status of batch mode
+ */
+
+typedef enum
+{
+	PQBATCH_MODE_OFF,
+	PQBATCH_MODE_ON,
+	PQBATCH_MODE_ABORTED
+} PQBatchStatus;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -328,6 +342,7 @@ extern int	PQserverVersion(const PGconn *conn);
 extern char *PQerrorMessage(const PGconn *conn);
 extern int	PQsocket(const PGconn *conn);
 extern int	PQbackendPID(const PGconn *conn);
+extern int	PQbatchStatus(const PGconn *conn);
 extern int	PQconnectionNeedsPassword(const PGconn *conn);
 extern int	PQconnectionUsedPassword(const PGconn *conn);
 extern int	PQclientEncoding(const PGconn *conn);
@@ -435,6 +450,11 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int	PQenterBatchMode(PGconn *conn);
+extern int	PQexitBatchMode(PGconn *conn);
+extern int	PQbatchSendQueue(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae295..d2b26f2299 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,15 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch
+								 * result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch
+								 * result, More results expected from this
+								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_QUEUED				/* Current query done, more in queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +234,8 @@ typedef enum
 	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+	PGQUERY_SYNC				/* A protocol sync to end a batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the pqSetenv state machine */
@@ -301,6 +307,22 @@ typedef enum pg_conn_host_type
 	CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+	PGQueryClass queryclass;	/* Query type; PGQUERY_SYNC for sync msg */
+	char	   *query;			/* SQL command, or NULL if unknown */
+	struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
 /*
  * pg_conn_host stores all information about each of possibly several hosts
  * mentioned in the connection string.  Most fields are derived by splitting
@@ -394,6 +416,7 @@ struct pg_conn
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	PQBatchStatus batch_status; /* Batch(pipelining) mode status of connection */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;	/* # bytes already returned in COPY OUT */
@@ -406,6 +429,16 @@ struct pg_conn
 	pg_conn_host *connhost;		/* details about each named host */
 	char	   *connip;			/* IP address for current network connection */
 
+	/*
+	 * The command queue
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+	PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
 								 * unconnected */
@@ -798,6 +831,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
  */
 #define pqIsnonblocking(conn)	((conn)->nonblocking)
 
+/*
+ * Connection's outbuffer threshold.
+ */
+#define OUTBUFFER_THRESHOLD	65536
+
 #ifdef ENABLE_NLS
 extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
 extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index a6d2ffbf9e..287214c544 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
 		  test_extensions \
 		  test_ginpostinglist \
 		  test_integerset \
+		  test_libpq \
 		  test_misc \
 		  test_parser \
 		  test_pg_dump \
diff --git a/src/test/modules/test_libpq/.gitignore b/src/test/modules/test_libpq/.gitignore
new file mode 100644
index 0000000000..11e8463984
--- /dev/null
+++ b/src/test/modules/test_libpq/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/testlibpqbatch
diff --git a/src/test/modules/test_libpq/Makefile b/src/test/modules/test_libpq/Makefile
new file mode 100644
index 0000000000..6d3a0ea4d5
--- /dev/null
+++ b/src/test/modules/test_libpq/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/test_libpq/Makefile
+
+PROGRAM = testlibpqbatch
+OBJS = testlibpqbatch.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS += $(libpq)
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_libpq
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_libpq/README b/src/test/modules/test_libpq/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/test_libpq/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl b/src/test/modules/test_libpq/t/001_libpq_async.pl
new file mode 100644
index 0000000000..0b27896a2a
--- /dev/null
+++ b/src/test/modules/test_libpq/t/001_libpq_async.pl
@@ -0,0 +1,26 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $numrows = 10000;
+my @tests =
+  qw(disallowed_in_batch simple_batch multi_batch batch_abort
+  batch_insert singlerow);
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+for my $testname (@tests)
+{
+	$node->command_ok(
+		[ 'testlibpqbatch', $testname, $node->connstr('postgres'), $numrows ],
+		"testlibpqbatch $testname");
+}
+
+$node->stop('fast');
diff --git a/src/test/modules/test_libpq/testlibpqbatch.c b/src/test/modules/test_libpq/testlibpqbatch.c
new file mode 100644
index 0000000000..7157e6cb90
--- /dev/null
+++ b/src/test/modules/test_libpq/testlibpqbatch.c
@@ -0,0 +1,963 @@
+/*
+ * src/test/modules/test_libpq/testlibpqbatch.c
+ *		Verify libpq batch execution functionality
+ */
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+
+#include "catalog/pg_type_d.h"
+#include "common/fe_memutils.h"
+#include "libpq-fe.h"
+#include "portability/instr_time.h"
+
+
+static void exit_nicely(PGconn *conn);
+
+const char *const progname = "testlibpqbatch";
+
+
+#define DEBUG
+#ifdef DEBUG
+#define	pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
+#else
+#define pg_debug(...)
+#endif
+
+static const char *const drop_table_sql =
+"DROP TABLE IF EXISTS batch_demo";
+static const char *const create_table_sql =
+"CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql =
+"INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+/*
+ * Print an error to stderr and terminate the program.
+ */
+#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
+static void
+pg_fatal_impl(int line, const char *fmt,...)
+{
+	va_list		args;
+
+	fprintf(stderr, "\n");		/* XXX hack */
+	fprintf(stderr, "%s:%d: ", progname, line);
+
+	va_start(args, fmt);
+	vfprintf(stderr, fmt, args);
+	va_end(args);
+	printf("Failure, exiting\n");
+	exit(1);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	fprintf(stderr, "test error cases... ");
+
+	if (PQisnonblocking(conn))
+		pg_fatal("Expected blocking connection mode\n");
+
+	if (PQenterBatchMode(conn) != 1)
+		pg_fatal("Unable to enter batch mode\n");
+
+	if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+		pg_fatal("Batch mode not activated properly\n");
+
+	/* PQexec should fail in batch mode */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("PQexec should fail in batch mode but succeeded\n");
+
+	/* So should PQsendQuery */
+	if (PQsendQuery(conn, "SELECT 1") != 0)
+		pg_fatal("PQsendQuery should fail in batch mode but succeeded\n");
+
+	/* Entering batch mode when already in batch mode is OK */
+	if (PQenterBatchMode(conn) != 1)
+		pg_fatal("re-entering batch mode should be a no-op but failed\n");
+
+	if (PQisBusy(conn) != 0)
+		pg_fatal("PQisBusy should return 0 when idle in batch, returned 1\n");
+
+	/* ok, back to normal command mode */
+	if (PQexitBatchMode(conn) != 1)
+		pg_fatal("couldn't exit idle empty batch mode\n");
+
+	if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+		pg_fatal("Batch mode not terminated properly\n");
+
+	/* exiting batch mode when not in batch mode should be a no-op */
+	if (PQexitBatchMode(conn) != 1)
+		pg_fatal("batch mode exit when not in batch mode should succeed but failed\n");
+
+	/* can now PQexec again */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("PQexec should succeed after exiting batch mode but failed with: %s\n",
+				 PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_simple_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "simple batch... ");
+
+	/*
+	 * Enter batch mode and dispatch a set of operations, which we'll then
+	 * process the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim
+	 * processing of results since our out buffer will give us enough slush to
+	 * work with and we won't block on sending. So blocking mode is fine.
+	 */
+	if (PQisnonblocking(conn))
+		pg_fatal("Expected blocking connection mode\n");
+
+	if (PQenterBatchMode(conn) != 1)
+		pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1",
+						  1, dummy_param_oids, dummy_params,
+						  NULL, NULL, 0) != 1)
+		pg_fatal("dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+
+	if (PQexitBatchMode(conn) != 0)
+		pg_fatal("exiting batch mode with work in progress should fail, but succeeded\n");
+
+	if (PQbatchSendQueue(conn) != 1)
+		pg_fatal("Ending a batch failed: %s\n", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first batch item\n",
+				 PQresStatus(PQresultStatus(res)));
+
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first query result.\n");
+
+	/*
+	 * Even though we've processed the result there's still a sync to come and
+	 * we can't exit batch mode yet
+	 */
+	if (PQexitBatchMode(conn) != 0)
+		pg_fatal("exiting batch mode after query but before sync succeeded incorrectly\n");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result PGRES_BATCH_END expected: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s\n",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after end batch call\n");
+
+	/* We're still in a batch... */
+	if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+		pg_fatal("Fell out of batch mode somehow\n");
+
+	/* ... until we end it, which we can safely do now */
+	if (PQexitBatchMode(conn) != 1)
+		pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+		pg_fatal("Exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_multi_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "multi batch... ");
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first.
+	 */
+	if (PQenterBatchMode(conn) != 1)
+		pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s\n", PQerrorMessage(conn));
+
+	if (PQbatchSendQueue(conn) != 1)
+		pg_fatal("Ending first batch failed: %s\n", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second SELECT failed: %s\n", PQerrorMessage(conn));
+
+	if (PQbatchSendQueue(conn) != 1)
+		pg_fatal("Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+	/* OK, start processing the batch results */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first batch item\n",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result\n");
+
+	if (PQexitBatchMode(conn) != 0)
+		pg_fatal("exiting batch mode after query but before sync succeeded incorrectly\n");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s\n",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s\n",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* second batch */
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from second batch item\n",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s\n",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+		pg_fatal("Unexpected result code %s from second end batch\n",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* We're still in a batch... */
+	if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+		pg_fatal("Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	if (PQexitBatchMode(conn) != 1)
+		pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+		pg_fatal("exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+}
+
+/*
+ * When an operation in a batch fails the rest of the batch is flushed. We
+ * still have to get results for each batch item, but the item will just be
+ * a PGRES_BATCH_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the batch. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_batch_abort(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+	int			i;
+
+	fprintf(stderr, "aborted batch... ");
+
+	res = PQexec(conn, drop_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+
+	res = PQexec(conn, create_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first. Make sure the second operation in the first
+	 * batch ERRORs.
+	 */
+	if (PQenterBatchMode(conn) != 1)
+		pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "1";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first insert failed: %s\n", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
+						  1, dummy_param_oids, dummy_params,
+						  NULL, NULL, 0) != 1)
+		pg_fatal("dispatching error select failed: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "2";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second insert failed: %s\n", PQerrorMessage(conn));
+
+	if (PQbatchSendQueue(conn) != 1)
+		pg_fatal("Sending first batch failed: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "3";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second-batch insert failed: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQbatchSendQueue(conn) != 1)
+		pg_fatal("Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+	/*
+	 * OK, start processing the batch results.
+	 *
+	 * We should get a command-ok for the first query, then a fatal error and
+	 * a batch aborted message for the second insert, a batch-end, then a
+	 * command-ok and a batch-ok for the second batch operation.
+	 */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("Unexpected result status %s: %s\n",
+				 PQresStatus(PQresultStatus(res)),
+				 PQresultErrorMessage(res));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s\n",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* Second query caused error, so we expect an error next */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s\n",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s\n",
+				 PQresStatus(PQresultStatus(res)));
+
+	/*
+	 * batch should now be aborted.
+	 *
+	 * Note that we could still queue more queries at this point if we wanted;
+	 * they'd get added to a new third batch since we've already sent a
+	 * second. The aborted flag relates only to the batch being received.
+	 */
+	if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED)
+		pg_fatal("batch should be flagged as aborted but isn't\n");
+
+	/* third query in batch, the second insert */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_BATCH_ABORTED)
+		pg_fatal("Unexpected result code -- expected PGRES_BATCH_ABORTED, got %s\n",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED)
+		pg_fatal("batch should be flagged as aborted but isn't\n");
+
+	/* Ensure we're still in batch */
+	if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+		pg_fatal("Fell out of batch mode somehow\n");
+
+	/*
+	 * The end of a failed batch is a PGRES_BATCH_END.
+	 *
+	 * (This is so clients know to start processing results normally again and
+	 * can tell the difference between skipped commands and the sync.)
+	 */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+		pg_fatal("Unexpected result code from first batch sync\n"
+				 "Expected PGRES_BATCH_END, got %s\n",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* XXX why do we have a NULL result after PGRES_BATCH_END? */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	if (PQbatchStatus(conn) == PQBATCH_MODE_ABORTED)
+		pg_fatal("sync should've cleared the aborted flag but didn't\n");
+
+	/* We're still in a batch... */
+	if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+		pg_fatal("Fell out of batch mode somehow\n");
+
+	/* the insert from the second batch */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("Unexpected result code %s from first item in second batch\n",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* Read the NULL result at the end of the command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* the second batch sync */
+	if ((res = PQgetResult(conn)) == NULL)
+		pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_BATCH_END)
+		pg_fatal("Unexpected result code %s from second batch sync\n",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s: %s\n",
+				 PQresStatus(PQresultStatus(res)),
+				 PQerrorMessage(conn));
+
+	/* We're still in a batch... */
+	if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+		pg_fatal("Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	if (PQexitBatchMode(conn) != 1)
+		pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+		pg_fatal("exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+
+	/*-
+	 * Since we fired the batches off without a surrounding xact, the results
+	 * should be:
+	 *
+	 * - Implicit xact started by server around 1st batch
+	 * - First insert applied
+	 * - Second statement aborted xact
+	 * - Third insert skipped
+	 * - Sync rolled back first implicit xact
+	 * - Implicit xact created by server around 2nd batch
+	 * - insert applied from 2nd batch
+	 * - Sync commits 2nd xact
+	 *
+	 * So we should only have the value 3 that we inserted.
+	 */
+	res = PQexec(conn, "SELECT itemno FROM batch_demo");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s\n",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d\n", PQntuples(res));
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		const char *val = PQgetvalue(res, i, 0);
+
+		if (strcmp(val, "3") != 0)
+			pg_fatal("expected only insert with value 3, got %s", val);
+	}
+
+	PQclear(res);
+}
+
+/* State machine enum for test_batch_insert */
+typedef enum BatchInsertStep
+{
+	BI_BEGIN_TX,
+	BI_DROP_TABLE,
+	BI_CREATE_TABLE,
+	BI_PREPARE,
+	BI_INSERT_ROWS,
+	BI_COMMIT_TX,
+	BI_SYNC,
+	BI_DONE
+} BatchInsertStep;
+
+static void
+test_batch_insert(PGconn *conn, int n_rows)
+{
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+	BatchInsertStep send_step = BI_BEGIN_TX,
+				recv_step = BI_BEGIN_TX;
+	int			rows_to_send,
+				rows_to_receive;
+
+	insert_params[0] = &insert_param_0[0];
+
+	rows_to_send = rows_to_receive = n_rows;
+
+	/*
+	 * Do a batched insert into a table created at the start of the batch
+	 */
+	if (PQenterBatchMode(conn) != 1)
+		pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	while (send_step != BI_PREPARE)
+	{
+		const char *sql;
+
+		switch (send_step)
+		{
+			case BI_BEGIN_TX:
+				sql = "BEGIN TRANSACTION";
+				send_step = BI_DROP_TABLE;
+				break;
+
+			case BI_DROP_TABLE:
+				sql = drop_table_sql;
+				send_step = BI_CREATE_TABLE;
+				break;
+
+			case BI_CREATE_TABLE:
+				sql = create_table_sql;
+				send_step = BI_PREPARE;
+				break;
+
+			default:
+				pg_fatal("invalid state");
+		}
+
+		pg_debug("sending: %s\n", sql);
+		if (PQsendQueryParams(conn, sql,
+							  0, NULL, NULL, NULL, NULL, 0) != 1)
+			pg_fatal("dispatching %s failed: %s\n", sql, PQerrorMessage(conn));
+	}
+
+	Assert(send_step == BI_PREPARE);
+	pg_debug("sending: %s\n", insert_sql);
+	if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1)
+		pg_fatal("dispatching PREPARE failed: %s\n", PQerrorMessage(conn));
+	send_step = BI_INSERT_ROWS;
+
+	/*
+	 * Now we start inserting. We'll be sending enough data that we could fill
+	 * our out buffer, so to avoid deadlocking we need to enter nonblocking
+	 * mode and consume input while we send more output. As results of each
+	 * query are processed we should pop them to allow processing of the next
+	 * query. There's no need to finish the batch before processing results.
+	 */
+	if (PQsetnonblocking(conn, 1) != 0)
+		pg_fatal("failed to set nonblocking mode: %s\n", PQerrorMessage(conn));
+
+	while (recv_step != BI_DONE)
+	{
+		int			sock;
+		fd_set		input_mask;
+		fd_set		output_mask;
+
+		sock = PQsocket(conn);
+
+		if (sock < 0)
+			break;				/* shouldn't happen */
+
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		FD_ZERO(&output_mask);
+		FD_SET(sock, &output_mask);
+
+		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+
+		/*
+		 * Process any results, so we keep the server's out buffer free
+		 * flowing and it can continue to process input
+		 */
+		if (FD_ISSET(sock, &input_mask))
+		{
+			PQconsumeInput(conn);
+
+			/* Read until we'd block if we tried to read */
+			while (!PQisBusy(conn) && recv_step < BI_DONE)
+			{
+				PGresult   *res;
+				const char *cmdtag;
+				const char *description = "";
+				int			status;
+
+				/*
+				 * Read next result.  If no more results from this query,
+				 * advance to the next query
+				 */
+				res = PQgetResult(conn);
+				if (res == NULL)
+					continue;
+
+				status = PGRES_COMMAND_OK;
+				switch (recv_step)
+				{
+					case BI_BEGIN_TX:
+						cmdtag = "BEGIN";
+						recv_step++;
+						break;
+					case BI_DROP_TABLE:
+						cmdtag = "DROP TABLE";
+						recv_step++;
+						break;
+					case BI_CREATE_TABLE:
+						cmdtag = "CREATE TABLE";
+						recv_step++;
+						break;
+					case BI_PREPARE:
+						cmdtag = "";
+						description = "PREPARE";
+						recv_step++;
+						break;
+					case BI_INSERT_ROWS:
+						cmdtag = "INSERT";
+						rows_to_receive--;
+						if (rows_to_receive == 0)
+							recv_step++;
+						break;
+					case BI_COMMIT_TX:
+						cmdtag = "COMMIT";
+						recv_step++;
+						break;
+					case BI_SYNC:
+						cmdtag = "";
+						description = "SYNC";
+						status = PGRES_BATCH_END;
+						recv_step++;
+						break;
+					case BI_DONE:
+						/* unreachable */
+						description = "";
+						abort();
+				}
+
+				if (PQresultStatus(res) != status)
+					pg_fatal("%s reported status %s, expected %s\n"
+							 "Error message: \"%s\"\n",
+							 description, PQresStatus(PQresultStatus(res)),
+							 PQresStatus(status), PQerrorMessage(conn));
+
+				if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+					pg_fatal("%s expected command tag '%s', got '%s'\n",
+							 description, cmdtag, PQcmdStatus(res));
+
+				pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
+
+				PQclear(res);
+			}
+		}
+
+		/* Write more rows and/or the end batch message, if needed */
+		if (FD_ISSET(sock, &output_mask))
+		{
+			PQflush(conn);
+
+			if (send_step == BI_INSERT_ROWS)
+			{
+				snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+
+				if (PQsendQueryPrepared(conn, "my_insert",
+										1, insert_params, NULL, NULL, 0) == 1)
+				{
+					pg_debug("sent row %d\n", rows_to_send);
+
+					rows_to_send--;
+					if (rows_to_send == 0)
+						send_step = BI_COMMIT_TX;
+				}
+				else
+				{
+					/*
+					 * in nonblocking mode, so it's OK for an insert to fail
+					 * to send
+					 */
+					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+							rows_to_send, PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_COMMIT_TX)
+			{
+				if (PQsendQueryParams(conn, "COMMIT",
+									  0, NULL, NULL, NULL, NULL, 0) == 1)
+				{
+					pg_debug("sent COMMIT\n");
+					send_step = BI_SYNC;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: failed to send commit: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_SYNC)
+			{
+				if (PQbatchSendQueue(conn) == 1)
+				{
+					fprintf(stdout, "Dispatched end batch message\n");
+					send_step = BI_DONE;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: Ending a batch failed: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+		}
+	}
+
+	/* We've got the sync message and the batch should be done */
+	if (PQexitBatchMode(conn) != 1)
+		pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+				 PQerrorMessage(conn));
+
+	if (PQsetnonblocking(conn, 0) != 0)
+		pg_fatal("failed to clear nonblocking mode: %s\n", PQerrorMessage(conn));
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+	PGresult   *res;
+	int			i;
+	bool		batch_ended = false;
+
+	/* 1 batch, 3 queries in it */
+	if (PQenterBatchMode(conn) != 1)
+		pg_fatal("failed to enter batch mode: %s\n",
+				 PQerrorMessage(conn));
+
+	for (i = 0; i < 3; i++)
+	{
+		char   *param[1];
+
+		param[0] = psprintf("%d", 44 + i);
+
+		if (PQsendQueryParams(conn,
+							  "SELECT generate_series(42, $1)",
+							  1,
+							  NULL,
+							  (const char **) param,
+							  NULL,
+							  NULL,
+							  0) != 1)
+			pg_fatal("failed to send query: %s\n",
+					 PQerrorMessage(conn));
+		pfree(param[0]);
+	}
+	PQbatchSendQueue(conn);
+
+	for (i = 0; !batch_ended; i++)
+	{
+		bool		first = true;
+		bool		saw_ending_tuplesok;
+		bool		isSingleTuple = false;
+
+		/* Set single row mode for only first 2 SELECT queries */
+		if (i < 2)
+		{
+			if (PQsetSingleRowMode(conn) != 1)
+				pg_fatal("PQsetSingleRowMode() failed for i=%d\n", i);
+		}
+
+		/* Consume rows for this query */
+		saw_ending_tuplesok = false;
+		while ((res = PQgetResult(conn)) != NULL)
+		{
+			ExecStatusType est = PQresultStatus(res);
+
+			if (est == PGRES_BATCH_END)
+			{
+				fprintf(stderr, "end of batch reached\n");
+				batch_ended = true;
+				PQclear(res);
+				if (i != 3)
+					pg_fatal("Expected three results, got %d\n", i);
+				break;
+			}
+
+			/* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
+			if (first)
+			{
+				if (i <= 1 && est != PGRES_SINGLE_TUPLE)
+					pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s\n",
+							 i, PQresStatus(est));
+				if (i >= 2 && est != PGRES_TUPLES_OK)
+					pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s\n",
+							 i, PQresStatus(est));
+				first = false;
+			}
+
+			fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
+			switch (est)
+			{
+				case PGRES_TUPLES_OK:
+					fprintf(stderr, ", tuples: %d\n", PQntuples(res));
+					saw_ending_tuplesok = true;
+					if (isSingleTuple)
+					{
+						if (PQntuples(res) == 0)
+							fprintf(stderr, "all tuples received in query %d\n", i);
+						else
+							pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, "
+									 "but received PGRES_TUPLES_OK directly instead\n");
+					}
+					break;
+
+				case PGRES_SINGLE_TUPLE:
+					fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
+					break;
+
+				default:
+					pg_fatal("unexpected\n");
+			}
+			PQclear(res);
+		}
+		if (!batch_ended && !saw_ending_tuplesok)
+			pg_fatal("didn't get expected terminating TUPLES_OK\n");
+	}
+
+	if (PQexitBatchMode(conn) != 1)
+		pg_fatal("failed to end batch mode: %s\n", PQerrorMessage(conn));
+}
+
+static void
+usage(const char *progname)
+{
+	fprintf(stderr, "%s tests libpq's batch-mode.\n\n", progname);
+	fprintf(stderr, "Usage:\n");
+	fprintf(stderr, "  %s testname [conninfo [number_of_rows]]\n", progname);
+	fprintf(stderr, "Tests:\n");
+	fprintf(stderr, "  disallowed_in_batch|simple_batch|multi_batch|batch_abort|\n");
+	fprintf(stderr, "  singlerow|batch_insert\n");
+}
+
+int
+main(int argc, char **argv)
+{
+	const char *conninfo = "";
+	PGconn	   *conn;
+	int			numrows = 10000;
+
+	/*
+	 * The testname parameter is mandatory; it can be followed by a conninfo
+	 * string and number of rows.
+	 */
+	if (argc < 2 || argc > 4)
+	{
+		usage(argv[0]);
+		exit(1);
+	}
+
+	if (argc >= 3)
+		conninfo = pg_strdup(argv[2]);
+
+	if (argc >= 4)
+	{
+		errno = 0;
+		numrows = strtol(argv[3], NULL, 10);
+		if (errno != 0 || numrows <= 0)
+		{
+			fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]);
+			exit(1);
+		}
+	}
+
+	/* Make a connection to the database */
+	conn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Connection to database failed: %s\n",
+				PQerrorMessage(conn));
+		exit_nicely(conn);
+	}
+
+	if (strcmp(argv[1], "disallowed_in_batch") == 0)
+		test_disallowed_in_batch(conn);
+	else if (strcmp(argv[1], "simple_batch") == 0)
+		test_simple_batch(conn);
+	else if (strcmp(argv[1], "multi_batch") == 0)
+		test_multi_batch(conn);
+	else if (strcmp(argv[1], "batch_abort") == 0)
+		test_batch_abort(conn);
+	else if (strcmp(argv[1], "batch_insert") == 0)
+		test_batch_insert(conn, numrows);
+	else if (strcmp(argv[1], "singlerow") == 0)
+		test_singlerowmode(conn);
+	else
+	{
+		fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]);
+		usage(argv[0]);
+		exit(1);
+	}
+
+	/* close the connection to the database and cleanup */
+	PQfinish(conn);
+	return 0;
+}
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd41b..634e48ec85 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -50,7 +50,8 @@ my @contrib_excludes = (
 	'pgcrypto',         'sepgsql',
 	'brin',             'test_extensions',
 	'test_misc',        'test_pg_dump',
-	'snapshot_too_old', 'unsafe_tests');
+	'snapshot_too_old', 'unsafe_tests',
+	'test_libpq');
 
 # Set of variables for frontend modules
 my $frontend_defines = { 'initdb' => 'FRONTEND' };
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0e4bb4f07f..4c7709c2e6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -215,6 +215,7 @@ BackgroundWorkerHandle
 BackgroundWorkerSlot
 Barrier
 BaseBackupCmd
+BatchInsertStep
 BeginDirectModify_function
 BeginForeignInsert_function
 BeginForeignModify_function
@@ -1544,6 +1545,7 @@ PG_Locale_Strategy
 PG_Lock_Status
 PG_init_t
 PGcancel
+PGcommandQueueEntry
 PGconn
 PGdataValue
 PGlobjfuncs
@@ -1656,6 +1658,7 @@ PMSignalReason
 PMState
 POLYGON
 PQArgBlock
+PQBatchStatus
 PQEnvironmentOption
 PQExpBuffer
 PQExpBufferData

Reply via email to