Here's what seems to be the final version of the patch.  I renamed one more
function: PQsendPipeline is now PQpipelineSync.  I also reworded the
docs in a couple of places, added a few tests to the pgbench patch, and
made it work.

Note the pgbench results in pipeline mode:

./pgbench -r -Mextended -n -f 
/home/alvherre/Code/pgsql-build/pipeline/src/bin/pgbench/tmp_check/t_001_pgbench_with_server_main_data/001_pgbench_pipeline
 -c 100 -t10000
pgbench (PostgreSQL) 14.0
transaction type: 
/home/alvherre/Code/pgsql-build/pipeline/src/bin/pgbench/tmp_check/t_001_pgbench_with_server_main_data/001_pgbench_pipeline
scaling factor: 1
query mode: extended
number of clients: 100
number of threads: 1
number of transactions per client: 10000
number of transactions actually processed: 1000000/1000000
latency average = 2.316 ms
initial connection time = 113.859 ms
tps = 43182.438635 (without initial connection time)
statement latencies in milliseconds:
         0.000  \startpipeline
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         0.000  select 1;
         1.624  \endpipeline

If I just replace the \startpipeline and \endpipeline lines with BEGIN
and COMMIT respectively, I get this:

tps = 10220.259051 (without initial connection time)

         0.830  begin;
         0.765  select 1;
         0.752  select 1;
         0.753  select 1;
         0.755  select 1;
         0.754  select 1;
         0.755  select 1;
         0.757  select 1;
         0.756  select 1;
         0.756  select 1;
         0.756  select 1;
         0.750  commit;

Yes, you could say that this is a liiiitle bit unfair -- but it seems
quite impressive nonetheless.
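
For anyone who wants to see the shape of the new API without reading the
whole patch, here's a minimal sketch (mine, not taken from the patch or its
test module) of a client using it in blocking mode, which is fine for a
pipeline this small: queue a couple of queries, mark the sync point with
PQpipelineSync, then read results until the PGRES_PIPELINE_SYNC result
shows up.

/* minimal pipeline-mode sketch; connection parameters come from the
 * environment (PGHOST, PGDATABASE, etc.) */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	if (PQenterPipelineMode(conn) != 1)
		return 1;

	/* queue two queries; nothing is read back from the server yet */
	PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0);
	PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0);

	/* establish the synchronization point and flush the queries */
	PQpipelineSync(conn);

	/* each query produces its results followed by a NULL ... */
	for (int i = 0; i < 2; i++)
	{
		while ((res = PQgetResult(conn)) != NULL)
		{
			/* PGRES_TUPLES_OK, or PGRES_PIPELINE_ABORTED after an error */
			printf("query %d: %s\n", i + 1,
				   PQresStatus(PQresultStatus(res)));
			PQclear(res);
		}
	}

	/* ... and the sync result follows, with no trailing NULL */
	res = PQgetResult(conn);
	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
		fprintf(stderr, "unexpected result status\n");
	PQclear(res);

	PQexitPipelineMode(conn);
	PQfinish(conn);
	return 0;
}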

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
From 5e4fdd5246d559caf0d75ad74001f09a48ec4c0e Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 15 Mar 2021 15:05:22 -0300
Subject: [PATCH v37 1/2] Implement pipeline mode in libpq
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Pipeline mode in libpq lets an application avoid the Sync messages in
the FE/BE protocol that are implicit in the old libpq API after each
query.  The application can then insert Sync at its leisure with a new
libpq function PQpipelineSync.  This can lead to substantial reductions
in query latency.

Co-authored-by: Craig Ringer <craig.rin...@enterprisedb.com>
Co-authored-by: Matthieu Garrigues <matthieu.garrig...@gmail.com>
Co-authored-by: Álvaro Herrera <alvhe...@alvh.no-ip.org>
Reviewed-by: Andres Freund <and...@anarazel.de>
Reviewed-by: Aya Iwata <iwata....@jp.fujitsu.com>
Reviewed-by: Daniel Vérité <dan...@manitou-mail.org>
Reviewed-by: David G. Johnston <david.g.johns...@gmail.com>
Reviewed-by: Justin Pryzby <pry...@telsasoft.com>
Reviewed-by: Kirk Jamison <k.jami...@fujitsu.com>
Reviewed-by: Michael Paquier <michael.paqu...@gmail.com>
Reviewed-by: Nikhil Sontakke <nikh...@2ndquadrant.com>
Reviewed-by: Vaishnavi Prabakaran <vaishna...@fast.au.fujitsu.com>
Reviewed-by: Zhihong Yu <z...@yugabyte.com>

Discussion: https://postgr.es/m/CAMsr+YFUjJytRyV4J-16bEoiZyH=4nj+sQ7JP9ajwz=b4dm...@mail.gmail.com
Discussion: https://postgr.es/m/cajkzx4t5e-2cqe3dtv2r78dyfvz+in8py7a8marvlhs_pg7...@mail.gmail.com
---
 doc/src/sgml/libpq.sgml                       |  522 ++++++-
 doc/src/sgml/lobj.sgml                        |    4 +
 doc/src/sgml/ref/pgbench.sgml                 |   21 +
 .../libpqwalreceiver/libpqwalreceiver.c       |    6 +
 src/bin/pg_amcheck/pg_amcheck.c               |    2 +
 src/interfaces/libpq/exports.txt              |    4 +
 src/interfaces/libpq/fe-connect.c             |   37 +-
 src/interfaces/libpq/fe-exec.c                |  717 ++++++++-
 src/interfaces/libpq/fe-protocol3.c           |   77 +-
 src/interfaces/libpq/libpq-fe.h               |   21 +-
 src/interfaces/libpq/libpq-int.h              |   60 +-
 src/test/modules/Makefile                     |    1 +
 src/test/modules/libpq_pipeline/.gitignore    |    5 +
 src/test/modules/libpq_pipeline/Makefile      |   20 +
 src/test/modules/libpq_pipeline/README        |    1 +
 .../modules/libpq_pipeline/libpq_pipeline.c   | 1303 +++++++++++++++++
 .../libpq_pipeline/t/001_libpq_pipeline.pl    |   28 +
 src/tools/msvc/Mkvcbuild.pm                   |    9 +-
 src/tools/pgindent/typedefs.list              |    2 +
 19 files changed, 2727 insertions(+), 113 deletions(-)
 create mode 100644 src/test/modules/libpq_pipeline/.gitignore
 create mode 100644 src/test/modules/libpq_pipeline/Makefile
 create mode 100644 src/test/modules/libpq_pipeline/README
 create mode 100644 src/test/modules/libpq_pipeline/libpq_pipeline.c
 create mode 100644 src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 910e9a81ea..be674fbaa9 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3180,6 +3180,33 @@ ExecStatusType PQresultStatus(const PGresult *res);
            </para>
           </listitem>
          </varlistentry>
+
+         <varlistentry id="libpq-pgres-pipeline-sync">
+          <term><literal>PGRES_PIPELINE_SYNC</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents a
+            synchronization point in pipeline mode, requested by
+            <xref linkend="libpq-PQpipelineSync"/>.
+            This status occurs only when pipeline mode has been selected.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry id="libpq-pgres-pipeline-aborted">
+          <term><literal>PGRES_PIPELINE_ABORTED</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents a pipeline that has
+            received an error from the server.  <function>PQgetResult</function>
+            must be called repeatedly, and each time it will return this status code
+            until the end of the current pipeline, at which point it will return
+            <literal>PGRES_PIPELINE_SYNC</literal> and normal processing can
+            resume.
+           </para>
+          </listitem>
+         </varlistentry>
+
         </variablelist>
 
         If the result status is <literal>PGRES_TUPLES_OK</literal> or
@@ -4677,8 +4704,9 @@ int PQsendDescribePortal(PGconn *conn, const char *portalName);
        <xref linkend="libpq-PQsendQueryParams"/>,
        <xref linkend="libpq-PQsendPrepare"/>,
        <xref linkend="libpq-PQsendQueryPrepared"/>,
-       <xref linkend="libpq-PQsendDescribePrepared"/>, or
-       <xref linkend="libpq-PQsendDescribePortal"/>
+       <xref linkend="libpq-PQsendDescribePrepared"/>,
+       <xref linkend="libpq-PQsendDescribePortal"/>, or
+       <xref linkend="libpq-PQpipelineSync"/>
        call, and returns it.
        A null pointer is returned when the command is complete and there
        will be no more results.
@@ -4702,6 +4730,19 @@ PGresult *PQgetResult(PGconn *conn);
        <xref linkend="libpq-PQconsumeInput"/>.
       </para>
 
+      <para>
+       In pipeline mode, <function>PQgetResult</function> will return normally
+       unless an error occurs; for any subsequent query sent after the one
+       that caused the error until (and excluding) the next synchronization point,
+       a special result of type <literal>PGRES_PIPELINE_ABORTED</literal> will
+       be returned, and a null pointer will be returned after it.
+       When the pipeline synchronization point is reached, a result of type
+       <literal>PGRES_PIPELINE_SYNC</literal> will be returned.
+       The result of the next query after the synchronization point follows
+       immediately (that is, no null pointer is returned after
+       the synchronization point.)
+      </para>
+
       <note>
        <para>
         Even when <xref linkend="libpq-PQresultStatus"/> indicates a fatal
@@ -4926,6 +4967,476 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-pipeline-mode">
+  <title>Pipeline Mode</title>
+
+  <indexterm zone="libpq-pipeline-mode">
+   <primary>libpq</primary>
+   <secondary>pipeline mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-pipeline-mode">
+   <primary>pipelining</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-pipeline-mode">
+   <primary>batch mode</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> pipeline mode allows applications to
+   send a query without having to read the result of the previously
+   sent query.  Taking advantage of the pipeline mode, a client will wait
+   less for the server, since multiple queries/results can be
+   sent/received in a single network transaction.
+  </para>
+
+  <para>
+   While pipeline mode provides a significant performance boost, writing
+   clients using the pipeline mode is more complex because it involves
+   managing a queue of pending queries and finding which result
+   corresponds to which query in the queue.
+  </para>
+
+  <para>
+   Pipeline mode also generally consumes more memory on both the client and server,
+   though careful and aggressive management of the send/receive queue can mitigate
+   this.  This applies whether the connection is in blocking or non-blocking
+   mode.
+  </para>
+
+  <para>
+   While the pipeline API was introduced in
+   <productname>PostgreSQL</productname> 14, it is a client-side feature
+   which doesn't require special server support, and works on any server
+   that supports the v3 extended query protocol.
+  </para>
+
+  <sect2 id="libpq-pipeline-using">
+   <title>Using Pipeline Mode</title>
+
+   <para>
+    To issue pipelines, the application must switch the connection
+    into pipeline mode,
+    which is done with <xref linkend="libpq-PQenterPipelineMode"/>.
+    <xref linkend="libpq-PQpipelineStatus"/> can be used
+    to test whether pipeline mode is active.
+    In pipeline mode, only <link linkend="libpq-async">asynchronous operations</link>
+    are permitted, and <literal>COPY</literal> is disallowed.
+    Using synchronous command execution functions
+    such as <function>PQfn</function>,
+    <function>PQexec</function>,
+    <function>PQexecParams</function>,
+    <function>PQprepare</function>,
+    <function>PQexecPrepared</function>,
+    <function>PQdescribePrepared</function>,
+    or <function>PQdescribePortal</function>
+    is an error condition.
+    Once all dispatched commands have had their results processed, and
+    the end pipeline result has been consumed, the application may return
+    to non-pipelined mode with <xref linkend="libpq-PQexitPipelineMode"/>.
+   </para>
+
+   <note>
+    <para>
+     It is best to use pipeline mode with <application>libpq</application> in
+     <link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
+     in blocking mode it is possible for a client/server deadlock to occur.
+      <footnote>
+       <para>
+        The client will block trying to send queries to the server, but the
+        server will block trying to send results to the client from queries
+        it has already processed. This only occurs when the client sends
+        enough queries to fill both its output buffer and the server's receive
+        buffer before it switches to processing input from the server,
+        but it's hard to predict exactly when that will happen.
+       </para>
+      </footnote>
+    </para>
+   </note>
+
+   <sect3 id="libpq-pipeline-sending">
+    <title>Issuing Queries</title>
+
+    <para>
+     After entering pipeline mode, the application dispatches requests using
+     <xref linkend="libpq-PQsendQuery"/>,
+     <xref linkend="libpq-PQsendQueryParams"/>,
+     or its prepared-query sibling
+     <xref linkend="libpq-PQsendQueryPrepared"/>.
+     These requests are queued on the client-side until flushed to the server;
+     this occurs when <xref linkend="libpq-PQpipelineSync"/> is used to
+     establish a synchronization point in the pipeline,
+     or when <xref linkend="libpq-PQflush"/> is called.
+     The functions <xref linkend="libpq-PQsendPrepare"/>,
+     <xref linkend="libpq-PQsendDescribePrepared"/>, and
+     <xref linkend="libpq-PQsendDescribePortal"/> also work in pipeline mode.
+     Result processing is described below.
+    </para>
+
+    <para>
+     The server executes statements, and returns results, in the order the
+     client sends them.  The server will begin executing the commands in the
+     pipeline immediately, not waiting for the end of the pipeline.
+     If any statement encounters an error, the server aborts the current
+     transaction and does not execute any subsequent command in the queue
+     until the next synchronization point established by
+     <function>PQpipelineSync</function>;
+     a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
+     each such command.
+     (This remains true even if the commands in the pipeline would roll back
+     the transaction.)
+     Query processing resumes after the synchronization point.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one; for example, one query may define a table that the next
+     query in the same pipeline uses. Similarly, an application may
+     create a named prepared statement and execute it with later
+     statements in the same pipeline.
+    </para>
+   </sect3>
+
+   <sect3 id="libpq-pipeline-results">
+    <title>Processing Results</title>
+
+    <para>
+     To process the result of one query in a pipeline, the application calls
+     <function>PQgetResult</function> repeatedly and handles each result
+     until <function>PQgetResult</function> returns null.
+     The result from the next query in the pipeline may then be retrieved using
+     <function>PQgetResult</function> again and the cycle repeated.
+     The application handles individual statement results as normal.
+     When the results of all the queries in the pipeline have been
+     returned, <function>PQgetResult</function> returns a result
+     containing the status value <literal>PGRES_PIPELINE_SYNC</literal>.
+    </para>
+
+    <para>
+     The client may choose to defer result processing until the complete
+     pipeline has been sent, or interleave that with sending further
+     queries in the pipeline; see <xref linkend="libpq-pipeline-interleave"/>.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     This mode selection is effective only for the query currently
+     being processed. For more information on the use of
+     <function>PQsetSingleRowMode</function>,
+     refer to <xref linkend="libpq-single-row-mode"/>.
+    </para>
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal
+     asynchronous processing except that it may contain the new
+     <type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
+     and <literal>PGRES_PIPELINE_ABORTED</literal>.
+     <literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
+     <function>PQpipelineSync</function> at the corresponding point
+     in the pipeline.
+     <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
+     query result for the first error and all subsequent results
+     until the next <literal>PGRES_PIPELINE_SYNC</literal>;
+     see <xref linkend="libpq-pipeline-errors"/>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
+     operate as normal when processing pipeline results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed (except that
+     <function>PQgetResult</function> returns null to indicate that we start
+     returning the results of the next query). The application must keep track
+     of the order in which it sent queries, to associate them with their
+     corresponding results.
+     Applications will typically use a state machine or a FIFO queue for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-pipeline-errors">
+    <title>Error Handling</title>
+
+    <para>
+     From the client's perspective, after <function>PQresultStatus</function>
+     returns <literal>PGRES_FATAL_ERROR</literal>,
+     the pipeline is flagged as aborted.
+     <function>PQresultStatus</function> will report a
+     <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
+     operation in an aborted pipeline. The result for
+     <function>PQpipelineSync</function> is reported as
+     <literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQgetResult</function> during error recovery.
+    </para>
+
+    <para>
+     If the pipeline used an implicit transaction, then operations that have
+     already executed are rolled back and operations that were queued to follow
+     the failed operation are skipped entirely. The same behavior holds if the
+     pipeline starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the pipeline. If a pipeline contains
+     <emphasis>multiple explicit transactions</emphasis>, all transactions that
+     committed prior to the error remain committed, the currently in-progress
+     transaction is aborted, and all subsequent operations are skipped completely,
+     including subsequent transactions.  If a pipeline synchronization point
+     occurs with an explicit transaction block in aborted state, the next pipeline
+     will become aborted immediately unless the next command puts the transaction
+     in normal mode with <command>ROLLBACK</command>.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal> &mdash; only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously, the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+   </sect3>
+
+   <sect3 id="libpq-pipeline-interleave">
+    <title>Interleaving Result Processing and Query Dispatch</title>
+
+    <para>
+     To avoid deadlocks on large pipelines the client should be structured
+     around a non-blocking event loop using operating system facilities
+     such as <function>select</function>, <function>poll</function>,
+     <function>WaitForMultipleObjectsEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work
+     remaining to be dispatched and a queue of work that has been dispatched
+     but not yet had its results processed. When the socket is writable
+     it should dispatch more work. When the socket is readable it should
+     read results and process them, matching them up to the next entry in
+     its corresponding results queue.  Based on available memory, results from the
+     socket should be read frequently: there's no need to wait until the
+     pipeline end to read the results.  Pipelines should be scoped to logical
+     units of work, usually (but not necessarily) one transaction per pipeline.
+     There's no need to exit pipeline mode and re-enter it between pipelines,
+     or to wait for one pipeline to finish before sending the next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state
+     machine to track sent and received work is in
+     <filename>src/test/modules/libpq_pipeline/libpq_pipeline.c</filename>
+     in the PostgreSQL source distribution.
+    </para>
+   </sect3>
+  </sect2>
+
+  <sect2 id="libpq-pipeline-functions">
+   <title>Functions Associated with Pipeline Mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQpipelineStatus">
+     <term><function>PQpipelineStatus</function><indexterm><primary>PQpipelineStatus</primary></indexterm></term>
+
+     <listitem>
+      <para>
+      Returns the current pipeline mode status of the
+      <application>libpq</application> connection.
+<synopsis>
+PGpipelineStatus PQpipelineStatus(const PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       <function>PQpipelineStatus</function> can return one of the following values:
+
+       <variablelist>
+        <varlistentry>
+         <term>
+          <literal>PQ_PIPELINE_ON</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in
+           pipeline mode.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term>
+          <literal>PQ_PIPELINE_OFF</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is
+           <emphasis>not</emphasis> in pipeline mode.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term>
+          <literal>PQ_PIPELINE_ABORTED</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in pipeline
+           mode and an error occurred while processing the current pipeline.
+           The aborted flag is cleared when <function>PQgetResult</function>
+           returns a result of type <literal>PGRES_PIPELINE_SYNC</literal>.
+          </para>
+         </listitem>
+        </varlistentry>
+
+       </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterPipelineMode">
+     <term><function>PQenterPipelineMode</function><indexterm><primary>PQenterPipelineMode</primary></indexterm></term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter pipeline mode if it is currently idle or
+      already in pipeline mode.
+
+<synopsis>
+int PQenterPipelineMode(PGconn *conn);
+</synopsis>
+
+      </para>
+      <para>
+       Returns 1 for success.
+       Returns 0 and has no effect if the connection is not currently
+       idle, i.e., it has a result ready, or it is waiting for more
+       input from the server, etc.
+       This function does not actually send anything to the server;
+       it just changes the <application>libpq</application> connection
+       state.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitPipelineMode">
+     <term><function>PQexitPipelineMode</function><indexterm><primary>PQexitPipelineMode</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Causes a connection to exit pipeline mode if it is currently in pipeline mode
+       with an empty queue and no pending results.
+<synopsis>
+int PQexitPipelineMode(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success.  Returns 1 and takes no action if not in
+       pipeline mode. If the current statement isn't finished processing,
+       or <function>PQgetResult</function> has not been called to collect
+       results from all previously sent query, returns 0 (in which case,
+       use <xref linkend="libpq-PQerrorMessage"/> to get more information
+       about the failure).
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQpipelineSync">
+     <term><function>PQpipelineSync</function><indexterm><primary>PQpipelineSync</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Marks a synchronization point in a pipeline by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       and flushing the send buffer. This serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-pipeline-errors"/>.
+
+<synopsis>
+int PQpipelineSync(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       pipeline mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2 id="libpq-pipeline-tips">
+   <title>When to Use Pipeline Mode</title>
+
+   <para>
+    Much like asynchronous query mode, there is no meaningful performance
+    overhead when using pipeline mode. It increases client application complexity,
+    and extra caution is required to prevent client/server deadlocks, but
+    pipeline mode can offer considerable performance improvements, in exchange for
+    increased memory usage from leaving state around longer.
+   </para>
+
+   <para>
+    Pipeline mode is most useful when the server is distant, i.e., network latency
+    (<quote>ping time</quote>) is high, and also when many small operations
+    are being performed in rapid succession.  There is usually less benefit
+    in using pipelined commands when each query takes many multiples of the client/server
+    round-trip time to execute.  A 100-statement operation run on a server
+    300ms round-trip-time away would take 30 seconds in network latency alone
+    without pipelining; with pipelining it may spend as little as 0.3s waiting for
+    results from the server.
+   </para>
+
+   <para>
+    Use pipelined commands when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed
+    into operations on sets, or into a <literal>COPY</literal> operation.
+   </para>
+
+   <para>
+    Pipeline mode is not useful when information from one operation is required by
+    the client to produce the next operation. In such cases, the client
+    would have to introduce a synchronization point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+BEGIN;
+SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+-- result: x=2
+-- client adds 1 to x:
+UPDATE mytable SET x = 3 WHERE id = 42;
+COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <para>
+    Pipelining is less useful, and more complex, when a single pipeline contains
+    multiple transactions (see <xref linkend="libpq-pipeline-errors"/>).
+   </para>
+  </sect2>
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-by-Row</title>
 
@@ -4966,6 +5477,13 @@ int PQflush(PGconn *conn);
    Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
   </para>
 
+  <para>
+   When using pipeline mode, single-row mode needs to be activated for each
+   query in the pipeline before retrieving results for that query
+   with <function>PQgetResult</function>.
+   See <xref linkend="libpq-pipeline-mode"/> for more information.
+  </para>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-PQsetSingleRowMode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 6d46da42e2..012e44c736 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while a libpq connection is in pipeline mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 299d93b241..5dd1e9e936 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1110,6 +1110,12 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
       row, the last value is kept.
      </para>
 
+     <para>
+      <literal>\gset</literal> and <literal>\aset</literal> cannot be used
+      in pipeline mode, since query results are not immediately
+      fetched in this mode.
+     </para>
+
      <para>
       The following example puts the final account balance from the first query
       into variable <replaceable>abalance</replaceable>, and fills variables
@@ -1270,6 +1276,21 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
 </programlisting></para>
     </listitem>
    </varlistentry>
+
+   <varlistentry id='pgbench-metacommand-pipeline'>
+    <term><literal>\startpipeline</literal></term>
+    <term><literal>\endpipeline</literal></term>
+
+    <listitem>
+      <para>
+        These commands delimit the start and end of a pipeline of SQL statements.
+        In a pipeline, statements are sent to the server without waiting for the
+        results of previous statements (see <xref linkend="libpq-pipeline-mode"/>).
+        Pipeline mode requires the extended query protocol.
+      </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect2>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5272eed9ab..f74378110a 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1019,6 +1019,12 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 			walres->err = _("empty query");
 			break;
 
+		case PGRES_PIPELINE_SYNC:
+		case PGRES_PIPELINE_ABORTED:
+			walres->status = WALRCV_ERROR;
+			walres->err = _("unexpected pipeline mode");
+			break;
+
 		case PGRES_NONFATAL_ERROR:
 		case PGRES_FATAL_ERROR:
 		case PGRES_BAD_RESPONSE:
diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c
index 008a75d207..c9d9900693 100644
--- a/src/bin/pg_amcheck/pg_amcheck.c
+++ b/src/bin/pg_amcheck/pg_amcheck.c
@@ -929,6 +929,8 @@ should_processing_continue(PGresult *res)
 		case PGRES_COPY_IN:
 		case PGRES_COPY_BOTH:
 		case PGRES_SINGLE_TUPLE:
+		case PGRES_PIPELINE_SYNC:
+		case PGRES_PIPELINE_ABORTED:
 			return false;
 	}
 	return true;
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..5c48c14191 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,7 @@ PQgetgssctx               176
 PQsetSSLKeyPassHook_OpenSSL         177
 PQgetSSLKeyPassHook_OpenSSL         178
 PQdefaultSSLKeyPassHook_OpenSSL     179
+PQenterPipelineMode       180
+PQexitPipelineMode        181
+PQpipelineSync            182
+PQpipelineStatus          183
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 4e21057d0f..53b354abb2 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -522,6 +522,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
 	}
 }
 
+/*
+ * pqFreeCommandQueue
+ * Free all entries of the given PGcmdQueueEntry queue.
+ */
+static void
+pqFreeCommandQueue(PGcmdQueueEntry *queue)
+{
+	while (queue != NULL)
+	{
+		PGcmdQueueEntry *cur = queue;
+
+		queue = cur->next;
+		if (cur->query)
+			free(cur->query);
+		free(cur);
+	}
+}
 
 /*
  *		pqDropServerData
@@ -553,6 +570,12 @@ pqDropServerData(PGconn *conn)
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
 
+	pqFreeCommandQueue(conn->cmd_queue_head);
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+	pqFreeCommandQueue(conn->cmd_queue_recycle);
+	conn->cmd_queue_recycle = NULL;
+
 	/* Reset ParameterStatus data, as well as variables deduced from it */
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
@@ -2459,6 +2482,7 @@ keep_going:						/* We will come back to here until there is
 		/* Drop any PGresult we might have, too */
 		conn->asyncStatus = PGASYNC_IDLE;
 		conn->xactStatus = PQTRANS_IDLE;
+		conn->pipelineStatus = PQ_PIPELINE_OFF;
 		pqClearAsyncResult(conn);
 
 		/* Reset conn->status to put the state machine in the right state */
@@ -3917,6 +3941,7 @@ makeEmptyPGconn(void)
 
 	conn->status = CONNECTION_BAD;
 	conn->asyncStatus = PGASYNC_IDLE;
+	conn->pipelineStatus = PQ_PIPELINE_OFF;
 	conn->xactStatus = PQTRANS_IDLE;
 	conn->options_valid = false;
 	conn->nonblocking = false;
@@ -4084,8 +4109,6 @@ freePGconn(PGconn *conn)
 	if (conn->connip)
 		free(conn->connip);
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->write_err_msg)
 		free(conn->write_err_msg);
 	if (conn->inBuffer)
@@ -4174,6 +4197,7 @@ closePGconn(PGconn *conn)
 	conn->status = CONNECTION_BAD;	/* Well, not really _bad_ - just absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	conn->xactStatus = PQTRANS_IDLE;
+	conn->pipelineStatus = PQ_PIPELINE_OFF;
 	pqClearAsyncResult(conn);	/* deallocate result */
 	resetPQExpBuffer(&conn->errorMessage);
 	release_conn_addrinfo(conn);
@@ -6726,6 +6750,15 @@ PQbackendPID(const PGconn *conn)
 	return conn->be_pid;
 }
 
+PGpipelineStatus
+PQpipelineStatus(const PGconn *conn)
+{
+	if (!conn)
+		return PQ_PIPELINE_OFF;
+
+	return conn->pipelineStatus;
+}
+
 int
 PQconnectionNeedsPassword(const PGconn *conn)
 {
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 9a038043b2..f3443708a6 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_PIPELINE_SYNC",
+	"PGRES_PIPELINE_ABORTED"
 };
 
 /*
@@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int	PQsendDescribe(PGconn *conn, char desc_type,
 						   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
+static void pqPipelineProcessQueue(PGconn *conn);
+static int	pqPipelineFlush(PGconn *conn);
 
 
 /* ----------------
@@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1184,6 +1188,87 @@ fail:
 }
 
 
+/*
+ * pqAllocCmdQueueEntry
+ *		Get a command queue entry for caller to fill.
+ *
+ * If the recycle queue has a free element, that is returned; if not, a
+ * fresh one is allocated.  Caller is responsible for adding it to the
+ * command queue (pqAppendCmdQueueEntry) once the struct is filled in, or
+ * releasing the memory (pqRecycleCmdQueueEntry) if an error occurs.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcmdQueueEntry *
+pqAllocCmdQueueEntry(PGconn *conn)
+{
+	PGcmdQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcmdQueueEntry *) malloc(sizeof(PGcmdQueueEntry));
+		if (entry == NULL)
+		{
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/*
+ * pqAppendCmdQueueEntry
+ *		Append a caller-allocated command queue entry to the queue.
+ *
+ * The query itself must already have been put in the output buffer by the
+ * caller.
+ */
+static void
+pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
+{
+	Assert(entry->next == NULL);
+
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+
+	conn->cmd_queue_tail = entry;
+}
+
+/*
+ * pqRecycleCmdQueueEntry
+ *		Push a command queue entry onto the freelist.
+ */
+static void
+pqRecycleCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
+{
+	if (entry == NULL)
+		return;
+
+	/* recyclable entries should not have a follow-on command */
+	Assert(entry->next == NULL);
+
+	if (entry->query)
+	{
+		free(entry->query);
+		entry->query = NULL;
+	}
+
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
+
 /*
  * PQsendQuery
  *	 Submit a query, but don't wait for it to finish
@@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
 static int
 PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 {
+	PGcmdQueueEntry *entry = NULL;
+
 	if (!PQsendQueryStart(conn, newQuery))
 		return 0;
 
+	entry = pqAllocCmdQueueEntry(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
 	/* check the argument */
 	if (!query)
 	{
@@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 		return 0;
 	}
 
-	/* construct the outgoing Query message */
-	if (pqPutMsgStart('Q', conn) < 0 ||
-		pqPuts(query, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
+	/* Send the query message(s) */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
 	{
-		/* error message should be set up already */
-		return 0;
+		/* construct the outgoing Query message */
+		if (pqPutMsgStart('Q', conn) < 0 ||
+			pqPuts(query, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+		{
+			/* error message should be set up already */
+			return 0;
+		}
+
+		/* remember we are using simple query protocol */
+		entry->queryclass = PGQUERY_SIMPLE;
+		/* and remember the query text too, if possible */
+		entry->query = strdup(query);
 	}
+	else
+	{
+		/*
+		 * In pipeline mode we cannot use the simple protocol, so we send
+		 * Parse, Bind, Describe Portal, Execute.
+		 */
+		if (pqPutMsgStart('P', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPuts(query, conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+		if (pqPutMsgStart('B', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutInt(0, 2, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+		if (pqPutMsgStart('D', conn) < 0 ||
+			pqPutc('P', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+		if (pqPutMsgStart('E', conn) < 0 ||
+			pqPuts("", conn) < 0 ||
+			pqPutInt(0, 4, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
 
-	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
-
-	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+		entry->queryclass = PGQUERY_EXTENDED;
+		entry->query = strdup(query);
+	}
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
-	{
-		/* error message should be set up already */
-		return 0;
-	}
+	if (pqPipelineFlush(conn) < 0)
+		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	pqAppendCmdQueueEntry(conn, entry);
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
+
+sendFailed:
+	pqRecycleCmdQueueEntry(conn, entry);
+	/* error message should be set up already */
+	return 0;
 }
 
 /*
@@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcmdQueueEntry *entry = NULL;
+
 	if (!PQsendQueryStart(conn, true))
 		return 0;
 
@@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn,
 		return 0;
 	}
 
+	entry = pqAllocCmdQueueEntry(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
 	/* construct the Parse message */
 	if (pqPutMsgStart('P', conn) < 0 ||
 		pqPuts(stmtName, conn) < 0 ||
@@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn,
 	if (pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	/* Add a Sync, unless in pipeline mode. */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		if (pqPutMsgStart('S', conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	entry->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, query just winds up NULL */
+	entry->query = strdup(query);
+
+	pqAppendCmdQueueEntry(conn, entry);
+
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in pipeline mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
-	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecycleCmdQueueEntry(conn, entry);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQsendQueryStart
+ *	Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn, bool newQuery)
@@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 							 libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE &&
+		conn->pipelineStatus == PQ_PIPELINE_OFF)
 	{
 		appendPQExpBufferStr(&conn->errorMessage,
 							 libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	pqClearAsyncResult(conn);
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		/*
+		 * When enqueuing commands we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when pqPipelineProcessQueue()
+		 * advances to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_IDLE:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				appendPQExpBufferStr(&conn->errorMessage,
+									 libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+		}
+	}
+	else
+	{
+		/*
+		 * This command's results will come in immediately. Initialize async
+		 * result-accumulation state
+		 */
+		pqClearAsyncResult(conn);
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+		/* reset single-row processing mode */
+		conn->singleRowMode = false;
 
+	}
 	/* ready to send command message */
 	return true;
 }
@@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcmdQueueEntry *entry;
+
+	entry = pqAllocCmdQueueEntry(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
 
 	/*
-	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
-	 * using specified statement name and the unnamed portal.
+	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync
+	 * (if not in pipeline mode), using specified statement name and the
+	 * unnamed portal.
 	 */
 
 	if (command)
@@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	/* construct the Sync message if not in pipeline mode */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		if (pqPutMsgStart('S', conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	entry->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, query just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		entry->query = strdup(command);
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in pipeline mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	pqAppendCmdQueueEntry(conn, entry);
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecycleCmdQueueEntry(conn, entry);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn)
 		return 0;
 	if (conn->asyncStatus != PGASYNC_BUSY)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
+	if (!conn->cmd_queue_head ||
+		(conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE &&
+		 conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED))
 		return 0;
 	if (conn->result)
 		return 0;
@@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
 }
 
-
 /*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
  *	  memory).
+ *
+ *	  In pipeline mode, once all the results of a query have been returned,
+ *	  PQgetResult returns NULL to let the user know that the next
+ *	  query is being processed.  At the end of the pipeline, returns a
+ *	  result with PQresultStatus(result) == PGRES_PIPELINE_SYNC.
  */
-
 PGresult *
 PQgetResult(PGconn *conn)
 {
@@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;			/* query is complete */
+			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+			{
+				/*
+				 * We're about to return the NULL that terminates the round of
+				 * results from the current query; prepare to send the results
+				 * of the next query when we're called next.  Also, since this
+				 * is the start of the results of the next query, clear any
+				 * prior error message.
+				 */
+				resetPQExpBuffer(&conn->errorMessage);
+				pqPipelineProcessQueue(conn);
+			}
 			break;
 		case PGASYNC_READY:
+
+			/*
+			 * For any query type other than simple query protocol, we advance
+			 * the command queue here.  This is because for simple query
+			 * protocol we can get the READY state multiple times before the
+			 * command is actually complete, since the command string can
+			 * contain many queries.  In simple query protocol, the queue
+			 * advance is done by fe-protocol3 when it receives ReadyForQuery.
+			 */
+			if (conn->cmd_queue_head &&
+				conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
+				pqCommandQueueAdvance(conn);
+			res = pqPrepareAsyncResult(conn);
+			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+			{
+				/*
+				 * We're about to send the results of the current query.  Set
+				 * us idle now, and ...
+				 */
+				conn->asyncStatus = PGASYNC_IDLE;
+
+				/*
+				 * ... in cases when we're sending a pipeline-sync result,
+				 * move queue processing forwards immediately, so that next
+				 * time we're called, we're prepared to return the next result
+				 * received from the server.  In all other cases, leave the
+				 * queue state change for next time, so that a terminating
+				 * NULL result is sent.
+				 *
+				 * (In other words: we don't return a NULL after a pipeline
+				 * sync.)
+				 */
+				if (res && res->resultStatus == PGRES_PIPELINE_SYNC)
+					pqPipelineProcessQueue(conn);
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
 			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
@@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n"));
+		return false;
+	}
+
 	/*
 	 * Since this is the beginning of a query cycle, reset the error buffer.
 	 */
@@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcmdQueueEntry *entry = NULL;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 	if (!PQsendQueryStart(conn, true))
 		return 0;
 
+	entry = pqAllocCmdQueueEntry(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
-
-	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last_query string (not relevant now) */
-	if (conn->last_query)
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
 	{
-		free(conn->last_query);
-		conn->last_query = NULL;
+		if (pqPutMsgStart('S', conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
 	}
 
+	/* remember we are doing a Describe */
+	entry->queryclass = PGQUERY_DESCRIBE;
+
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push (in pipeline mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	pqAppendCmdQueueEntry(conn, entry);
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	pqRecycleCmdQueueEntry(conn, entry);
 	/* error message should be set up already */
 	return 0;
 }
@@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 	 * If we sent the COPY command in extended-query mode, we must issue a
 	 * Sync as well.
 	 */
-	if (conn->queryclass != PGQUERY_SIMPLE)
+	if (conn->cmd_queue_head &&
+		conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
 	{
 		if (pqPutMsgStart('S', conn) < 0 ||
 			pqPutMsgEnd(conn) < 0)
@@ -2541,6 +2801,13 @@ PQfn(PGconn *conn,
 	 */
 	resetPQExpBuffer(&conn->errorMessage);
 
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("PQfn not allowed in pipeline mode\n"));
+		return NULL;
+	}
+
 	if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
 		conn->result != NULL)
 	{
@@ -2555,6 +2822,277 @@ PQfn(PGconn *conn,
 						   args, nargs);
 }
 
+/* ====== Pipeline mode support ======== */
+
+/*
+ * PQenterPipelineMode
+ *		Put an idle connection in pipeline mode.
+ *
+ * Returns 1 on success. On failure, errorMessage is set and 0 is returned.
+ *
+ * Commands submitted after this can be pipelined on the connection;
+ * there's no requirement to wait for one to finish before the next is
+ * dispatched.
+ *
+ * Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * A set of commands is terminated by a PQpipelineSync.  Multiple sync
+ * points can be established while in pipeline mode.  Pipeline mode can
+ * be exited by calling PQexitPipelineMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQenterPipelineMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	/* succeed with no action if already in pipeline mode */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+		return 1;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("cannot enter pipeline mode, connection not idle\n"));
+		return 0;
+	}
+
+	conn->pipelineStatus = PQ_PIPELINE_ON;
+
+	return 1;
+}
+
+/*
+ * PQexitPipelineMode
+ *		End pipeline mode and return to normal command mode.
+ *
+ * Returns 1 on success (pipeline mode successfully ended, or not in pipeline
+ * mode).
+ *
+ * Returns 0 if in pipeline mode and cannot be ended yet.  Error message will
+ * be set.
+ */
+int
+PQexitPipelineMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+		return 1;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+			/* there are some uncollected results */
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+			return 0;
+
+		case PGASYNC_BUSY:
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("cannot exit pipeline mode while busy\n"));
+			return 0;
+
+		default:
+			/* OK */
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
+		return 0;
+	}
+
+	conn->pipelineStatus = PQ_PIPELINE_OFF;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	/* Flush any pending data in out buffer */
+	if (pqFlush(conn) < 0)
+		return 0;				/* error message is setup already */
+	return 1;
+}
+
+/*
+ * pqCommandQueueAdvance
+ *		Remove one query from the command queue, when we receive
+ *		all results from the server that pertain to it.
+ */
+void
+pqCommandQueueAdvance(PGconn *conn)
+{
+	PGcmdQueueEntry *prevquery;
+
+	if (conn->cmd_queue_head == NULL)
+		return;
+
+	/* delink from queue */
+	prevquery = conn->cmd_queue_head;
+	conn->cmd_queue_head = conn->cmd_queue_head->next;
+
+	/* and make it recyclable */
+	prevquery->next = NULL;
+	pqRecycleCmdQueueEntry(conn, prevquery);
+}
+
+/*
+ * pqPipelineProcessQueue: subroutine for PQgetResult
+ *		In pipeline mode, start processing the results of the next query in the queue.
+ */
+void
+pqPipelineProcessQueue(PGconn *conn)
+{
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return;
+		case PGASYNC_IDLE:
+			/* next query please */
+			break;
+	}
+
+	/* Nothing to do if not in pipeline mode, or queue is empty */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
+		conn->cmd_queue_head == NULL)
+		return;
+
+	/* Initialize async result-accumulation state */
+	pqClearAsyncResult(conn);
+
+	/*
+	 * Reset single-row processing mode.  (Client has to set it up for each
+	 * query, if desired.)
+	 */
+	conn->singleRowMode = false;
+
+	if (conn->pipelineStatus == PQ_PIPELINE_ABORTED &&
+		conn->cmd_queue_head->queryclass != PGQUERY_SYNC)
+	{
+		/*
+		 * In an aborted pipeline the server sends nothing back for the
+		 * remaining queued commands; we just discard them from the queue
+		 * until we reach the next sync from the server.
+		 *
+		 * The PGRES_PIPELINE_ABORTED results tell the client that these
+		 * queries did not run.
+		 */
+		conn->result = PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED);
+		if (!conn->result)
+		{
+			appendPQExpBufferStr(&conn->errorMessage,
+								 libpq_gettext("out of memory\n"));
+			pqSaveErrorResult(conn);
+			return;
+		}
+		conn->asyncStatus = PGASYNC_READY;
+	}
+	else
+	{
+		/* allow parsing to continue */
+		conn->asyncStatus = PGASYNC_BUSY;
+	}
+}
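
(Seen from the application, the upshot is that an aborted pipeline only adds one case to an ordinary result loop.  Illustrative fragment, not part of the patch, with res obtained from PQgetResult:

	switch (PQresultStatus(res))
	{
		case PGRES_COMMAND_OK:
		case PGRES_TUPLES_OK:
			/* this query ran normally */
			break;
		case PGRES_FATAL_ERROR:
			/* this query failed; the rest of the pipeline is now aborted */
			break;
		case PGRES_PIPELINE_ABORTED:
			/* skipped: an earlier query in the same pipeline failed */
			break;
		case PGRES_PIPELINE_SYNC:
			/* sync point reached; normal processing resumes after this */
			break;
		default:
			break;
	}

)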
+
+/*
+ * PQpipelineSync
+ *		Send a Sync message as part of a pipeline, and flush to server
+ *
+ * It's legal to start submitting more commands in the pipeline immediately,
+ * without waiting for the results of the current pipeline. There's no need to
+ * end pipeline mode and start it again.
+ *
+ * If a command in a pipeline fails, every subsequent command up to the next
+ * Sync point gets a PGRES_PIPELINE_ABORTED result.  The Sync message sent by
+ * PQpipelineSync itself is always reported as a PGresult with status
+ * PGRES_PIPELINE_SYNC, whether or not the pipeline saw an error.
+ *
+ * Queries can already have been sent before PQpipelineSync is called, but
+ * PQpipelineSync needs to be called before retrieving command results.
+ *
+ * The connection will remain in pipeline mode and unavailable for new
+ * synchronous command execution functions until all results from the pipeline
+ * are processed by the client.
+ */
+int
+PQpipelineSync(PGconn *conn)
+{
+	PGcmdQueueEntry *entry;
+
+	if (!conn)
+		return 0;
+
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("cannot send pipeline when not in pipeline mode\n"));
+		return 0;
+	}
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			/* should be unreachable */
+			appendPQExpBufferStr(&conn->errorMessage,
+								 "internal error: cannot send pipeline while in COPY\n");
+			return 0;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_IDLE:
+			/* OK to send sync */
+			break;
+	}
+
+	entry = pqAllocCmdQueueEntry(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	pqAppendCmdQueueEntry(conn, entry);
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (PQflush(conn) < 0)
+		goto sendFailed;
+
+	/*
+	 * Call pqPipelineProcessQueue so the user can start calling
+	 * PQgetResult.
+	 */
+	pqPipelineProcessQueue(conn);
+
+	return 1;
+
+sendFailed:
+	pqRecycleCmdQueueEntry(conn, entry);
+	/* error message should be set up already */
+	return 0;
+}
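
(Putting the new entry points together, a complete -- if simplistic -- pipelined client looks roughly like the sketch below.  This is only meant to illustrate the intended API usage, not part of the patch; it assumes connection parameters come from the environment.

#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

int
main(void)
{
	PGconn	   *conn = PQconnectdb("");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	if (PQenterPipelineMode(conn) != 1)
		return 1;

	/* queue two queries back to back, then a single sync */
	if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1 ||
		PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1 ||
		PQpipelineSync(conn) != 1)
	{
		fprintf(stderr, "dispatch failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* each query yields its results followed by a NULL separator */
	for (int i = 0; i < 2; i++)
	{
		while ((res = PQgetResult(conn)) != NULL)
			PQclear(res);
	}

	/* then the sync point itself */
	res = PQgetResult(conn);
	if (res == NULL || PQresultStatus(res) != PGRES_PIPELINE_SYNC)
		fprintf(stderr, "expected PGRES_PIPELINE_SYNC\n");
	PQclear(res);

	if (PQexitPipelineMode(conn) != 1)
		fprintf(stderr, "could not exit pipeline mode: %s",
				PQerrorMessage(conn));

	PQfinish(conn);
	return 0;
}

Note that the client only waits once, when it starts collecting results after the sync, instead of once per query.)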
+
 
 /* ====== accessor funcs for PGresult ======== */
 
@@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res)
 char *
 PQresStatus(ExecStatusType status)
 {
-	if ((unsigned int) status >= sizeof pgresStatus / sizeof pgresStatus[0])
+	if ((unsigned int) status >= lengthof(pgresStatus))
 		return libpq_gettext("invalid ExecStatusType code");
 	return pgresStatus[status];
 }
@@ -3152,6 +3690,23 @@ PQflush(PGconn *conn)
 	return pqFlush(conn);
 }
 
+/*
+ * pqPipelineFlush
+ *
+ * In pipeline mode, data will be flushed only when the out buffer reaches the
+ * threshold value.  In non-pipeline mode, it behaves as stock pqFlush.
+ *
+ * Returns 0 on success.
+ */
+static int
+pqPipelineFlush(PGconn *conn)
+{
+	if ((conn->pipelineStatus != PQ_PIPELINE_ON) ||
+		(conn->outCount >= OUTBUFFER_THRESHOLD))
+		return pqFlush(conn);
+	return 0;
+}
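
(A consequence worth noting, as an illustration rather than part of the patch: in pipeline mode, queued commands may linger in libpq's output buffer until roughly 64 kB -- OUTBUFFER_THRESHOLD -- has accumulated or a sync is sent.  An application that wants earlier delivery can flush explicitly:

	/* after queueing some commands, push whatever is buffered right away */
	if (PQflush(conn) < 0)
		fprintf(stderr, "flush failed: %s", PQerrorMessage(conn));

)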
+
 
 /*
  *		PQfreemem - safely frees memory allocated
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index eb55d528fb..306e89acfd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,6 +158,18 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
+			/*
+			 * We're also notionally not-IDLE when in pipeline mode and the
+			 * state says "idle" (so we have completed receiving the results
+			 * of one query from the server and dispatched them to the
+			 * application) but another query is queued; yield control back
+			 * to the caller so that it can initiate processing of the next
+			 * query in the queue.
+			 */
+			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
+				conn->cmd_queue_head != NULL)
+				return;
+
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
@@ -179,6 +191,7 @@ pqParseInput3(PGconn *conn)
 			}
 			else
 			{
+				/* Any other case is unexpected and we summarily skip it */
 				pqInternalNotice(&conn->noticeHooks,
 								 "message type 0x%02x arrived from server while idle",
 								 id);
@@ -217,10 +230,37 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new
+								 * query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+					{
+						conn->result = PQmakeEmptyPGresult(conn,
+														   PGRES_PIPELINE_SYNC);
+						if (!conn->result)
+						{
+							appendPQExpBufferStr(&conn->errorMessage,
+												 libpq_gettext("out of memory"));
+							pqSaveErrorResult(conn);
+						}
+						else
+						{
+							conn->pipelineStatus = PQ_PIPELINE_ON;
+							conn->asyncStatus = PGASYNC_READY;
+						}
+					}
+					else
+					{
+						/*
+						 * In simple query protocol, advance the command queue
+						 * (see PQgetResult).
+						 */
+						if (conn->cmd_queue_head &&
+							conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
+							pqCommandQueueAdvance(conn);
+						conn->asyncStatus = PGASYNC_IDLE;
+					}
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -238,7 +278,8 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -285,7 +326,8 @@ pqParseInput3(PGconn *conn)
 						conn->inCursor += msgLength;
 					}
 					else if (conn->result == NULL ||
-							 conn->queryclass == PGQUERY_DESCRIBE)
+							 (conn->cmd_queue_head &&
+							  conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -316,7 +358,8 @@ pqParseInput3(PGconn *conn)
 					 * instead of PGRES_TUPLES_OK.  Otherwise we can just
 					 * ignore this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -445,7 +488,7 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 					  id, msgLength);
 	/* build an error result holding the error message */
 	pqSaveErrorResult(conn);
-	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
+	conn->asyncStatus = PGASYNC_READY;	/* drop out of PQgetResult wait loop */
 	/* flush input data since we're giving up on processing it */
 	pqDropConnection(conn, true);
 	conn->status = CONNECTION_BAD;	/* No more connection to backend */
@@ -471,7 +514,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (!conn->cmd_queue_head ||
+		(conn->cmd_queue_head &&
+		 conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
 	{
 		if (conn->result)
 			result = conn->result;
@@ -568,7 +613,9 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if ((!conn->cmd_queue_head) ||
+		(conn->cmd_queue_head &&
+		 conn->cmd_queue_head->queryclass == PGQUERY_DESCRIBE))
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -841,6 +888,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	PQExpBufferData workBuf;
 	char		id;
 
+	/* If in pipeline mode, set error indicator for it */
+	if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF)
+		conn->pipelineStatus = PQ_PIPELINE_ABORTED;
+
 	/*
 	 * If this is an error message, pre-emptively clear any incomplete query
 	 * result we may have.  We'd just throw it away below anyway, and
@@ -897,8 +948,8 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	 * might need it for an error cursor display, which is only true if there
 	 * is a PG_DIAG_STATEMENT_POSITION field.
 	 */
-	if (have_position && conn->last_query && res)
-		res->errQuery = pqResultStrdup(res, conn->last_query);
+	if (have_position && res && conn->cmd_queue_head && conn->cmd_queue_head->query)
+		res->errQuery = pqResultStrdup(res, conn->cmd_queue_head->query);
 
 	/*
 	 * Now build the "overall" error message for PQresultErrorMessage.
@@ -1817,7 +1868,8 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->cmd_queue_head &&
+			conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
@@ -1897,6 +1949,9 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
 	int			avail;
 	int			i;
 
+	/* already validated by PQfn */
+	Assert(conn->pipelineStatus == PQ_PIPELINE_OFF);
+
 	/* PQfn already validated connection state */
 
 	if (pqPutMsgStart('F', conn) < 0 || /* function call msg */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index fa9b62a844..cee42d4843 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -96,7 +96,10 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_PIPELINE_SYNC,		/* pipeline synchronization point */
+	PGRES_PIPELINE_ABORTED,		/* Command didn't run because of an abort
+								 * earlier in a pipeline */
 } ExecStatusType;
 
 typedef enum
@@ -136,6 +139,16 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/*
+ * PGpipelineStatus - Current status of pipeline mode
+ */
+typedef enum
+{
+	PQ_PIPELINE_OFF,
+	PQ_PIPELINE_ON,
+	PQ_PIPELINE_ABORTED
+} PGpipelineStatus;
+
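(Applications can inspect this state through the PQpipelineStatus() accessor declared further down; for instance, an illustrative fragment that is not part of the patch:

	if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
	{
		/*
		 * An earlier query in the current pipeline failed; results up to
		 * the next sync will come back as PGRES_PIPELINE_ABORTED.
		 */
	}

)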
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -327,6 +340,7 @@ extern int	PQserverVersion(const PGconn *conn);
 extern char *PQerrorMessage(const PGconn *conn);
 extern int	PQsocket(const PGconn *conn);
 extern int	PQbackendPID(const PGconn *conn);
+extern PGpipelineStatus PQpipelineStatus(const PGconn *conn);
 extern int	PQconnectionNeedsPassword(const PGconn *conn);
 extern int	PQconnectionUsedPassword(const PGconn *conn);
 extern int	PQclientEncoding(const PGconn *conn);
@@ -434,6 +448,11 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for pipeline mode management */
+extern int	PQenterPipelineMode(PGconn *conn);
+extern int	PQexitPipelineMode(PGconn *conn);
+extern int	PQpipelineSync(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 2f052f61f8..6374ec657a 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,21 +217,16 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch
+								 * result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch
+								 * result, more results expected from this
+								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
 	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
 } PGAsyncStatusType;
 
-/* PGQueryClass tracks which query protocol we are now executing */
-typedef enum
-{
-	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
-	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
-	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
-} PGQueryClass;
-
 /* Target server type (decoded value of target_session_attrs) */
 typedef enum
 {
@@ -305,6 +300,29 @@ typedef enum pg_conn_host_type
 	CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/*
+ * PGQueryClass tracks which query protocol is in use for each command queue
+ * entry, or the special operation being executed
+ */
+typedef enum
+{
+	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
+	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
+	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+	PGQUERY_SYNC				/* Sync (at end of a pipeline) */
+} PGQueryClass;
+
+/*
+ * An entry in the pending command queue.
+ */
+typedef struct PGcmdQueueEntry
+{
+	PGQueryClass queryclass;	/* Query type */
+	char	   *query;			/* SQL command, or NULL if none/unknown/OOM */
+	struct PGcmdQueueEntry *next;	/* list link */
+} PGcmdQueueEntry;
+
 /*
  * pg_conn_host stores all information about each of possibly several hosts
  * mentioned in the connection string.  Most fields are derived by splitting
@@ -389,12 +407,11 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
 	char		last_sqlstate[6];	/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	PGpipelineStatus pipelineStatus;	/* status of pipeline mode */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;	/* # bytes already returned in COPY OUT */
@@ -407,6 +424,19 @@ struct pg_conn
 	pg_conn_host *connhost;		/* details about each named host */
 	char	   *connip;			/* IP address for current network connection */
 
+	/*
+	 * The pending command queue as a singly-linked list.  Head is the command
+	 * currently in execution, tail is where new commands are added.
+	 */
+	PGcmdQueueEntry *cmd_queue_head;
+	PGcmdQueueEntry *cmd_queue_tail;
+
+	/*
+	 * To save malloc traffic, we don't free entries right away; instead we
+	 * save them in this list for possible reuse.
+	 */
+	PGcmdQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
 								 * unconnected */
@@ -622,6 +652,7 @@ extern void pqSaveMessageField(PGresult *res, char code,
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
 								  const char *value);
 extern int	pqRowProcessor(PGconn *conn, const char **errmsgp);
+extern void pqCommandQueueAdvance(PGconn *conn);
 extern int	PQsendQueryContinue(PGconn *conn, const char *query);
 
 /* === in fe-protocol3.c === */
@@ -795,6 +826,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
  */
 #define pqIsnonblocking(conn)	((conn)->nonblocking)
 
+/*
+ * Connection's outbuffer threshold, for pipeline mode.
+ */
+#define OUTBUFFER_THRESHOLD	65536
+
 #ifdef ENABLE_NLS
 extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
 extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 5391f461a2..93e7829c67 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -10,6 +10,7 @@ SUBDIRS = \
 		  delay_execution \
 		  dummy_index_am \
 		  dummy_seclabel \
+		  libpq_pipeline \
 		  plsample \
 		  snapshot_too_old \
 		  test_bloomfilter \
diff --git a/src/test/modules/libpq_pipeline/.gitignore b/src/test/modules/libpq_pipeline/.gitignore
new file mode 100644
index 0000000000..3a11e786b8
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/libpq_pipeline
diff --git a/src/test/modules/libpq_pipeline/Makefile b/src/test/modules/libpq_pipeline/Makefile
new file mode 100644
index 0000000000..b798f5fbbc
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/libpq_pipeline/Makefile
+
+PROGRAM = libpq_pipeline
+OBJS = libpq_pipeline.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS_INTERNAL += $(libpq_pgport)
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/libpq_pipeline
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
new file mode 100644
index 0000000000..03eb3df504
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -0,0 +1,1303 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq_pipeline.c
+ *		Verify libpq pipeline execution functionality
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *		src/test/modules/libpq_pipeline/libpq_pipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "catalog/pg_type_d.h"
+#include "common/fe_memutils.h"
+#include "libpq-fe.h"
+#include "portability/instr_time.h"
+
+
+static void exit_nicely(PGconn *conn);
+
+const char *const progname = "libpq_pipeline";
+
+
+#define DEBUG
+#ifdef DEBUG
+#define	pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
+#else
+#define pg_debug(...)
+#endif
+
+static const char *const drop_table_sql =
+"DROP TABLE IF EXISTS pq_pipeline_demo";
+static const char *const create_table_sql =
+"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql =
+"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+/*
+ * Print an error to stderr and terminate the program.
+ */
+#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
+static void
+pg_fatal_impl(int line, const char *fmt,...)
+{
+	va_list		args;
+
+
+	fflush(stdout);
+
+	fprintf(stderr, "\n%s:%d: ", progname, line);
+	va_start(args, fmt);
+	vfprintf(stderr, fmt, args);
+	va_end(args);
+	Assert(fmt[strlen(fmt) - 1] != '\n');
+	fprintf(stderr, "\n");
+	exit(1);
+}
+
+static void
+test_disallowed_in_pipeline(PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	fprintf(stderr, "test error cases... ");
+
+	if (PQisnonblocking(conn))
+		pg_fatal("Expected blocking connection mode");
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("Unable to enter pipeline mode");
+
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Pipeline mode not activated properly");
+
+	/* PQexec should fail in pipeline mode */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("PQexec should fail in pipeline mode but succeeded");
+
+	/* Entering pipeline mode when already in pipeline mode is OK */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("re-entering pipeline mode should be a no-op but failed");
+
+	if (PQisBusy(conn) != 0)
+		pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
+
+	/* ok, back to normal command mode */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("couldn't exit idle empty pipeline mode");
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("Pipeline mode not terminated properly");
+
+	/* exiting pipeline mode when not in pipeline mode should be a no-op */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
+
+	/* can now PQexec again */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
+				 PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_multi_pipelines(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "multi pipeline... ");
+
+	/*
+	 * Queue up a couple of small pipelines and process each without returning
+	 * to command mode first.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+	/* OK, start processing the results */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result");
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+
+	/* second pipeline */
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from second pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from second pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* We're still in pipeline mode ... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* until we end it, which we can safely do now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("exiting pipeline mode didn't seem to work");
+
+	fprintf(stderr, "ok\n");
+}
+
+/*
+ * When an operation in a pipeline fails the rest of the pipeline is flushed. We
+ * still have to get results for each pipeline item, but the item will just be
+ * a PGRES_PIPELINE_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the pipeline. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_pipeline_abort(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+	int			i;
+	bool		goterror;
+
+	fprintf(stderr, "aborted pipeline... ");
+
+	res = PQexec(conn, drop_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
+
+	res = PQexec(conn, create_table_sql);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
+
+	/*
+	 * Queue up a couple of small pipelines and process each without returning
+	 * to command mode first. Make sure the second operation in the first
+	 * pipeline ERRORs.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	dummy_params[0] = "1";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
+						  1, dummy_param_oids, dummy_params,
+						  NULL, NULL, 0) != 1)
+		pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
+
+	dummy_params[0] = "2";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+	dummy_params[0] = "3";
+	if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching second-pipeline insert failed: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+	/*
+	 * OK, start processing the pipeline results.
+	 *
+	 * We should get a command-ok for the first insert, a fatal error for the
+	 * failing SELECT, a pipeline-aborted result for the second insert, then
+	 * a pipeline sync; and then a command-ok plus a pipeline sync for the
+	 * single insert in the second pipeline.
+	 */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("Unexpected result status %s: %s",
+				 PQresStatus(PQresultStatus(res)),
+				 PQresultErrorMessage(res));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* Second query caused error, so we expect an error next */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	/*
+	 * pipeline should now be aborted.
+	 *
+	 * Note that we could still queue more queries at this point if we wanted;
+	 * they'd get added to a new third pipeline since we've already sent a
+	 * second. The aborted flag relates only to the pipeline being received.
+	 */
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+		pg_fatal("pipeline should be flagged as aborted but isn't");
+
+	/* third query in pipeline, the second insert */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
+		pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* NULL result to signal end-of-results for this command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+		pg_fatal("pipeline should be flagged as aborted but isn't");
+
+	/* Ensure we're still in pipeline */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/*
+	 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
+	 *
+	 * (This is so clients know to start processing results normally again and
+	 * can tell the difference between skipped commands and the sync.)
+	 */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code from first pipeline sync\n"
+				 "Expected PGRES_PIPELINE_SYNC, got %s",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
+		pg_fatal("sync should've cleared the aborted flag but didn't");
+
+	/* We're still in pipeline mode... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* the insert from the second pipeline */
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("Unexpected result code %s from first item in second pipeline",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* Read the NULL result at the end of the command */
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+	/* the second pipeline sync */
+	if ((res = PQgetResult(conn)) == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from second pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("Expected null result, got %s: %s",
+				 PQresStatus(PQresultStatus(res)),
+				 PQerrorMessage(conn));
+
+	/* Try to send two queries in one command */
+	if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	goterror = false;
+	while ((res = PQgetResult(conn)) != NULL)
+	{
+		switch (PQresultStatus(res))
+		{
+			case PGRES_FATAL_ERROR:
+				if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
+					pg_fatal("expected error about multiple commands, got %s",
+							 PQerrorMessage(conn));
+				printf("got expected %s", PQerrorMessage(conn));
+				goterror = true;
+				break;
+			default:
+				pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
+				break;
+		}
+	}
+	if (!goterror)
+		pg_fatal("did not get cannot-insert-multiple-commands error");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("got NULL result");
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* Test single-row mode with an error partway through */
+	if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	PQsetSingleRowMode(conn);
+	goterror = false;
+	while ((res = PQgetResult(conn)) != NULL)
+	{
+		switch (PQresultStatus(res))
+		{
+			case PGRES_SINGLE_TUPLE:
+				printf("got row: %s\n", PQgetvalue(res, 0, 0));
+				break;
+			case PGRES_FATAL_ERROR:
+				if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
+					pg_fatal("expected division-by-zero, got: %s (%s)",
+							 PQerrorMessage(conn),
+							 PQresultErrorField(res, PG_DIAG_SQLSTATE));
+				printf("got expected division-by-zero\n");
+				goterror = true;
+				break;
+			default:
+				pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
+		}
+		PQclear(res);
+	}
+	if (!goterror)
+		pg_fatal("did not get division-by-zero error");
+	/* the third pipeline sync */
+	if ((res = PQgetResult(conn)) == NULL)
+		pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s from third pipeline sync",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	/* We're still in pipeline mode... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* until we end it, which we can safely do now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("exiting pipeline mode didn't seem to work");
+
+	fprintf(stderr, "ok\n");
+
+	/*-
+	 * Since we fired the pipelines off without a surrounding xact, the results
+	 * should be:
+	 *
+	 * - Implicit xact started by server around 1st pipeline
+	 * - First insert applied
+	 * - Second statement aborted xact
+	 * - Third insert skipped
+	 * - Sync rolled back first implicit xact
+	 * - Implicit xact created by server around 2nd pipeline
+	 * - insert applied from 2nd pipeline
+	 * - Sync commits 2nd xact
+	 *
+	 * So we should only have the value 3 that we inserted.
+	 */
+	res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		const char *val = PQgetvalue(res, i, 0);
+
+		if (strcmp(val, "3") != 0)
+			pg_fatal("expected only insert with value 3, got %s", val);
+	}
+
+	PQclear(res);
+}
+
+/* State machine enum for test_pipelined_insert */
+enum PipelineInsertStep
+{
+	BI_BEGIN_TX,
+	BI_DROP_TABLE,
+	BI_CREATE_TABLE,
+	BI_PREPARE,
+	BI_INSERT_ROWS,
+	BI_COMMIT_TX,
+	BI_SYNC,
+	BI_DONE
+};
+
+static void
+test_pipelined_insert(PGconn *conn, int n_rows)
+{
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+	enum PipelineInsertStep send_step = BI_BEGIN_TX,
+				recv_step = BI_BEGIN_TX;
+	int			rows_to_send,
+				rows_to_receive;
+
+	insert_params[0] = &insert_param_0[0];
+
+	rows_to_send = rows_to_receive = n_rows;
+
+	/*
+	 * Do a pipelined insert into a table created at the start of the pipeline
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	while (send_step != BI_PREPARE)
+	{
+		const char *sql;
+
+		switch (send_step)
+		{
+			case BI_BEGIN_TX:
+				sql = "BEGIN TRANSACTION";
+				send_step = BI_DROP_TABLE;
+				break;
+
+			case BI_DROP_TABLE:
+				sql = drop_table_sql;
+				send_step = BI_CREATE_TABLE;
+				break;
+
+			case BI_CREATE_TABLE:
+				sql = create_table_sql;
+				send_step = BI_PREPARE;
+				break;
+
+			default:
+				pg_fatal("invalid state");
+		}
+
+		pg_debug("sending: %s\n", sql);
+		if (PQsendQueryParams(conn, sql,
+							  0, NULL, NULL, NULL, NULL, 0) != 1)
+			pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
+	}
+
+	Assert(send_step == BI_PREPARE);
+	pg_debug("sending: %s\n", insert_sql);
+	if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1)
+		pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
+	send_step = BI_INSERT_ROWS;
+
+	/*
+	 * Now we start inserting. We'll be sending enough data that we could fill
+	 * our output buffer, so to avoid deadlocking we need to enter nonblocking
+	 * mode and consume input while we send more output. As results of each
+	 * query are processed we should pop them to allow processing of the next
+	 * query. There's no need to finish the pipeline before processing
+	 * results.
+	 */
+	if (PQsetnonblocking(conn, 1) != 0)
+		pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+	while (recv_step != BI_DONE)
+	{
+		int			sock;
+		fd_set		input_mask;
+		fd_set		output_mask;
+
+		sock = PQsocket(conn);
+
+		if (sock < 0)
+			break;				/* shouldn't happen */
+
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		FD_ZERO(&output_mask);
+		FD_SET(sock, &output_mask);
+
+		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+
+		/*
+		 * Process any results, so we keep the server's output buffer free
+		 * flowing and it can continue to process input
+		 */
+		if (FD_ISSET(sock, &input_mask))
+		{
+			PQconsumeInput(conn);
+
+			/* Read until we'd block if we tried to read */
+			while (!PQisBusy(conn) && recv_step < BI_DONE)
+			{
+				PGresult   *res;
+				const char *cmdtag;
+				const char *description = "";
+				int			status;
+
+				/*
+				 * Read next result.  If no more results from this query,
+				 * advance to the next query
+				 */
+				res = PQgetResult(conn);
+				if (res == NULL)
+					continue;
+
+				status = PGRES_COMMAND_OK;
+				switch (recv_step)
+				{
+					case BI_BEGIN_TX:
+						cmdtag = "BEGIN";
+						recv_step++;
+						break;
+					case BI_DROP_TABLE:
+						cmdtag = "DROP TABLE";
+						recv_step++;
+						break;
+					case BI_CREATE_TABLE:
+						cmdtag = "CREATE TABLE";
+						recv_step++;
+						break;
+					case BI_PREPARE:
+						cmdtag = "";
+						description = "PREPARE";
+						recv_step++;
+						break;
+					case BI_INSERT_ROWS:
+						cmdtag = "INSERT";
+						rows_to_receive--;
+						if (rows_to_receive == 0)
+							recv_step++;
+						break;
+					case BI_COMMIT_TX:
+						cmdtag = "COMMIT";
+						recv_step++;
+						break;
+					case BI_SYNC:
+						cmdtag = "";
+						description = "SYNC";
+						status = PGRES_PIPELINE_SYNC;
+						recv_step++;
+						break;
+					case BI_DONE:
+						/* unreachable */
+						description = "";
+						abort();
+				}
+
+				if (PQresultStatus(res) != status)
+					pg_fatal("%s reported status %s, expected %s\n"
+							 "Error message: \"%s\"",
+							 description, PQresStatus(PQresultStatus(res)),
+							 PQresStatus(status), PQerrorMessage(conn));
+
+				if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+					pg_fatal("%s expected command tag '%s', got '%s'",
+							 description, cmdtag, PQcmdStatus(res));
+
+				pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
+
+				PQclear(res);
+			}
+		}
+
+		/* Write more rows and/or the end pipeline message, if needed */
+		if (FD_ISSET(sock, &output_mask))
+		{
+			PQflush(conn);
+
+			if (send_step == BI_INSERT_ROWS)
+			{
+				snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+
+				if (PQsendQueryPrepared(conn, "my_insert",
+										1, insert_params, NULL, NULL, 0) == 1)
+				{
+					pg_debug("sent row %d\n", rows_to_send);
+
+					rows_to_send--;
+					if (rows_to_send == 0)
+						send_step++;
+				}
+				else
+				{
+					/*
+					 * in nonblocking mode, so it's OK for an insert to fail
+					 * to send
+					 */
+					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+							rows_to_send, PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_COMMIT_TX)
+			{
+				if (PQsendQueryParams(conn, "COMMIT",
+									  0, NULL, NULL, NULL, NULL, 0) == 1)
+				{
+					pg_debug("sent COMMIT\n");
+					send_step++;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: failed to send commit: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_SYNC)
+			{
+				if (PQpipelineSync(conn) == 1)
+				{
+					fprintf(stdout, "pipeline sync sent\n");
+					send_step++;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+		}
+	}
+
+	/* We've got the sync message and the pipeline should be done */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQsetnonblocking(conn, 0) != 0)
+		pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_prepared(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	Oid			param_oids[1] = {INT4OID};
+	Oid			expected_oids[4];
+	Oid			typ;
+
+	fprintf(stderr, "prepared... ");
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
+					  "interval '1 sec'",
+					  1, param_oids) != 1)
+		pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
+	expected_oids[0] = INT4OID;
+	expected_oids[1] = TEXTOID;
+	expected_oids[2] = NUMERICOID;
+	expected_oids[3] = INTERVALOID;
+	if (PQsendDescribePrepared(conn, "select_one") != 1)
+		pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned NULL");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+	if (PQnfields(res) != lengthof(expected_oids))
+		pg_fatal("expected %d columns, got %d",
+				 lengthof(expected_oids), PQnfields(res));
+	for (int i = 0; i < PQnfields(res); i++)
+	{
+		typ = PQftype(res, i);
+		if (typ != expected_oids[i])
+			pg_fatal("field %d: expected type %u, got %u",
+					 i, expected_oids[i], typ);
+	}
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
+
+	PQexec(conn, "BEGIN");
+	PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
+	PQenterPipelineMode(conn);
+	if (PQsendDescribePortal(conn, "cursor_one") != 1)
+		pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("expected NULL result");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+
+	typ = PQftype(res, 0);
+	if (typ != INT4OID)
+		pg_fatal("portal: expected type %u, got %u",
+				 INT4OID, typ);
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_simple_pipeline(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "simple pipeline... ");
+
+	/*
+	 * Enter pipeline mode and dispatch a set of operations, which we'll then
+	 * process the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim
+	 * processing of results since our output buffer will give us enough slush
+	 * to work with and we won't block on sending. So blocking mode is fine.
+	 */
+	if (PQisnonblocking(conn))
+		pg_fatal("Expected blocking connection mode");
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn, "SELECT $1",
+						  1, dummy_param_oids, dummy_params,
+						  NULL, NULL, 0) != 1)
+		pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first query result.");
+
+	/*
+	 * Even though we've processed the result there's still a sync to come and
+	 * we can't exit pipeline mode yet
+	 */
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	PQclear(res);
+	res = NULL;
+
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("PQgetResult returned something extra after pipeline end: %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	/* We're still in pipeline mode... */
+	if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+		pg_fatal("Fell out of pipeline mode somehow");
+
+	/* ... until we end it, which we can safely do now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+		pg_fatal("Exiting pipeline mode didn't seem to work");
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+	PGresult   *res;
+	int			i;
+	bool		pipeline_ended = false;
+
+	/* 1 pipeline, 3 queries in it */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s",
+				 PQerrorMessage(conn));
+
+	for (i = 0; i < 3; i++)
+	{
+		char	   *param[1];
+
+		param[0] = psprintf("%d", 44 + i);
+
+		if (PQsendQueryParams(conn,
+							  "SELECT generate_series(42, $1)",
+							  1,
+							  NULL,
+							  (const char **) param,
+							  NULL,
+							  NULL,
+							  0) != 1)
+			pg_fatal("failed to send query: %s",
+					 PQerrorMessage(conn));
+		pfree(param[0]);
+	}
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+	for (i = 0; !pipeline_ended; i++)
+	{
+		bool		first = true;
+		bool		saw_ending_tuplesok;
+		bool		isSingleTuple = false;
+
+		/* Set single row mode for only first 2 SELECT queries */
+		if (i < 2)
+		{
+			if (PQsetSingleRowMode(conn) != 1)
+				pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
+		}
+
+		/* Consume rows for this query */
+		saw_ending_tuplesok = false;
+		while ((res = PQgetResult(conn)) != NULL)
+		{
+			ExecStatusType est = PQresultStatus(res);
+
+			if (est == PGRES_PIPELINE_SYNC)
+			{
+				fprintf(stderr, "end of pipeline reached\n");
+				pipeline_ended = true;
+				PQclear(res);
+				if (i != 3)
+					pg_fatal("Expected three results, got %d", i);
+				break;
+			}
+
+			/* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
+			if (first)
+			{
+				if (i <= 1 && est != PGRES_SINGLE_TUPLE)
+					pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
+							 i, PQresStatus(est));
+				if (i >= 2 && est != PGRES_TUPLES_OK)
+					pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
+							 i, PQresStatus(est));
+				first = false;
+			}
+
+			fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
+			switch (est)
+			{
+				case PGRES_TUPLES_OK:
+					fprintf(stderr, ", tuples: %d\n", PQntuples(res));
+					saw_ending_tuplesok = true;
+					if (isSingleTuple)
+					{
+						if (PQntuples(res) == 0)
+							fprintf(stderr, "all tuples received in query %d\n", i);
+						else
+							pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
+					}
+					break;
+
+				case PGRES_SINGLE_TUPLE:
+					isSingleTuple = true;
+					fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
+					break;
+
+				default:
+					pg_fatal("unexpected");
+			}
+			PQclear(res);
+		}
+		if (!pipeline_ended && !saw_ending_tuplesok)
+			pg_fatal("didn't get expected terminating TUPLES_OK");
+	}
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+}
+
+/*
+ * Simple test to verify that a pipeline is discarded as a whole when there's
+ * an error, ignoring transaction commands.
+ */
+static void
+test_transaction(PGconn *conn)
+{
+	PGresult   *res;
+	bool		expect_null;
+	int			num_syncs = 0;
+
+	res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
+				 "CREATE TABLE pq_pipeline_tst (id int)");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to create test table: %s",
+				 PQerrorMessage(conn));
+	PQclear(res);
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s",
+				 PQerrorMessage(conn));
+	if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
+		pg_fatal("could not send prepare on pipeline: %s",
+				 PQerrorMessage(conn));
+
+	if (PQsendQueryParams(conn,
+						  "BEGIN",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	if (PQsendQueryParams(conn,
+						  "SELECT 0/0",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
+	 * get out of the pipeline-aborted state first.
+	 */
+	if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+		pg_fatal("failed to execute prepared: %s",
+				 PQerrorMessage(conn));
+
+	/* This insert fails because we're in pipeline-aborted state */
+	if (PQsendQueryParams(conn,
+						  "INSERT INTO pq_pipeline_tst VALUES (1)",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	num_syncs++;
+
+	/*
+	 * This insert fails even though the pipeline got a SYNC, because we're in
+	 * an aborted transaction
+	 */
+	if (PQsendQueryParams(conn,
+						  "INSERT INTO pq_pipeline_tst VALUES (2)",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	num_syncs++;
+
+	/*
+	 * Send ROLLBACK using prepared stmt. This one works because we just did
+	 * PQpipelineSync above.
+	 */
+	if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+		pg_fatal("failed to execute prepared: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Now that we're out of a transaction and in pipeline-good mode, this
+	 * insert works
+	 */
+	if (PQsendQueryParams(conn,
+						  "INSERT INTO pq_pipeline_tst VALUES (3)",
+						  0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s",
+				 PQerrorMessage(conn));
+	/* Send two syncs now -- match up to SYNC messages below */
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	num_syncs++;
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	num_syncs++;
+
+	expect_null = false;
+	for (int i = 0;; i++)
+	{
+		ExecStatusType restype;
+
+		res = PQgetResult(conn);
+		if (res == NULL)
+		{
+			printf("%d: got NULL result\n", i);
+			if (!expect_null)
+				pg_fatal("did not expect NULL here");
+			expect_null = false;
+			continue;
+		}
+		restype = PQresultStatus(res);
+		printf("%d: got status %s", i, PQresStatus(restype));
+		if (expect_null)
+			pg_fatal("expected NULL");
+		if (restype == PGRES_FATAL_ERROR)
+			printf("; error: %s", PQerrorMessage(conn));
+		else if (restype == PGRES_PIPELINE_ABORTED)
+		{
+			printf(": command didn't run because pipeline aborted\n");
+		}
+		else
+			printf("\n");
+		PQclear(res);
+
+		if (restype == PGRES_PIPELINE_SYNC)
+			num_syncs--;
+		else
+			expect_null = true;
+		if (num_syncs <= 0)
+			break;
+	}
+	if ((res = PQgetResult(conn)) != NULL)
+		pg_fatal("returned something extra after all the syncs: %s",
+				 PQresStatus(PQresultStatus(res)));
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+	/* We expect to find one tuple containing the value "3" */
+	res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("did not get 1 tuple");
+	if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
+		pg_fatal("did not get expected tuple");
+	PQclear(res);
+
+	fprintf(stderr, "ok\n");
+}
+
+static void
+usage(const char *progname)
+{
+	fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
+	fprintf(stderr, "Usage:\n");
+	fprintf(stderr, "  %s tests\n", progname);
+	fprintf(stderr, "  %s testname [conninfo [number_of_rows]]\n", progname);
+}
+
+static void
+print_test_list(void)
+{
+	printf("disallowed_in_pipeline\n");
+	printf("multi_pipelines\n");
+	printf("pipeline_abort\n");
+	printf("pipelined_insert\n");
+	printf("prepared\n");
+	printf("simple_pipeline\n");
+	printf("singlerow\n");
+	printf("transaction\n");
+}
+
+int
+main(int argc, char **argv)
+{
+	const char *conninfo = "";
+	PGconn	   *conn;
+	int			numrows = 10000;
+	PGresult   *res;
+
+	if (argc > 1 && strcmp(argv[1], "tests") == 0)
+	{
+		print_test_list();
+		exit(0);
+	}
+
+	/*
+	 * The testname parameter is mandatory; it can be followed by a conninfo
+	 * string and number of rows.
+	 */
+	if (argc < 2 || argc > 4)
+	{
+		usage(argv[0]);
+		exit(1);
+	}
+
+	if (argc >= 3)
+		conninfo = pg_strdup(argv[2]);
+
+	if (argc >= 4)
+	{
+		errno = 0;
+		numrows = strtol(argv[3], NULL, 10);
+		if (errno != 0 || numrows <= 0)
+		{
+			fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]);
+			exit(1);
+		}
+	}
+
+	/* Make a connection to the database */
+	conn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Connection to database failed: %s\n",
+				PQerrorMessage(conn));
+		exit_nicely(conn);
+	}
+	res = PQexec(conn, "SET lc_messages TO \"C\"");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
+	PQclear(res);
+
+	if (strcmp(argv[1], "disallowed_in_pipeline") == 0)
+		test_disallowed_in_pipeline(conn);
+	else if (strcmp(argv[1], "multi_pipelines") == 0)
+		test_multi_pipelines(conn);
+	else if (strcmp(argv[1], "pipeline_abort") == 0)
+		test_pipeline_abort(conn);
+	else if (strcmp(argv[1], "pipelined_insert") == 0)
+		test_pipelined_insert(conn, numrows);
+	else if (strcmp(argv[1], "prepared") == 0)
+		test_prepared(conn);
+	else if (strcmp(argv[1], "simple_pipeline") == 0)
+		test_simple_pipeline(conn);
+	else if (strcmp(argv[1], "singlerow") == 0)
+		test_singlerowmode(conn);
+	else if (strcmp(argv[1], "transaction") == 0)
+		test_transaction(conn);
+	else
+	{
+		fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]);
+		usage(argv[0]);
+		exit(1);
+	}
+
+	/* close the connection to the database and cleanup */
+	PQfinish(conn);
+	return 0;
+}
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
new file mode 100644
index 0000000000..ba15b64ca7
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -0,0 +1,28 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $numrows = 10000;
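+# The just-built libpq_pipeline executable is expected in the current
+# directory; put that on PATH so command_ok() below can invoke it by name.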
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+
+my ($out, $err) = run_command(['libpq_pipeline', 'tests']);
+die "oops: $err" unless $err eq '';
+my @tests = split(/\s+/, $out);
+
+for my $testname (@tests)
+{
+	$node->command_ok(
+		[ 'libpq_pipeline', $testname, $node->connstr('postgres'), $numrows ],
+		"libpq_pipeline $testname");
+}
+
+$node->stop('fast');
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 74fde40e3a..a184404e21 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -33,10 +33,11 @@ my @unlink_on_exit;
 
 # Set of variables for modules in contrib/ and src/test/modules/
 my $contrib_defines = { 'refint' => 'REFINT_VERBOSE' };
-my @contrib_uselibpq = ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo');
-my @contrib_uselibpgport   = ('oid2name', 'vacuumlo');
-my @contrib_uselibpgcommon = ('oid2name', 'vacuumlo');
-my $contrib_extralibs      = undef;
+my @contrib_uselibpq =
+  ('dblink', 'oid2name', 'postgres_fdw', 'vacuumlo', 'libpq_pipeline');
+my @contrib_uselibpgport   = ('libpq_pipeline', 'oid2name', 'vacuumlo');
+my @contrib_uselibpgcommon = ('libpq_pipeline', 'oid2name', 'vacuumlo');
+my $contrib_extralibs      = { 'libpq_pipeline' => ['ws2_32.lib'] };
 my $contrib_extraincludes = { 'dblink' => ['src/backend'] };
 my $contrib_extrasource = {
 	'cube' => [ 'contrib/cube/cubescan.l', 'contrib/cube/cubeparse.y' ],
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 61cf4eae1f..9e6777e9d0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1563,10 +1563,12 @@ PG_Locale_Strategy
 PG_Lock_Status
 PG_init_t
 PGcancel
+PGcmdQueueEntry
 PGconn
 PGdataValue
 PGlobjfuncs
 PGnotify
+PGpipelineStatus
 PGresAttDesc
 PGresAttValue
 PGresParamDesc
-- 
2.20.1
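
(Not part of the patch: for reviewers who want the client-side flow at a
glance, here is a minimal standalone sketch of the pipeline API usage that
the new test module exercises.  The conninfo and table name are
placeholders, and error handling is reduced to bailing out.)

	#include <stdio.h>
	#include <stdlib.h>

	#include "libpq-fe.h"

	static void
	die(PGconn *conn)
	{
		fprintf(stderr, "%s", PQerrorMessage(conn));
		exit(1);
	}

	int
	main(void)
	{
		PGconn	   *conn = PQconnectdb("");		/* placeholder conninfo */
		PGresult   *res;

		if (PQstatus(conn) != CONNECTION_OK)
			die(conn);

		if (PQenterPipelineMode(conn) != 1)
			die(conn);

		/* queue two statements without waiting for their results */
		if (PQsendQueryParams(conn, "INSERT INTO tst VALUES (1)",
							  0, NULL, NULL, NULL, NULL, 0) != 1)
			die(conn);
		if (PQsendQueryParams(conn, "INSERT INTO tst VALUES (2)",
							  0, NULL, NULL, NULL, NULL, 0) != 1)
			die(conn);

		/* mark a synchronization point and flush the queued messages */
		if (PQpipelineSync(conn) != 1)
			die(conn);

		/* each queued statement returns its results followed by a NULL */
		for (int i = 0; i < 2; i++)
		{
			res = PQgetResult(conn);
			if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
				die(conn);
			PQclear(res);
			if (PQgetResult(conn) != NULL)	/* end of this statement's results */
				die(conn);
		}

		/* the sync point itself reports PGRES_PIPELINE_SYNC */
		res = PQgetResult(conn);
		if (res == NULL || PQresultStatus(res) != PGRES_PIPELINE_SYNC)
			die(conn);
		PQclear(res);

		if (PQgetResult(conn) != NULL)	/* nothing else should be pending */
			die(conn);

		if (PQexitPipelineMode(conn) != 1)
			die(conn);

		PQfinish(conn);
		return 0;
	}

Compared to the non-pipelined equivalent, nothing has to wait for a reply
until PQpipelineSync: the two INSERTs and the sync typically go out in a
single network round trip.
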

>From 2c50bf03d46fe2767c271dafb22c20e8ead5e510 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 15 Mar 2021 15:07:59 -0300
Subject: [PATCH v37 2/2] Add libpq pipeline mode support to pgbench
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Author: Daniel Vérité <dan...@manitou-mail.org>
Discussion: https://postgr.es/m/b4e34135-2bd9-4b8a-94ca-27d760da2...@manitou-mail.org
---
 src/bin/pgbench/pgbench.c                    | 128 +++++++++++++++++--
 src/bin/pgbench/t/001_pgbench_with_server.pl |  61 +++++++++
 2 files changed, 176 insertions(+), 13 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index f6a214669c..ba7b35d83c 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -395,10 +395,11 @@ typedef enum
 	 *
 	 * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
 	 * command, the command is sent to the server, and we move to
-	 * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
-	 * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
-	 * meta-commands are executed immediately.  If the command about to start
-	 * is actually beyond the end of the script, advance to CSTATE_END_TX.
+	 * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
+	 * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
+	 * wait for it to expire. Other meta-commands are executed immediately. If
+	 * the command about to start is actually beyond the end of the script,
+	 * advance to CSTATE_END_TX.
 	 *
 	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
 	 * for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
 	META_IF,					/* \if */
 	META_ELIF,					/* \elif */
 	META_ELSE,					/* \else */
-	META_ENDIF					/* \endif */
+	META_ENDIF,					/* \endif */
+	META_STARTPIPELINE,			/* \startpipeline */
+	META_ENDPIPELINE			/* \endpipeline */
 } MetaCommand;
 
 typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
 		mc = META_GSET;
 	else if (pg_strcasecmp(cmd, "aset") == 0)
 		mc = META_ASET;
+	else if (pg_strcasecmp(cmd, "startpipeline") == 0)
+		mc = META_STARTPIPELINE;
+	else if (pg_strcasecmp(cmd, "endpipeline") == 0)
+		mc = META_ENDPIPELINE;
 	else
 		mc = META_NONE;
 	return mc;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
 				if (commands[j]->type != SQL_COMMAND)
 					continue;
 				preparedStatementName(name, st->use_file, j);
-				res = PQprepare(st->con, name,
-								commands[j]->argv[0], commands[j]->argc - 1, NULL);
-				if (PQresultStatus(res) != PGRES_COMMAND_OK)
-					pg_log_error("%s", PQerrorMessage(st->con));
-				PQclear(res);
+				if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+				{
+					res = PQprepare(st->con, name,
+									commands[j]->argv[0], commands[j]->argc - 1, NULL);
+					if (PQresultStatus(res) != PGRES_COMMAND_OK)
+						pg_log_error("%s", PQerrorMessage(st->con));
+					PQclear(res);
+				}
+				else
+				{
+					/*
+					 * In pipeline mode, we use asynchronous functions. If a
+					 * server-side error occurs, it will be processed later
+					 * among the other results.
+					 */
+					if (!PQsendPrepare(st->con, name,
+									   commands[j]->argv[0], commands[j]->argc - 1, NULL))
+						pg_log_error("%s", PQerrorMessage(st->con));
+				}
 			}
 			st->prepared[st->use_file] = true;
 		}
@@ -2805,8 +2826,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
 	 * varprefix should be set only with \gset or \aset, and SQL commands do
 	 * not need it.
 	 */
 	Assert((meta == META_NONE && varprefix == NULL) ||
+		   (meta == META_ENDPIPELINE && varprefix == NULL) ||
 		   ((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
 
 	res = PQgetResult(st->con);
 
@@ -2874,6 +2897,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
 				/* otherwise the result is simply thrown away by PQclear below */
 				break;
 
+			case PGRES_PIPELINE_SYNC:
+				pg_log_debug("client %d pipeline ending", st->id);
+				if (PQexitPipelineMode(st->con) != 1)
+					pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
+								 PQerrorMessage(st->con));
+				break;
+
 			default:
 				/* anything else is unexpected */
 				pg_log_error("client %d script %d aborted in command %d query %d: %s",
@@ -3127,13 +3157,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				/* Execute the command */
 				if (command->type == SQL_COMMAND)
 				{
+					/* disallow \aset and \gset in pipeline mode */
+					if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+					{
+						if (command->meta == META_GSET)
+						{
+							commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
+							st->state = CSTATE_ABORTED;
+							break;
+						}
+						else if (command->meta == META_ASET)
+						{
+							commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
+							st->state = CSTATE_ABORTED;
+							break;
+						}
+					}
+
 					if (!sendCommand(st, command))
 					{
 						commandFailed(st, "SQL", "SQL command send failed");
 						st->state = CSTATE_ABORTED;
 					}
 					else
-						st->state = CSTATE_WAIT_RESULT;
+					{
+						/* Wait for results, unless in pipeline mode */
+						if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+							st->state = CSTATE_WAIT_RESULT;
+						else
+							st->state = CSTATE_END_COMMAND;
+					}
 				}
 				else if (command->type == META_COMMAND)
 				{
@@ -3273,7 +3326,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				if (readCommandResponse(st,
 										sql_script[st->use_file].commands[st->command]->meta,
 										sql_script[st->use_file].commands[st->command]->varprefix))
-					st->state = CSTATE_END_COMMAND;
+				{
+					/*
+					 * Outside of pipeline mode, this command's results are
+					 * complete, so stop reading.  In pipeline mode, keep
+					 * reading results until the end-of-pipeline response.
+					 */
+					if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+						st->state = CSTATE_END_COMMAND;
+				}
 				else
 					st->state = CSTATE_ABORTED;
 				break;
@@ -3516,6 +3577,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 			return CSTATE_ABORTED;
 		}
 	}
+	else if (command->meta == META_STARTPIPELINE)
+	{
+		/*
+		 * In pipeline mode, we use a workflow based on libpq pipeline
+		 * functions.
+		 */
+		if (querymode == QUERY_SIMPLE)
+		{
+			commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
+			return CSTATE_ABORTED;
+		}
+
+		if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+		{
+			commandFailed(st, "startpipeline", "already in pipeline mode");
+			return CSTATE_ABORTED;
+		}
+		if (PQenterPipelineMode(st->con) == 0)
+		{
+			commandFailed(st, "startpipeline", "failed to enter pipeline mode");
+			return CSTATE_ABORTED;
+		}
+	}
+	else if (command->meta == META_ENDPIPELINE)
+	{
+		if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+		{
+			commandFailed(st, "endpipeline", "not in pipeline mode");
+			return CSTATE_ABORTED;
+		}
+		if (!PQpipelineSync(st->con))
+		{
+			commandFailed(st, "endpipeline", "failed to send a pipeline sync");
+			return CSTATE_ABORTED;
+		}
+		/*
+		 * Collect any pending results, then wait for the PGRES_PIPELINE_SYNC
+		 * result; readCommandResponse exits pipeline mode when it sees it.
+		 */
+		return CSTATE_WAIT_RESULT;
+	}
 
 	/*
 	 * executing the expression or shell command might have taken a
@@ -4725,7 +4825,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
 			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
 						 "missing command", NULL, -1);
 	}
-	else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
+	else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
+			 my_command->meta == META_STARTPIPELINE ||
+			 my_command->meta == META_ENDPIPELINE)
 	{
 		if (my_command->argc != 1)
 			syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index daffc18e52..d07b36faa7 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -755,6 +755,67 @@ pgbench(
 }
 	});
 
+# Working \startpipeline
+pgbench(
+	'-t 1 -n -M extended',
+	0,
+	[ qr{type: .*/001_pgbench_pipeline}, qr{processed: 1/1} ],
+	[],
+	'pgbench startpipeline command',
+	{
+		'001_pgbench_pipeline' => q{
+-- test startpipeline
+\startpipeline
+} . "select 1;\n" x 10 . q{
+\endpipeline
+}
+	});
+
+# Try \startpipeline twice
+pgbench(
+	'-t 1 -n -M extended',
+	2,
+	[],
+	[qr{already in pipeline mode}],
+	'pgbench startpipeline twice',
+	{
+		'001_pgbench_pipeline_2' => q{
+-- startpipeline twice
+\startpipeline
+\startpipeline
+}
+	});
+
+# Try to end a pipeline that hasn't started
+pgbench(
+	'-t 1 -n -M extended',
+	2,
+	[],
+	[qr{not in pipeline mode}],
+	'pgbench endpipeline without startpipeline',
+	{
+		'001_pgbench_pipeline_3' => q{
+-- pipeline not started
+\endpipeline
+}
+	});
+
+# Try \gset in pipeline mode
+pgbench(
+	'-t 1 -n -M extended',
+	2,
+	[],
+	[qr{gset is not allowed in pipeline mode}],
+	'pgbench gset in pipeline mode',
+	{
+		'001_pgbench_pipeline_gset' => q{
+\startpipeline
+select 1 \gset f
+\endpipeline
+}
+	});
+
 # trigger many expression errors
 my @errors = (
 
-- 
2.20.1
