Here's a new version, where I've renamed everything to "pipeline". I think the docs could use some additional tweaks now in order to make a coherent story on pipeline mode, how it can be used in a batched fashion, etc.
Here are the renames I applied. It's mostly mechanical, except that PQbatchSendQueue is now PQsendPipeline:

PQBatchStatus -> PGpipelineStatus (enum)
PQBATCH_MODE_OFF -> PQ_PIPELINE_OFF
PQBATCH_MODE_ON -> PQ_PIPELINE_ON
PQBATCH_MODE_ABORTED -> PQ_PIPELINE_ABORTED
PQbatchStatus -> PQpipelineStatus (function)
PQenterBatchMode -> PQenterPipelineMode
PQexitBatchMode -> PQexitPipelineMode
PQbatchSendQueue -> PQsendPipeline
PGRES_BATCH_END -> PGRES_PIPELINE_END
PGRES_BATCH_ABORTED -> PGRES_PIPELINE_ABORTED

Also, PQpipelineStatus(conn) (formerly PQbatchStatus) now returns enum PGpipelineStatus; it previously returned int.

I'm tempted to rename PGASYNC_QUEUED to PGASYNC_PIPELINE_IDLE (not sure if PGASYNC_PIPELINE_READY fits better with the existing one).

In pgbench, I changed the metacommands to be \startpipeline and \endpipeline. There's a failing Assert() there which I commented out; it still needs to be fixed.

--
Álvaro Herrera 39°49'30"S 73°17'W
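For anyone with out-of-tree test code written against the earlier batch-named versions of this patch, a throwaway shim along these lines might ease the transition. It is only a sketch and not part of the patch: the shim itself and the COMPAT_STR, compat_maps_to and compat_selfcheck helpers are made up for illustration, and the mappings are simply the renames listed above.

```c
#include <assert.h>
#include <string.h>

/*
 * Hypothetical compatibility shim (NOT part of the patch): maps the old
 * batch-mode names onto the pipeline names listed in this message.
 */
#define PQBatchStatus         PGpipelineStatus	/* enum type */
#define PQBATCH_MODE_OFF      PQ_PIPELINE_OFF
#define PQBATCH_MODE_ON       PQ_PIPELINE_ON
#define PQBATCH_MODE_ABORTED  PQ_PIPELINE_ABORTED
#define PQbatchStatus         PQpipelineStatus	/* function */
#define PQenterBatchMode      PQenterPipelineMode
#define PQexitBatchMode       PQexitPipelineMode
#define PQbatchSendQueue      PQsendPipeline
#define PGRES_BATCH_END       PGRES_PIPELINE_END
#define PGRES_BATCH_ABORTED   PGRES_PIPELINE_ABORTED

/* Two-level stringification, so the argument is macro-expanded first. */
#define COMPAT_STR_(x) #x
#define COMPAT_STR(x)  COMPAT_STR_(x)

/* Returns 1 if an expanded old name matches the expected new name. */
static int
compat_maps_to(const char *expanded, const char *expected)
{
	return strcmp(expanded, expected) == 0;
}

/* Spot-check the mappings, including the one non-mechanical rename. */
static void
compat_selfcheck(void)
{
	assert(compat_maps_to(COMPAT_STR(PQbatchSendQueue), "PQsendPipeline"));
	assert(compat_maps_to(COMPAT_STR(PQBatchStatus), "PGpipelineStatus"));
	assert(compat_maps_to(COMPAT_STR(PQbatchStatus), "PQpipelineStatus"));
	assert(compat_maps_to(COMPAT_STR(PGRES_BATCH_END), "PGRES_PIPELINE_END"));
}
```

Note that the macros only cover the spelling: callers that stored the result of the status function in an int would still want to switch to the enum type.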
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index b7a82453f0..9f98257dbe 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3099,6 +3099,31 @@ ExecStatusType PQresultStatus(const PGresult *res); </para> </listitem> </varlistentry> + + <varlistentry id="libpq-pgres-pipeline-end"> + <term><literal>PGRES_PIPELINE_END</literal></term> + <listitem> + <para> + The <structname>PGresult</structname> represents the end of a pipeline. + This status occurs only when pipeline mode has been selected. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-pgres-pipeline-aborted"> + <term><literal>PGRES_PIPELINE_ABORTED</literal></term> + <listitem> + <para> + The <structname>PGresult</structname> represents a pipeline that's + received an error from the server. <function>PQgetResult</function> + must be called repeatedly, and it will return this status code, + until the end of the current pipeline, at which point it will return + <literal>PGRES_PIPELINE_END</literal> and normal processing can + resume. + </para> + </listitem> + </varlistentry> + </variablelist> If the result status is <literal>PGRES_TUPLES_OK</literal> or @@ -4857,6 +4882,494 @@ int PQflush(PGconn *conn); </sect1> + <sect1 id="libpq-pipeline-mode"> + <title>Pipeline Mode</title> + + <indexterm zone="libpq-pipeline-mode"> + <primary>libpq</primary> + <secondary>pipeline mode</secondary> + </indexterm> + + <indexterm zone="libpq-pipeline-mode"> + <primary>pipelining</primary> + <secondary>in libpq</secondary> + </indexterm> + + <indexterm zone="libpq-pipeline-mode"> + <primary>batch mode</primary> + <secondary>in libpq</secondary> + </indexterm> + + <para> + <application>libpq</application> pipeline mode allows applications to + send a query without having to read the result of the previously + sent query. 
Taking advantage of the pipeline mode, a client will wait + less for the server, since multiple queries/results can be sent/ + received in a single network transaction. + </para> + + <para> + While pipeline mode provides a significant performance boost, writing + clients using the pipeline mode is more complex because it involves + managing a queue of pending queries and finding which result + corresponds to which query in the queue. + </para> + + <para> + Pipeline mode also generally consumes more memory on both the client and server, + though careful and aggressive management of the send/receive queue can mitigate + this. This applies whether or not the connection is in blocking or non-blocking + mode. + </para> + + <sect2 id="libpq-pipeline-using"> + <title>Using Pipeline Mode</title> + + <para> + To issue pipelines the application must switch a connection into pipeline mode. + Enter pipeline mode with <xref linkend="libpq-PQenterPipelineMode"/> + or test whether pipeline mode is active with + <xref linkend="libpq-PQpipelineStatus"/>. + In pipeline mode, only <link linkend="libpq-async">asynchronous operations</link> + are permitted, and <literal>COPY</literal> is disallowed. + Using any synchronous command execution functions, + such as <function>PQfn</function>, or <function>PQexec</function> + and its sibling functions, is an error condition. + </para> + + <note> + <para> + It is best to use pipeline mode with <application>libpq</application> in + <link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used + in blocking mode it is possible for a client/server deadlock to occur. + <footnote> + <para> + The client will block trying to send queries to the server, but the + server will block trying to send results to the client from queries + it has already processed. 
This only occurs when the client sends + enough queries to fill its output buffer and the server's receive + buffer before switching to processing input from the server, + but it's hard to predict exactly when that will happen. + </para> + </footnote> + </para> + </note> + + <sect3 id="libpq-pipeline-sending"> + <title>Issuing Queries</title> + + <para> + After entering pipeline mode the application dispatches requests using + normal asynchronous <application>libpq</application> functions, such as: + <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>, + <function>PQsendQueryPrepared</function>, <function>PQsendDescribePortal</function>, + and <function>PQsendDescribePrepared</function>. + The asynchronous requests are followed by a + <xref linkend="libpq-PQsendPipeline"/> + call to mark the end of the pipeline. The client need not + call <function>PQgetResult</function> immediately after + dispatching each operation. + <link linkend="libpq-pipeline-results">Result processing</link> + is handled separately. + </para> + + <para> + The server executes statements, and returns results, in the order the + client sends them. The server will begin executing the commands in the + pipeline immediately, not waiting for the end of the pipeline. + If any statement encounters an error the server aborts the current + transaction and skips processing the rest of the pipeline. + Query processing resumes after the end of the failed pipeline. + </para> + + <para> + It's fine for one operation to depend on the results of a + prior one. One query may define a table that the next query in the same + pipeline uses; similarly, an application may create a named prepared statement + then execute it with later statements in the same pipeline. 
+ </para> + </sect3> + + <sect3 id="libpq-pipeline-results"> + <title>Processing Results</title> + + <para> + To process the result of one pipeline query, the application calls + <function>PQgetResult</function> repeatedly and handles each result + until <function>PQgetResult</function> returns null. + The result from the next query in the pipeline may then be retrieved using + <function>PQgetResult</function> again and the cycle repeated. + The application handles individual statement results as normal. + When the results of all the queries in the pipeline have been + returned, <function>PQgetResult</function> returns a result + containing the status value <literal>PGRES_PIPELINE_END</literal>. + </para> + + <para> + The client may choose to defer result processing until the complete + pipeline has been sent, or interleave that with sending further + queries in the pipeline; see <xref linkend="libpq-pipeline-interleave" />. + </para> + + <para> + To enter single-row mode, call <function>PQsetSingleRowMode</function> + before retrieving results with <function>PQgetResult</function>. + This mode selection is effective only for the query currently + being processed. For more information on the use of + <function>PQsetSingleRowMode</function>, + refer to <xref linkend="libpq-single-row-mode"/>. + </para> + + <para> + <function>PQgetResult</function> behaves the same as for normal + asynchronous processing except that it may contain the new + <type>PGresult</type> types <literal>PGRES_PIPELINE_END</literal> + and <literal>PGRES_PIPELINE_ABORTED</literal>. + <literal>PGRES_PIPELINE_END</literal> is reported exactly once for each + <function>PQsendPipeline</function> call at the corresponding point in + the result stream. + <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal + result stream result for the first error and all subsequent results + except <literal>PGRES_PIPELINE_END</literal> and null; + see <xref linkend="libpq-pipeline-errors"/>. 
+ </para> + + <para> + <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc + operate as normal when processing pipeline results. + </para> + + <para> + <application>libpq</application> does not provide any information to the + application about the query currently being processed (except that + <function>PQgetResult</function> returns null to indicate that we start + returning the results of next query). The application must keep track + of the order in which it sent queries and the expected results. + Applications will typically use a state machine or a FIFO queue for this. + </para> + + </sect3> + + <sect3 id="libpq-pipeline-errors"> + <title>Error Handling</title> + + <para> + When a query in a pipeline causes an <literal>ERROR</literal> the server + skips processing all subsequent messages until the end-of-pipeline message. + The open transaction is aborted. + </para> + + <para> + From the client perspective, after the client gets a + <literal>PGRES_FATAL_ERROR</literal> return from + <function>PQresultStatus</function> the pipeline is flagged as aborted. + <application>libpq</application> will report + <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued + operation in an aborted pipeline. The result for + <function>PQsendPipeline</function> is reported as + <literal>PGRES_PIPELINE_END</literal> to signal the end of the aborted pipeline + and resumption of normal result processing. + </para> + + <para> + The client <emphasis>must</emphasis> process results with + <function>PQgetResult</function> during error recovery. + </para> + + <para> + If the pipeline used an implicit transaction then operations that have + already executed are rolled back and operations that were queued for after + the failed operation are skipped entirely. The same behaviour holds if the + pipeline starts and commits a single explicit transaction (i.e. 
the first + statement is <literal>BEGIN</literal> and the last is + <literal>COMMIT</literal>) except that the session remains in an aborted + transaction state at the end of the pipeline. If a pipeline contains + <emphasis>multiple explicit transactions</emphasis>, all transactions that + committed prior to the error remain committed, the currently in-progress + transaction is aborted, and all subsequent operations are skipped completely, + including subsequent transactions. + </para> + + <note> + <para> + The client must not assume that work is committed when it + <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the + corresponding result is received to confirm the commit is complete. + Because errors arrive asynchronously the application needs to be able to + restart from the last <emphasis>received</emphasis> committed change and + resend work done after that point if something goes wrong. + </para> + </note> + </sect3> + + <sect3 id="libpq-pipeline-interleave"> + <title>Interleaving Result Processing and Query Dispatch</title> + + <para> + To avoid deadlocks on large pipelines the client should be structured + around a non-blocking event loop using operating system facilities + such as <function>select</function>, <function>poll</function>, + <function>WaitForMultipleObjectsEx</function>, etc. + </para> + + <para> + The client application should generally maintain a queue of work + still to be dispatched and a queue of work that has been dispatched + but not yet had its results processed. When the socket is writable + it should dispatch more work. When the socket is readable it should + read results and process them, matching them up to the next entry in + its expected results queue. Based on available memory, results from + the socket should be read frequently: there's no need to wait until the + end of the pipeline to read the results. Pipelines should be scoped to logical + units of work, usually (but not necessarily) one transaction per pipeline.
+ There's no need to exit pipeline mode and re-enter it between pipelines, + or to wait for one pipeline to finish before sending the next. + </para> + + <para> + An example using <function>select()</function> and a simple state + machine to track sent and received work is in + <filename>src/test/modules/test_libpq/pipeline.c</filename> + in the PostgreSQL source distribution. + </para> + </sect3> + + <sect3 id="libpq-pipeline-end"> + <title>Ending Pipeline Mode</title> + + <para> + Once all dispatched commands have had their results processed, and + the end pipeline result has been consumed, the application may return + to non-pipelined mode with <xref linkend="libpq-PQexitPipelineMode"/>. + </para> + </sect3> + </sect2> + + <sect2 id="libpq-pipeline-functions"> + <title>Functions Associated with Pipeline Mode</title> + + <variablelist> + + <varlistentry id="libpq-PQpipelineStatus"> + <term> + <function>PQpipelineStatus</function> + <indexterm> + <primary>PQpipelineStatus</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns current pipeline mode status of the <application>libpq</application> + connection. +<synopsis> +PGpipelineStatus PQpipelineStatus(const PGconn *conn); +</synopsis> + </para> + + <para> + <function>PQpipelineStatus</function> can return one of the following values: + + <variablelist> + <varlistentry> + <term> + <literal>PQ_PIPELINE_ON</literal> + </term> + <listitem> + <para> + The <application>libpq</application> connection is in + pipeline mode. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <literal>PQ_PIPELINE_OFF</literal> + </term> + <listitem> + <para> + The <application>libpq</application> connection is + <emphasis>not</emphasis> in pipeline mode. 
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <literal>PQ_PIPELINE_ABORTED</literal> + </term> + <listitem> + <para> + The <application>libpq</application> connection is in pipeline + mode and an error has occurred while processing the current + pipeline. + The aborted flag is cleared as soon as the result + of the <function>PQsendPipeline</function> at the end of the aborted + pipeline is processed. Clients don't usually need this function to + verify aborted status, as they can tell that the pipeline is aborted + from the <literal>PGRES_PIPELINE_ABORTED</literal> result code. + </para> + </listitem> + </varlistentry> + + </variablelist> + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQenterPipelineMode"> + <term> + <function>PQenterPipelineMode</function> + <indexterm> + <primary>PQenterPipelineMode</primary> + </indexterm> + </term> + + <listitem> + <para> + Causes a connection to enter pipeline mode if it is currently idle or + already in pipeline mode. + +<synopsis> +int PQenterPipelineMode(PGconn *conn); +</synopsis> + + </para> + <para> + Returns 1 for success. + Returns 0 and has no effect if the connection is not currently + idle, i.e., it has a result ready, or it is waiting for more + input from the server, etc. + This function does not actually send anything to the server, + it just changes the <application>libpq</application> connection + state. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQexitPipelineMode"> + <term> + <function>PQexitPipelineMode</function> + <indexterm> + <primary>PQexitPipelineMode</primary> + </indexterm> + </term> + + <listitem> + <para> + Causes a connection to exit pipeline mode if it is currently in pipeline mode + with an empty queue and no pending results. +<synopsis> +int PQexitPipelineMode(PGconn *conn); +</synopsis> + </para> + <para> + Returns 1 for success. Returns 1 and takes no action if not in + pipeline mode. 
If the current statement isn't finished processing + or there are results pending for collection with + <function>PQgetResult</function>, returns 0 and does nothing. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQsendPipeline"> + <term> + <function>PQsendPipeline</function> + <indexterm> + <primary>PQsendPipeline</primary> + </indexterm> + </term> + + <listitem> + <para> + Marks a synchronization point in a pipeline by sending a + <link linkend="protocol-flow-ext-query">sync message</link> + and flushing the send buffer. This serves as + the delimiter of an implicit transaction and an error recovery + point; see <xref linkend="libpq-pipeline-errors"/>. + +<synopsis> +int PQsendPipeline(PGconn *conn); +</synopsis> + </para> + <para> + Returns 1 for success. Returns 0 if the connection is not in + pipeline mode or if sending a + <link linkend="protocol-flow-ext-query">sync message</link> + fails. + </para> + </listitem> + </varlistentry> + </variablelist> + </sect2> + + <sect2 id="libpq-pipeline-tips"> + <title>When to Use Pipeline Mode</title> + + <para> + Much like asynchronous query mode, there is no meaningful performance + overhead when using pipeline mode. It increases client application complexity, + and extra caution is required to prevent client/server deadlocks, but + pipeline mode can offer considerable performance improvements, in exchange for + increased memory usage from leaving state around longer. + </para> + + <para> + Pipeline mode is most useful when the server is distant, i.e., network latency + (<quote>ping time</quote>) is high, and also when many small operations + are being performed in rapid sequence. There is usually less benefit + in using pipelined commands when each query takes many multiples of the client/server + round-trip time to execute.
A 100-statement operation run on a server + 300ms round-trip-time away would take 30 seconds in network latency alone + without pipelining; with pipelining it may spend as little as 0.3s waiting for + results from the server. + </para> + + <para> + Use pipelined commands when your application does lots of small + <literal>INSERT</literal>, <literal>UPDATE</literal> and + <literal>DELETE</literal> operations that can't easily be transformed + into operations on sets, or into a <literal>COPY</literal> operation. + </para> + + <para> + Pipeline mode is not useful when information from one operation is required by + the client to produce the next operation. In such cases, the client + must introduce a synchronization point and wait for a full client/server + round-trip to get the results it needs. However, it's often possible to + adjust the client design to exchange the required information server-side. + Read-modify-write cycles are especially good candidates; for example: + <programlisting> +BEGIN; +SELECT x FROM mytable WHERE id = 42 FOR UPDATE; +-- result: x=2 +-- client adds 1 to x: +UPDATE mytable SET x = 3 WHERE id = 42; +COMMIT; + </programlisting> + could be much more efficiently done with: + <programlisting> +UPDATE mytable SET x = x + 1 WHERE id = 42; + </programlisting> + </para> + + <para> + Pipelining is less useful, and more complex, when a single pipeline contains + multiple transactions (see <xref linkend="libpq-pipeline-errors"/>). + </para> + + <note> + <para> + The pipeline API was introduced in PostgreSQL 14, but clients using + the PostgreSQL 14 version of <application>libpq</application> can use + pipelines on server versions 7.4 and newer. Pipeline mode works on any server + that supports the v3 extended query protocol. 
+ </para> + </note> + </sect2> + </sect1> + <sect1 id="libpq-single-row-mode"> <title>Retrieving Query Results Row-by-Row</title> @@ -4897,6 +5410,13 @@ int PQflush(PGconn *conn); Each object should be freed with <xref linkend="libpq-PQclear"/> as usual. </para> + <para> + When using pipeline mode, single-row mode needs to be activated for each + query in the pipeline before retrieving results for that query + with <function>PQgetResult</function>. + See <xref linkend="libpq-pipeline-mode"/> for more information. + </para> + <para> <variablelist> <varlistentry id="libpq-PQsetSingleRowMode"> diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml index 6d46da42e2..012e44c736 100644 --- a/doc/src/sgml/lobj.sgml +++ b/doc/src/sgml/lobj.sgml @@ -130,6 +130,10 @@ <application>libpq</application> library. </para> + <para> + Client applications cannot use these functions while a libpq connection is in pipeline mode. + </para> + <sect2 id="lo-create"> <title>Creating a Large Object</title> diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index faa7c26b0a..dc3f817879 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -1096,6 +1096,12 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d row, the last value is kept. </para> + <para> + <literal>\gset</literal> and <literal>\aset</literal> cannot be used in + pipeline mode, since query results are not immediately + fetched in this mode.
+ </para> + <para> The following example puts the final account balance from the first query into variable <replaceable>abalance</replaceable>, and fills variables @@ -1256,6 +1262,21 @@ SELECT 4 AS four \; SELECT 5 AS five \aset </programlisting></para> </listitem> </varlistentry> + + <varlistentry id='pgbench-metacommand-pipeline'> + <term><literal>\startpipeline</literal></term> + <term><literal>\endpipeline</literal></term> + + <listitem> + <para> + These commands delimit the start and end of a pipeline of SQL statements. + In a pipeline, statements are sent to the server without waiting for the results + of previous statements (see <xref linkend="libpq-pipeline-mode"/>). + Pipeline mode requires the extended query protocol. + </para> + </listitem> + </varlistentry> + </variablelist> </refsect2> diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index a4a3f40048..df75a471ee 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -351,10 +351,11 @@ typedef enum * * CSTATE_START_COMMAND starts the execution of a command. On a SQL * command, the command is sent to the server, and we move to - * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set, - * and we enter the CSTATE_SLEEP state to wait for it to expire. Other - * meta-commands are executed immediately. If the command about to start - * is actually beyond the end of the script, advance to CSTATE_END_TX. + * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep + * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to + * wait for it to expire. Other meta-commands are executed immediately. If + * the command about to start is actually beyond the end of the script, + * advance to CSTATE_END_TX. * * CSTATE_WAIT_RESULT waits until we get a result set back from the server * for the current command.
@@ -485,7 +486,9 @@ typedef enum MetaCommand META_IF, /* \if */ META_ELIF, /* \elif */ META_ELSE, /* \else */ - META_ENDIF /* \endif */ + META_ENDIF, /* \endif */ + META_STARTPIPELINE, /* \startpipeline */ + META_ENDPIPELINE /* \endpipeline */ } MetaCommand; typedef enum QueryMode @@ -2505,6 +2508,10 @@ getMetaCommand(const char *cmd) mc = META_GSET; else if (pg_strcasecmp(cmd, "aset") == 0) mc = META_ASET; + else if (pg_strcasecmp(cmd, "startpipeline") == 0) + mc = META_STARTPIPELINE; + else if (pg_strcasecmp(cmd, "endpipeline") == 0) + mc = META_ENDPIPELINE; else mc = META_NONE; return mc; @@ -2694,11 +2701,25 @@ sendCommand(CState *st, Command *command) if (commands[j]->type != SQL_COMMAND) continue; preparedStatementName(name, st->use_file, j); - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("%s", PQerrorMessage(st->con)); - PQclear(res); + if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) + { + res = PQprepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_log_error("%s", PQerrorMessage(st->con)); + PQclear(res); + } + else + { + /* + * In pipeline mode, we use asynchronous functions. If a + * server-side error occurs, it will be processed later + * among the other results. + */ + if (!PQsendPrepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL)) + pg_log_error("%s", PQerrorMessage(st->con)); + } } st->prepared[st->use_file] = true; } @@ -2743,8 +2764,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) * varprefix should be set only with \gset or \aset, and SQL commands do * not need it. 
*/ +#if 0 Assert((meta == META_NONE && varprefix == NULL) || ((meta == META_GSET || meta == META_ASET) && varprefix != NULL)); +#endif res = PQgetResult(st->con); @@ -2812,6 +2835,12 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) /* otherwise the result is simply thrown away by PQclear below */ break; + case PGRES_PIPELINE_END: + pg_log_debug("client %d pipeline ending", st->id); + if (PQexitPipelineMode(st->con) != 1) + pg_log_error("client %d failed to exit pipeline mode", st->id); + break; + default: /* anything else is unexpected */ pg_log_error("client %d script %d aborted in command %d query %d: %s", @@ -3070,13 +3099,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) /* Execute the command */ if (command->type == SQL_COMMAND) { + /* disallow \aset and \gset in pipeline mode */ + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) + { + if (command->meta == META_GSET) + { + commandFailed(st, "gset", "\\gset is not allowed in pipeline mode"); + st->state = CSTATE_ABORTED; + break; + } + else if (command->meta == META_ASET) + { + commandFailed(st, "aset", "\\aset is not allowed in pipeline mode"); + st->state = CSTATE_ABORTED; + break; + } + } + if (!sendCommand(st, command)) { commandFailed(st, "SQL", "SQL command send failed"); st->state = CSTATE_ABORTED; } else - st->state = CSTATE_WAIT_RESULT; + { + /* Wait for results, unless in pipeline mode */ + if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) + st->state = CSTATE_WAIT_RESULT; + else + st->state = CSTATE_END_COMMAND; + } } else if (command->type == META_COMMAND) { @@ -3216,7 +3268,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) if (readCommandResponse(st, sql_script[st->use_file].commands[st->command]->meta, sql_script[st->use_file].commands[st->command]->varprefix)) - st->state = CSTATE_END_COMMAND; + { + /* + * outside of pipeline mode: stop reading results. 
+ * pipeline mode: continue reading results until an + * end-of-pipeline response. + */ + if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) + st->state = CSTATE_END_COMMAND; + } else st->state = CSTATE_ABORTED; break; @@ -3460,6 +3520,46 @@ executeMetaCommand(CState *st, instr_time *now) return CSTATE_ABORTED; } } + else if (command->meta == META_STARTPIPELINE) + { + /* + * In pipeline mode, we use a workflow based on libpq pipeline + * functions. + */ + if (querymode == QUERY_SIMPLE) + { + commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol"); + st->state = CSTATE_ABORTED; + return CSTATE_ABORTED; + } + + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) + { + commandFailed(st, "startpipeline", "already in pipeline mode"); + return CSTATE_ABORTED; + } + if (PQenterPipelineMode(st->con) == 0) + { + commandFailed(st, "startpipeline", "failed to enter pipeline mode"); + return CSTATE_ABORTED; + } + } + else if (command->meta == META_ENDPIPELINE) + { + if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) + { + commandFailed(st, "endpipeline", "not in pipeline mode"); + return CSTATE_ABORTED; + } + if (!PQsendPipeline(st->con)) + { + commandFailed(st, "endpipeline", "failed to send the pipeline"); + return CSTATE_ABORTED; + } + /* XXX shouldn't we do PQexitPipelineMode here? 
*/ + /* collect pending results before getting out of pipeline mode */ + return CSTATE_WAIT_RESULT; + } /* * executing the expression or shell command might have taken a @@ -4687,7 +4787,9 @@ process_backslash_command(PsqlScanState sstate, const char *source) syntax_error(source, lineno, my_command->first_line, my_command->argv[0], "missing command", NULL, -1); } - else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF) + else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF || + my_command->meta == META_STARTPIPELINE || + my_command->meta == META_ENDPIPELINE) { if (my_command->argc != 1) syntax_error(source, lineno, my_command->first_line, my_command->argv[0], @@ -6812,4 +6914,5 @@ pthread_join(pthread_t th, void **thread_return) return 0; } + #endif /* WIN32 */ diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index bbc1f90481..60d09e6d63 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -179,3 +179,7 @@ PQgetgssctx 176 PQsetSSLKeyPassHook_OpenSSL 177 PQgetSSLKeyPassHook_OpenSSL 178 PQdefaultSSLKeyPassHook_OpenSSL 179 +PQenterPipelineMode 180 +PQexitPipelineMode 181 +PQsendPipeline 182 +PQpipelineStatus 183 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 8ca0583aa9..6c468c5301 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -536,6 +536,23 @@ pqDropConnection(PGconn *conn, bool flushInput) } } +/* + * pqFreeCommandQueue + * Free all the entries of PGcommandQueueEntry queue passed. 
+ */ +static void +pqFreeCommandQueue(PGcommandQueueEntry *queue) +{ + while (queue != NULL) + { + PGcommandQueueEntry *cur = queue; + + queue = cur->next; + if (cur->query) + free(cur->query); + free(cur); + } +} /* * pqDropServerData @@ -567,6 +584,12 @@ pqDropServerData(PGconn *conn) } conn->notifyHead = conn->notifyTail = NULL; + pqFreeCommandQueue(conn->cmd_queue_head); + conn->cmd_queue_head = conn->cmd_queue_tail = NULL; + + pqFreeCommandQueue(conn->cmd_queue_recycle); + conn->cmd_queue_recycle = NULL; + /* Reset ParameterStatus data, as well as variables deduced from it */ pstatus = conn->pstatus; while (pstatus != NULL) @@ -6656,6 +6679,15 @@ PQbackendPID(const PGconn *conn) return conn->be_pid; } +PGpipelineStatus +PQpipelineStatus(const PGconn *conn) +{ + if (!conn) + return PQ_PIPELINE_OFF; + + return conn->pipelineStatus; +} + int PQconnectionNeedsPassword(const PGconn *conn) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index e730753387..c467bb903b 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -39,7 +39,9 @@ char *const pgresStatus[] = { "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", "PGRES_COPY_BOTH", - "PGRES_SINGLE_TUPLE" + "PGRES_SINGLE_TUPLE", + "PGRES_PIPELINE_END", + "PGRES_PIPELINE_ABORTED" }; /* @@ -71,6 +73,11 @@ static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); +static PGcommandQueueEntry *pqMakePipelineCmd(PGconn *conn); +static void pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry); +static void pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry); +static void pqPipelineProcessQueue(PGconn *conn); +static int pqPipelineFlush(PGconn *conn); /* ---------------- @@ -1160,7 +1167,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->next_result = conn->result; conn->result = res; /* And mark the result ready to
return */ - conn->asyncStatus = PGASYNC_READY; + conn->asyncStatus = PGASYNC_READY_MORE; } return 1; @@ -1186,6 +1193,13 @@ fail: int PQsendQuery(PGconn *conn, const char *query) { + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot PQsendQuery in pipeline mode, use PQsendQueryParams\n")); + return 0; + } + return PQsendQueryInternal(conn, query, true); } @@ -1228,10 +1242,11 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) conn->last_query = strdup(query); /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) { /* error message should be set up already */ return 0; @@ -1296,6 +1311,8 @@ PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes) { + PGcommandQueueEntry *pipeCmd = NULL; + if (!PQsendQueryStart(conn, true)) return 0; @@ -1327,6 +1344,15 @@ PQsendPrepare(PGconn *conn, return 0; } + /* Alloc pipeline memory before doing anything */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + pipeCmd = pqMakePipelineCmd(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + } + /* construct the Parse message */ if (pqPutMsgStart('P', false, conn) < 0 || pqPuts(stmtName, conn) < 0 || @@ -1353,32 +1379,46 @@ PQsendPrepare(PGconn *conn, if (pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + /* + * In non-pipeline mode, add a Sync and prepare to send. In pipeline mode + * we just keep track of the new message. 
+ */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + /* remember we are doing just a Parse */ + conn->queryclass = PGQUERY_PREPARE; - /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; - - /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + /* and remember the query text too, if possible */ + /* if insufficient memory, last_query just winds up NULL */ + if (conn->last_query) + free(conn->last_query); + conn->last_query = strdup(query); + conn->asyncStatus = PGASYNC_BUSY; + } + else + { + pipeCmd->queryclass = PGQUERY_PREPARE; + /* as above, if insufficient memory, query winds up NULL */ + pipeCmd->query = strdup(query); + pqAppendPipelineCmd(conn, pipeCmd); + } /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; - /* OK, it's launched! 
*/ - conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecyclePipelineCmd(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -1426,7 +1466,8 @@ PQsendQueryPrepared(PGconn *conn, } /* - * Common startup code for PQsendQuery and sibling routines + * PQsendQueryStart + * Common startup code for PQsendQuery and sibling routines */ static bool PQsendQueryStart(PGconn *conn, bool newQuery) @@ -1447,20 +1488,61 @@ PQsendQueryStart(PGconn *conn, bool newQuery) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. */ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && + conn->pipelineStatus == PQ_PIPELINE_OFF) { appendPQExpBufferStr(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); return false; } - /* initialize async result-accumulation state */ - pqClearAsyncResult(conn); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * When enqueuing a message we don't change much of the connection + * state since it's already in use for the current command. The + * connection state will get updated when pqPipelineProcessQueue() + * advances to start processing the queued message. + * + * Just make sure we can safely enqueue given the current connection + * state. We can enqueue behind another queue item, or behind a + * non-queue command (one that sends its own sync), but we can't + * enqueue if the connection is in a copy state. 
+ */ + switch (conn->asyncStatus) + { + case PGASYNC_QUEUED: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* ok to queue */ + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot queue commands during COPY\n")); + return false; + case PGASYNC_IDLE: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("internal error, idle state in pipeline mode\n")); + return false; + } + } + else + { + /* + * This command's results will come in immediately. Initialize async + * result-accumulation state + */ + pqClearAsyncResult(conn); - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* reset single-row processing mode */ + conn->singleRowMode = false; + } /* ready to send command message */ return true; } @@ -1484,6 +1566,10 @@ PQsendQueryGuts(PGconn *conn, int resultFormat) { int i; + PGcommandQueueEntry *pipeCmd = NULL; + char **query; + PGQueryClass *queryclass; + /* This isn't gonna work on a 2.0 server */ if (PG_PROTOCOL_MAJOR(conn->pversion) < 3) @@ -1493,9 +1579,26 @@ PQsendQueryGuts(PGconn *conn, return 0; } + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + pipeCmd = pqMakePipelineCmd(conn); + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } + else + { + query = &conn->last_query; + queryclass = &conn->queryclass; + } + + /* - * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync, - * using specified statement name and the unnamed portal. + * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync + * (if not in pipeline mode), using specified statement name and the + * unnamed portal. 
*/ if (command) @@ -1605,35 +1708,43 @@ PQsendQueryGuts(PGconn *conn, pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + /* construct the Sync message if not in pipeline mode */ + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + *queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + /* if insufficient memory, query just winds up NULL */ + if (*query) + free(*query); if (command) - conn->last_query = strdup(command); + *query = strdup(command); else - conn->last_query = NULL; + *query = NULL; /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + pqAppendPipelineCmd(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecyclePipelineCmd(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -1734,14 +1845,17 @@ PQisBusy(PGconn *conn) return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed; } - /* * PQgetResult * Get the next PGresult produced by a query. Returns NULL if no * query work remains or an error has occurred (e.g. out of * memory). 
+ * + * In pipeline mode, once all the results of a query have been returned, + * PQgetResult returns NULL to let the user know that the next + * query is being processed. At the end of the pipeline, returns a + * result with PQresultStatus(result) == PGRES_PIPELINE_END. */ - PGresult * PQgetResult(PGconn *conn) { @@ -1810,9 +1924,38 @@ PQgetResult(PGconn *conn) switch (conn->asyncStatus) { case PGASYNC_IDLE: + case PGASYNC_QUEUED: res = NULL; /* query is complete */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * In pipeline mode, we prepare the processing of the results + * of the next query. + */ + pqPipelineProcessQueue(conn); + } break; case PGASYNC_READY: + res = pqPrepareAsyncResult(conn); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + /* + * In pipeline mode, query execution state cannot be IDLE as + * there can be other queries or results waiting in the queue. + * + * The connection isn't idle since we can't submit new + * non-pipeline commands. It also isn't busy since the current + * command is done and we need to process a new one. + */ + conn->asyncStatus = PGASYNC_QUEUED; + } + else + { + /* Set the state back to BUSY, allowing parsing to proceed. */ + conn->asyncStatus = PGASYNC_BUSY; + } + break; + case PGASYNC_READY_MORE: res = pqPrepareAsyncResult(conn); /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; @@ -1993,6 +2136,13 @@ PQexecStart(PGconn *conn) if (!conn) return false; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("synchronous command execution functions are not allowed in pipeline mode\n")); + return false; + } + /* * Since this is the beginning of a query cycle, reset the error buffer. 
*/ @@ -2177,6 +2327,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal) static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) { + PGcommandQueueEntry *pipeCmd = NULL; + PGQueryClass *queryclass; + /* Treat null desc_target as empty string */ if (!desc_target) desc_target = ""; @@ -2192,6 +2345,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) return 0; } + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + pipeCmd = pqMakePipelineCmd(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + queryclass = &pipeCmd->queryclass; + } + else + queryclass = &conn->queryclass; + /* construct the Describe message */ if (pqPutMsgStart('D', false, conn) < 0 || pqPutc(desc_type, conn) < 0 || @@ -2200,32 +2365,40 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; + *queryclass = PGQUERY_DESCRIBE; - /* reset last_query string (not relevant now) */ - if (conn->last_query) + /* reset last-query string (not relevant now) */ + if (conn->last_query && conn->pipelineStatus != PQ_PIPELINE_OFF) { free(conn->last_query); conn->last_query = NULL; } /* - * Give the data a push. In nonblock mode, don't complain if we're unable - * to send it all; PQgetResult() will do any additional flushing needed. + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqPipelineFlush(conn) < 0) goto sendFailed; /* OK, it's launched! 
*/ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + pqAppendPipelineCmd(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + pqRecyclePipelineCmd(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -2624,6 +2797,13 @@ PQfn(PGconn *conn, */ resetPQExpBuffer(&conn->errorMessage); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("PQfn not allowed in pipeline mode\n")); + return NULL; + } + if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE || conn->result != NULL) { @@ -2644,6 +2824,371 @@ PQfn(PGconn *conn, args, nargs); } +/* ====== Pipeline mode support ======== */ + +/* + * PQenterPipelineMode + * Put an idle connection in pipeline mode. + * + * Returns 1 on success. On failure, errorMessage is set and 0 is returned. + * + * Commands submitted after this can be pipelined on the connection; + * there's no requirement to wait for one to finish before the next is + * dispatched. + * + * Queuing of a new query or syncing during COPY is not allowed. + * + * A set of commands is terminated by a PQsendPipeline. Multiple pipelines + * can be sent while in pipeline mode. Pipeline mode can be exited + * by calling PQexitPipelineMode() once all results are processed. + * + * This doesn't actually send anything on the wire, it just puts libpq + * into a state where it can pipeline work. 
+ */ +int +PQenterPipelineMode(PGconn *conn) +{ + if (!conn) + return 0; + + /* succeed with no action if already in pipeline mode */ + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + return 1; + + if (conn->asyncStatus != PGASYNC_IDLE) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot enter pipeline mode, connection not idle\n")); + return 0; + } + + conn->pipelineStatus = PQ_PIPELINE_ON; + conn->asyncStatus = PGASYNC_QUEUED; + + return 1; +} + +/* + * PQexitPipelineMode + * End pipeline mode and return to normal command mode. + * + * Returns 1 on success (pipeline mode successfully ended, or not in pipeline + * mode). + * + * Returns 0 if in pipeline mode and cannot be ended yet. Error message will + * be set. + */ +int +PQexitPipelineMode(PGconn *conn) +{ + if (!conn) + return 0; + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + return 1; + + switch (conn->asyncStatus) + { + case PGASYNC_READY: + case PGASYNC_READY_MORE: + /* there are some uncollected results */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode with uncollected results\n")); + return 0; + + case PGASYNC_BUSY: + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot exit pipeline mode while busy\n")); + return 0; + + default: + /* OK */ + break; + } + + /* still work to process */ + if (conn->cmd_queue_head != NULL) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("command queue not clean\n")); + return 0; + } + + conn->pipelineStatus = PQ_PIPELINE_OFF; + conn->asyncStatus = PGASYNC_IDLE; + + /* Flush any pending data in out buffer */ + if (pqFlush(conn) < 0) + return 0; /* error message is set up already */ + return 1; +} + +/* + * pqPipelineProcessQueue: subroutine for PQgetResult + * In pipeline mode, start processing the results of the next query in the queue. 
+ */ +static void +pqPipelineProcessQueue(PGconn *conn) +{ + PGcommandQueueEntry *next_query; + + Assert(conn->pipelineStatus != PQ_PIPELINE_OFF); + + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + /* should be unreachable */ + appendPQExpBufferStr(&conn->errorMessage, + "internal error: COPY in pipeline mode\n"); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* client still has to process current query or results */ + return; + case PGASYNC_IDLE: + /* should be unreachable */ + appendPQExpBufferStr(&conn->errorMessage, + "internal error, IDLE in pipeline mode\n"); + break; + case PGASYNC_QUEUED: + /* next query please */ + break; + } + + if (conn->cmd_queue_head == NULL) + { + /* + * In pipeline mode but nothing left on the queue; caller can submit + * more work or PQexitPipelineMode() now. + */ + return; + } + + /* + * Pop the next query from the queue and set up the connection state as if + * it'd just been dispatched from a non-pipeline call. + */ + next_query = conn->cmd_queue_head; + conn->cmd_queue_head = next_query->next; + next_query->next = NULL; + + /* Initialize async result-accumulation state */ + pqClearAsyncResult(conn); + + /* + * Reset single-row processing mode. (Client has to set it up for each + * query, if desired.) + */ + conn->singleRowMode = false; + + conn->last_query = next_query->query; + next_query->query = NULL; + conn->queryclass = next_query->queryclass; + + pqRecyclePipelineCmd(conn, next_query); + + if (conn->pipelineStatus == PQ_PIPELINE_ABORTED && + conn->queryclass != PGQUERY_SYNC) + { + /* + * In an aborted pipeline we don't get anything from the server for + * each result; we're just discarding input until we get to the next + * sync from the server. The client needs to know its queries got + * aborted so we create a fake PGresult to return immediately from + * PQgetResult. 
+ */ + conn->result = + PQmakeEmptyPGresult(conn, PGRES_PIPELINE_ABORTED); + if (!conn->result) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory\n")); + pqSaveErrorResult(conn); + return; + } + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* allow parsing to continue */ + conn->asyncStatus = PGASYNC_BUSY; + } +} + +/* + * PQsendPipeline + * Send a Sync message as part of a pipeline, and flush to server + * + * It's legal to start submitting more commands in the pipeline immediately, + * without waiting for the results of the current pipeline. There's no need to + * end pipeline mode and start it again. + * + * If a command in a pipeline fails, every subsequent command up to and including + * the result to the Sync message sent by PQsendPipeline gets set to + * PGRES_PIPELINE_ABORTED state. If the whole pipeline is processed without + * error, a PGresult with PGRES_PIPELINE_END is produced. + * + * Queries can already have been sent before PQsendPipeline is called, but + * PQsendPipeline needs to be called before retrieving command results. + * + * The connection will remain in pipeline mode and unavailable for new + * synchronous command execution functions until all results from the pipeline + * are processed by the client. 
+ */ +int +PQsendPipeline(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (!conn) + return 0; + + if (conn->pipelineStatus == PQ_PIPELINE_OFF) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("cannot send pipeline when not in pipeline mode\n")); + return 0; + } + + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + /* should be unreachable */ + appendPQExpBufferStr(&conn->errorMessage, + "internal error: cannot send pipeline when idle\n"); + return 0; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + /* should be unreachable */ + appendPQExpBufferStr(&conn->errorMessage, + "internal error: cannot send pipeline while in COPY\n"); + return 0; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + case PGASYNC_QUEUED: + /* OK to send sync */ + break; + } + + entry = pqMakePipelineCmd(conn); + if (entry == NULL) + return 0; /* error msg already set */ + + entry->queryclass = PGQUERY_SYNC; + entry->query = NULL; + + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + pqAppendPipelineCmd(conn, entry); + + /* + * Give the data a push. In nonblock mode, don't complain if we're unable + * to send it all; PQgetResult() will do any additional flushing needed. + */ + if (PQflush(conn) < 0) + goto sendFailed; + + /* + * Call pqPipelineProcessQueue so the user can start calling + * PQgetResult. + */ + pqPipelineProcessQueue(conn); + + return 1; + +sendFailed: + pqRecyclePipelineCmd(conn, entry); + /* error message should be set up already */ + return 0; +} + +/* + * pqMakePipelineCmd + * Get a command queue entry for caller to fill. + * + * If the recycle queue has a free element, that is returned; if not, a + * fresh one is allocated. Caller is responsible for adding it to the + * command queue (pqAppendPipelineCmd) once the struct is filled in, or + * releasing the memory (pqRecyclePipelineCmd) if an error occurs. 
+ * + * If allocation fails, sets the error message and returns NULL. + */ +static PGcommandQueueEntry * +pqMakePipelineCmd(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (conn->cmd_queue_recycle == NULL) + { + entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry)); + if (entry == NULL) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return NULL; + } + } + else + { + entry = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry->next; + } + entry->next = NULL; + entry->query = NULL; + + return entry; +} + +/* + * pqAppendPipelineCmd + * Append a caller-allocated command queue entry to the queue. + * + * The query itself must already have been put in the output buffer by the + * caller. + */ +static void +pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry) +{ + PGcommandQueueEntry **tail; + + if (conn->cmd_queue_head == NULL) + tail = &conn->cmd_queue_head; + else + tail = &conn->cmd_queue_tail->next; + + *tail = entry; + conn->cmd_queue_tail = entry; +} + +/* + * pqRecyclePipelineCmd + * Push a command queue entry onto the freelist. It must be an entry + * with null next pointer and not referenced by any other entry's next + * pointer. + */ +static void +pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry) +{ + if (entry == NULL) + return; + + Assert(entry->next == NULL); + + if (entry->query) + free(entry->query); + + entry->next = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry; +} + /* ====== accessor funcs for PGresult ======== */ @@ -3248,6 +3793,23 @@ PQflush(PGconn *conn) return pqFlush(conn); } +/* + * pqPipelineFlush + * + * In pipeline mode, data will be flushed only when the out buffer reaches the + * threshold value. In non-pipeline mode, it behaves as stock pqFlush. + * + * Returns 0 on success. 
+ */ +static int +pqPipelineFlush(PGconn *conn) +{ + if ((conn->pipelineStatus == PQ_PIPELINE_OFF) || + (conn->outCount >= OUTBUFFER_THRESHOLD)) + return (pqFlush(conn)); + return 0; +} + /* * PQfreemem - safely frees memory allocated diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 6efa53d8b7..b46c6421f4 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn) { char id; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + fprintf(stderr, "internal error, attempt to read v2 protocol in pipeline mode"); + abort(); + } + /* * Loop to parse successive complete messages available in the buffer. */ diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index e4ee9d69d2..8cc619192b 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -217,10 +217,28 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new + * query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->pipelineStatus != PQ_PIPELINE_OFF) + { + conn->result = PQmakeEmptyPGresult(conn, + PGRES_PIPELINE_END); + if (!conn->result) + { + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + } + else + { + conn->pipelineStatus = PQ_PIPELINE_ON; + conn->asyncStatus = PGASYNC_READY; + } + } + else + conn->asyncStatus = PGASYNC_IDLE; break; case 'I': /* empty query */ if (conn->result == NULL) @@ -875,6 +893,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + /* If in pipeline mode, set error indicator for it */ + if (isError && conn->pipelineStatus != PQ_PIPELINE_OFF) + conn->pipelineStatus = PQ_PIPELINE_ABORTED; + /* * If this is an error message, pre-emptively clear any 
incomplete query * result we may have. We'd just throw it away below anyway, and diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index effe0ccf85..6fbb4fe82d 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -97,7 +97,10 @@ typedef enum PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ - PGRES_SINGLE_TUPLE /* single tuple from larger resultset */ + PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ + PGRES_PIPELINE_END, /* end of a pipeline */ + PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort + * earlier in a pipeline */ } ExecStatusType; typedef enum @@ -137,6 +140,16 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */ } PGPing; +/* + * PGpipelineStatus - Current status of pipeline mode + */ +typedef enum +{ + PQ_PIPELINE_OFF, + PQ_PIPELINE_ON, + PQ_PIPELINE_ABORTED +} PGpipelineStatus; + /* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications. 
*/ @@ -328,6 +341,7 @@ extern int PQserverVersion(const PGconn *conn); extern char *PQerrorMessage(const PGconn *conn); extern int PQsocket(const PGconn *conn); extern int PQbackendPID(const PGconn *conn); +extern PGpipelineStatus PQpipelineStatus(const PGconn *conn); extern int PQconnectionNeedsPassword(const PGconn *conn); extern int PQconnectionUsedPassword(const PGconn *conn); extern int PQclientEncoding(const PGconn *conn); @@ -435,6 +449,11 @@ extern PGresult *PQgetResult(PGconn *conn); extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); +/* Routines for pipeline mode management */ +extern int PQenterPipelineMode(PGconn *conn); +extern int PQexitPipelineMode(PGconn *conn); +extern int PQsendPipeline(PGconn *conn); + /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 4db498369c..3838697fd5 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -217,7 +217,12 @@ typedef enum { PGASYNC_IDLE, /* nothing's happening, dude */ PGASYNC_BUSY, /* query in progress */ - PGASYNC_READY, /* result ready for PQgetResult */ + PGASYNC_READY, /* query done, waiting for client to fetch + * result */ + PGASYNC_READY_MORE, /* query done, waiting for client to fetch + * result, more results expected from this + * query */ + PGASYNC_QUEUED, /* query done, more in current pipeline */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ @@ -229,7 +234,8 @@ typedef enum PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */ PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ PGQUERY_PREPARE, /* Parse only (PQprepare) */ - PGQUERY_DESCRIBE /* Describe Statement or Portal */ + PGQUERY_DESCRIBE, /* Describe Statement or Portal */ + PGQUERY_SYNC /* Sync at end of a pipeline */ } PGQueryClass; /* 
PGSetenvStatusType defines the state of the pqSetenv state machine */ @@ -301,6 +307,22 @@ typedef enum pg_conn_host_type CHT_UNIX_SOCKET } pg_conn_host_type; +/* An entry in the pending command queue. Used by pipeline mode to keep track + * of the expected results of future commands we've dispatched. + * + * Note that entries in this list are reused by being zeroed and appended to + * the tail when popped off the head. The entry with null next pointer is not + * the end of the list of expected commands, that's the tail pointer in + * pg_conn. + */ +typedef struct PGcommandQueueEntry +{ + PGQueryClass queryclass; /* Query type; PGQUERY_SYNC for sync msg */ + char *query; /* SQL command, or NULL if unknown */ + struct PGcommandQueueEntry *next; +} PGcommandQueueEntry; + + /* * pg_conn_host stores all information about each of possibly several hosts * mentioned in the connection string. Most fields are derived by splitting @@ -394,6 +416,7 @@ struct pg_conn bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ + PGpipelineStatus pipelineStatus; /* status of pipeline mode */ bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ @@ -406,6 +429,16 @@ struct pg_conn pg_conn_host *connhost; /* details about each named host */ char *connip; /* IP address for current network connection */ + /* + * The command queue, for pipeline mode. + * + * head is the next pending cmd, tail is where we append new commands. + * Freed entries for recycling go on the recycle linked list. 
+ */ + PGcommandQueueEntry *cmd_queue_head; + PGcommandQueueEntry *cmd_queue_tail; + PGcommandQueueEntry *cmd_queue_recycle; + /* Connection data */ pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ @@ -802,6 +835,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len); */ #define pqIsnonblocking(conn) ((conn)->nonblocking) +/* + * Connection's outbuffer threshold. + */ +#define OUTBUFFER_THRESHOLD 65536 + #ifdef ENABLE_NLS extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1); extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2); diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 5391f461a2..6e31458be2 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -17,6 +17,7 @@ SUBDIRS = \ test_extensions \ test_ginpostinglist \ test_integerset \ + test_libpq \ test_misc \ test_parser \ test_pg_dump \ diff --git a/src/test/modules/test_libpq/.gitignore b/src/test/modules/test_libpq/.gitignore new file mode 100644 index 0000000000..4fbf97a5b0 --- /dev/null +++ b/src/test/modules/test_libpq/.gitignore @@ -0,0 +1,5 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ +/pipeline diff --git a/src/test/modules/test_libpq/Makefile b/src/test/modules/test_libpq/Makefile new file mode 100644 index 0000000000..64a3a4af5e --- /dev/null +++ b/src/test/modules/test_libpq/Makefile @@ -0,0 +1,20 @@ +# src/test/modules/test_libpq/Makefile + +PROGRAM = pipeline +OBJS = pipeline.o + +PG_CPPFLAGS = -I$(libpq_srcdir) +PG_LIBS += $(libpq) + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_libpq +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_libpq/README b/src/test/modules/test_libpq/README new file mode 100644 index 0000000000..d8174dd579 --- /dev/null +++ b/src/test/modules/test_libpq/README @@ -0,0 +1 @@ +Test programs and libraries for libpq diff --git a/src/test/modules/test_libpq/pipeline.c b/src/test/modules/test_libpq/pipeline.c new file mode 100644 index 0000000000..5e97285e2b --- /dev/null +++ b/src/test/modules/test_libpq/pipeline.c @@ -0,0 +1,969 @@ +/* + * src/test/modules/test_libpq/pipeline.c + * Verify libpq pipeline execution functionality + */ +#include "postgres_fe.h" + +#include <sys/time.h> + +#include "catalog/pg_type_d.h" +#include "common/fe_memutils.h" +#include "libpq-fe.h" +#include "portability/instr_time.h" + + +static void exit_nicely(PGconn *conn); + +const char *const progname = "pipeline"; + + +#define DEBUG +#ifdef DEBUG +#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0) +#else +#define pg_debug(...) +#endif + +static const char *const drop_table_sql = +"DROP TABLE IF EXISTS pq_pipeline_demo"; +static const char *const create_table_sql = +"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer);"; +static const char *const insert_sql = +"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1);"; + +/* max char length of an int32, plus sign and null terminator */ +#define MAXINTLEN 12 + +static void +exit_nicely(PGconn *conn) +{ + PQfinish(conn); + exit(1); +} + +/* + * Print an error to stderr and terminate the program. + */ +#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__) +static void +pg_fatal_impl(int line, const char *fmt,...) 
+{ + va_list args; + + fprintf(stderr, "\n"); /* XXX hack */ + fprintf(stderr, "%s:%d: ", progname, line); + + va_start(args, fmt); + vfprintf(stderr, fmt, args); + va_end(args); + printf("Failure, exiting\n"); + exit(1); +} + +static void +test_disallowed(PGconn *conn) +{ + PGresult *res = NULL; + + fprintf(stderr, "test error cases... "); + + if (PQisnonblocking(conn)) + pg_fatal("Expected blocking connection mode\n"); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("Unable to enter pipeline mode\n"); + + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Pipeline mode not activated properly\n"); + + /* PQexec should fail in pipeline mode */ + res = PQexec(conn, "SELECT 1"); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("PQexec should fail in pipeline mode but succeeded\n"); + + /* So should PQsendQuery */ + if (PQsendQuery(conn, "SELECT 1") != 0) + pg_fatal("PQsendQuery should fail in pipeline mode but succeeded\n"); + + /* Entering pipeline mode when already in pipeline mode is OK */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("re-entering pipeline mode should be a no-op but failed\n"); + + if (PQisBusy(conn) != 0) + pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1\n"); + + /* ok, back to normal command mode */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("couldn't exit idle empty pipeline mode\n"); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("Pipeline mode not terminated properly\n"); + + /* exiting pipeline mode when not in pipeline mode should be a no-op */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed\n"); + + /* can now PQexec again */ + res = PQexec(conn, "SELECT 1"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s\n", + PQerrorMessage(conn)); + + fprintf(stderr, "ok\n"); +} + +static void +test_simple_pipeline(PGconn *conn) +{ + 
PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "simple pipeline... "); + + /* + * Enter pipeline mode and dispatch a set of operations, which we'll then + * process the results of as they come in. + * + * For a simple case we should be able to do this without interim + * processing of results since our out buffer will give us enough slush to + * work with and we won't block on sending. So blocking mode is fine. + */ + if (PQisnonblocking(conn)) + pg_fatal("Expected blocking connection mode\n"); + + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", + 1, dummy_param_oids, dummy_params, + NULL, NULL, 0) != 1) + pg_fatal("dispatching SELECT failed: %s\n", PQerrorMessage(conn)); + + if (PQexitPipelineMode(conn) != 0) + pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded\n"); + + if (PQsendPipeline(conn) != 1) + pg_fatal("Sending pipeline failed: %s\n", PQerrorMessage(conn)); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from first pipeline item\n", + PQresStatus(PQresultStatus(res))); + + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after first query result.\n"); + + /* + * Even though we've processed the result there's still a sync to come and + * we can't exit pipeline mode yet + */ + if (PQexitPipelineMode(conn) != 0) + pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly\n"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_END expected: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != 
PGRES_PIPELINE_END) + pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_END, error: %s\n", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + + PQclear(res); + res = NULL; + + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("PQgetResult returned something extra after pipeline end: %s\n", + PQresStatus(PQresultStatus(res))); + + /* We're still in pipeline mode... */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow\n"); + + /* ... until we end it, which we can safely do now */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("Exiting pipeline mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); +} + +static void +test_multi_pipelines(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "multi pipeline... "); + + /* + * Queue up a couple of small pipelines and process each without returning + * to command mode first. 
+ */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching first SELECT failed: %s\n", PQerrorMessage(conn)); + + if (PQsendPipeline(conn) != 1) + pg_fatal("Ending first pipeline failed: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second SELECT failed: %s\n", PQerrorMessage(conn)); + + if (PQsendPipeline(conn) != 1) + pg_fatal("Ending second pipeline failed: %s\n", PQerrorMessage(conn)); + + /* OK, start processing the results */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Unexpected result code %s from first pipeline item\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + if (PQgetResult(conn) != NULL) + pg_fatal("PQgetResult returned something extra after first result\n"); + + if (PQexitPipelineMode(conn) != 0) + pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly\n"); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when sync result expected: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_PIPELINE_END) + pg_fatal("Unexpected result code %s instead of sync result, error: %s\n", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + + PQclear(res); + + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + /* second pipeline */ + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + 
pg_fatal("Unexpected result code %s from second pipeline item\n", + PQresStatus(PQresultStatus(res))); + + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("PQgetResult returned null when there's a pipeline item: %s\n", + PQerrorMessage(conn)); + + if (PQresultStatus(res) != PGRES_PIPELINE_END) + pg_fatal("Unexpected result code %s from second end pipeline\n", + PQresStatus(PQresultStatus(res))); + + /* We're still in pipeline mode ... */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow\n"); + + /* until we end it, which we can safely do now */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("exiting pipeline mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); +} + +/* + * When an operation in a pipeline fails the rest of the pipeline is flushed. We + * still have to get results for each pipeline item, but the item will just be + * a PGRES_PIPELINE_ABORTED code. + * + * This intentionally doesn't use a transaction to wrap the pipeline. You should + * usually use an xact, but in this case we want to observe the effects of each + * statement. + */ +static void +test_aborted_pipeline(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + int i; + + fprintf(stderr, "aborted pipeline... 
"); + + res = PQexec(conn, drop_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn)); + + res = PQexec(conn, create_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + + /* + * Queue up a couple of small pipelines and process each without returning + * to command mode first. Make sure the second operation in the first + * pipeline ERRORs. + */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "1"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching first insert failed: %s\n", PQerrorMessage(conn)); + + if (PQsendQueryParams(conn, "SELECT no_such_function($1)", + 1, dummy_param_oids, dummy_params, + NULL, NULL, 0) != 1) + pg_fatal("dispatching error select failed: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "2"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second insert failed: %s\n", PQerrorMessage(conn)); + + if (PQsendPipeline(conn) != 1) + pg_fatal("Sending first pipeline failed: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "3"; + if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0) != 1) + pg_fatal("dispatching second-pipeline insert failed: %s\n", + PQerrorMessage(conn)); + + if (PQsendPipeline(conn) != 1) + pg_fatal("Ending second pipeline failed: %s\n", PQerrorMessage(conn)); + + /* + * OK, start processing the pipeline results. + * + * We should get a command-ok for the first query, then a fatal error and + * a pipeline aborted message for the second insert, a pipeline-end, then + * a command-ok and a pipeline-ok for the second pipeline operation. 
+ */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("Unexpected result status %s: %s\n", + PQresStatus(PQresultStatus(res)), + PQresultErrorMessage(res)); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + /* Second query caused error, so we expect an error next */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", + PQresStatus(PQresultStatus(res))); + + /* + * pipeline should now be aborted. + * + * Note that we could still queue more queries at this point if we wanted; + * they'd get added to a new third pipeline since we've already sent a + * second. The aborted flag relates only to the pipeline being received. 
+ */ + if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED) + pg_fatal("pipeline should be flagged as aborted but isn't\n"); + + /* third query in pipeline, the second insert */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED) + pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* NULL result to signal end-of-results for this command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED) + pg_fatal("pipeline should be flagged as aborted but isn't\n"); + + /* Ensure we're still in pipeline */ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow\n"); + + /* + * The end of a failed pipeline is a PGRES_PIPELINE_END. + * + * (This is so clients know to start processing results normally again and + * can tell the difference between skipped commands and the sync.) + */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_END) + pg_fatal("Unexpected result code from first pipeline sync\n" + "Expected PGRES_PIPELINE_END, got %s\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* XXX why do we have a NULL result after PGRES_PIPELINE_END? */ + res = PQgetResult(conn); + if (res != NULL) + pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED) + pg_fatal("sync should've cleared the aborted flag but didn't\n"); + + /* We're still in pipeline mode... 
*/ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow\n"); + + /* the insert from the second pipeline */ + res = PQgetResult(conn); + if (res == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_fatal("Unexpected result code %s from first item in second pipeline\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + /* Read the NULL result at the end of the command */ + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + /* the second pipeline sync */ + if ((res = PQgetResult(conn)) == NULL) + pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_PIPELINE_END) + pg_fatal("Unexpected result code %s from second pipeline sync\n", + PQresStatus(PQresultStatus(res))); + PQclear(res); + + if ((res = PQgetResult(conn)) != NULL) + pg_fatal("Expected null result, got %s: %s\n", + PQresStatus(PQresultStatus(res)), + PQerrorMessage(conn)); + + /* We're still in pipeline mode... 
*/ + if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF) + pg_fatal("Fell out of pipeline mode somehow\n"); + + /* until we end it, which we can safely do now */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF) + pg_fatal("exiting pipeline mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); + + /*- + * Since we fired the pipelines off without a surrounding xact, the results + * should be: + * + * - Implicit xact started by server around 1st pipeline + * - First insert applied + * - Second statement aborted xact + * - Third insert skipped + * - Sync rolled back first implicit xact + * - Implicit xact created by server around 2nd pipeline + * - insert applied from 2nd pipeline + * - Sync commits 2nd xact + * + * So we should only have the value 3 that we inserted. + */ + res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("Expected tuples, got %s: %s\n", + PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + if (PQntuples(res) != 1) + pg_fatal("expected 1 result, got %d\n", PQntuples(res)); + for (i = 0; i < PQntuples(res); i++) + { + const char *val = PQgetvalue(res, i, 0); + + if (strcmp(val, "3") != 0) + pg_fatal("expected only insert with value 3, got %s\n", val); + } + + PQclear(res); +} + +/* State machine enum for test_pipelined_insert */ +typedef enum PipelineInsertStep +{ + BI_BEGIN_TX, + BI_DROP_TABLE, + BI_CREATE_TABLE, + BI_PREPARE, + BI_INSERT_ROWS, + BI_COMMIT_TX, + BI_SYNC, + BI_DONE +} PipelineInsertStep; + +static void +test_pipelined_insert(PGconn *conn, int n_rows) +{ + const char *insert_params[1]; + Oid insert_param_oids[1] = {INT4OID}; + char insert_param_0[MAXINTLEN]; + PipelineInsertStep send_step = BI_BEGIN_TX, + recv_step = BI_BEGIN_TX; + int rows_to_send, + rows_to_receive; + + insert_params[0] = &insert_param_0[0]; + + 
rows_to_send = rows_to_receive = n_rows; + + /* + * Do a pipelined insert into a table created at the start of the pipeline + */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to enter pipeline mode: %s\n", PQerrorMessage(conn)); + + while (send_step != BI_PREPARE) + { + const char *sql; + + switch (send_step) + { + case BI_BEGIN_TX: + sql = "BEGIN TRANSACTION"; + send_step = BI_DROP_TABLE; + break; + + case BI_DROP_TABLE: + sql = drop_table_sql; + send_step = BI_CREATE_TABLE; + break; + + case BI_CREATE_TABLE: + sql = create_table_sql; + send_step = BI_PREPARE; + break; + + default: + pg_fatal("invalid state"); + } + + pg_debug("sending: %s\n", sql); + if (PQsendQueryParams(conn, sql, + 0, NULL, NULL, NULL, NULL, 0) != 1) + pg_fatal("dispatching %s failed: %s\n", sql, PQerrorMessage(conn)); + } + + Assert(send_step == BI_PREPARE); + pg_debug("sending: %s\n", insert_sql); + if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1) + pg_fatal("dispatching PREPARE failed: %s\n", PQerrorMessage(conn)); + send_step = BI_INSERT_ROWS; + + /* + * Now we start inserting. We'll be sending enough data that we could fill + * our out buffer, so to avoid deadlocking we need to enter nonblocking + * mode and consume input while we send more output. As results of each + * query are processed we should pop them to allow processing of the next + * query. There's no need to finish the pipeline before processing + * results. 
+ */ + if (PQsetnonblocking(conn, 1) != 0) + pg_fatal("failed to set nonblocking mode: %s\n", PQerrorMessage(conn)); + + while (recv_step != BI_DONE) + { + int sock; + fd_set input_mask; + fd_set output_mask; + + sock = PQsocket(conn); + + if (sock < 0) + break; /* shouldn't happen */ + + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + FD_ZERO(&output_mask); + FD_SET(sock, &output_mask); + + if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0) + { + fprintf(stderr, "select() failed: %s\n", strerror(errno)); + exit_nicely(conn); + } + + /* + * Process any results, so we keep the server's out buffer free + * flowing and it can continue to process input + */ + if (FD_ISSET(sock, &input_mask)) + { + PQconsumeInput(conn); + + /* Read until we'd block if we tried to read */ + while (!PQisBusy(conn) && recv_step < BI_DONE) + { + PGresult *res; + const char *cmdtag; + const char *description = ""; + int status; + + /* + * Read next result. If no more results from this query, + * advance to the next query + */ + res = PQgetResult(conn); + if (res == NULL) + continue; + + status = PGRES_COMMAND_OK; + switch (recv_step) + { + case BI_BEGIN_TX: + cmdtag = "BEGIN"; + recv_step++; + break; + case BI_DROP_TABLE: + cmdtag = "DROP TABLE"; + recv_step++; + break; + case BI_CREATE_TABLE: + cmdtag = "CREATE TABLE"; + recv_step++; + break; + case BI_PREPARE: + cmdtag = ""; + description = "PREPARE"; + recv_step++; + break; + case BI_INSERT_ROWS: + cmdtag = "INSERT"; + rows_to_receive--; + if (rows_to_receive == 0) + recv_step++; + break; + case BI_COMMIT_TX: + cmdtag = "COMMIT"; + recv_step++; + break; + case BI_SYNC: + cmdtag = ""; + description = "SYNC"; + status = PGRES_PIPELINE_END; + recv_step++; + break; + case BI_DONE: + /* unreachable */ + description = ""; + abort(); + } + + if (PQresultStatus(res) != status) + pg_fatal("%s reported status %s, expected %s\n" + "Error message: \"%s\"\n", + description, PQresStatus(PQresultStatus(res)), + PQresStatus(status), 
PQerrorMessage(conn)); + + if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0) + pg_fatal("%s expected command tag '%s', got '%s'\n", + description, cmdtag, PQcmdStatus(res)); + + pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description); + + PQclear(res); + } + } + + /* Write more rows and/or the end pipeline message, if needed */ + if (FD_ISSET(sock, &output_mask)) + { + PQflush(conn); + + if (send_step == BI_INSERT_ROWS) + { + snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send); + + if (PQsendQueryPrepared(conn, "my_insert", + 1, insert_params, NULL, NULL, 0) == 1) + { + pg_debug("sent row %d\n", rows_to_send); + + rows_to_send--; + if (rows_to_send == 0) + send_step = BI_COMMIT_TX; + } + else + { + /* + * in nonblocking mode, so it's OK for an insert to fail + * to send + */ + fprintf(stderr, "WARNING: failed to send insert #%d: %s\n", + rows_to_send, PQerrorMessage(conn)); + } + } + else if (send_step == BI_COMMIT_TX) + { + if (PQsendQueryParams(conn, "COMMIT", + 0, NULL, NULL, NULL, NULL, 0) == 1) + { + pg_debug("sent COMMIT\n"); + send_step = BI_SYNC; + } + else + { + fprintf(stderr, "WARNING: failed to send commit: %s\n", + PQerrorMessage(conn)); + } + } + else if (send_step == BI_SYNC) + { + if (PQsendPipeline(conn) == 1) + { + fprintf(stdout, "Sent pipeline\n"); + send_step = BI_DONE; + } + else + { + fprintf(stderr, "WARNING: Ending pipeline failed: %s\n", + PQerrorMessage(conn)); + } + } + } + } + + /* We've got the sync message and the pipeline should be done */ + if (PQexitPipelineMode(conn) != 1) + pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s\n", + PQerrorMessage(conn)); + + if (PQsetnonblocking(conn, 0) != 0) + pg_fatal("failed to clear nonblocking mode: %s\n", PQerrorMessage(conn)); +} + +static void +test_singlerowmode(PGconn *conn) +{ + PGresult *res; + int i; + bool pipeline_ended = false; + + /* 1 pipeline, 3 queries in it */ + if (PQenterPipelineMode(conn) != 1) + pg_fatal("failed to 
enter pipeline mode: %s\n", + PQerrorMessage(conn)); + + for (i = 0; i < 3; i++) + { + char *param[1]; + + param[0] = psprintf("%d", 44 + i); + + if (PQsendQueryParams(conn, + "SELECT generate_series(42, $1)", + 1, + NULL, + (const char **) param, + NULL, + NULL, + 0) != 1) + pg_fatal("failed to send query: %s\n", + PQerrorMessage(conn)); + pfree(param[0]); + } + PQsendPipeline(conn); + + for (i = 0; !pipeline_ended; i++) + { + bool first = true; + bool saw_ending_tuplesok; + bool isSingleTuple = false; + + /* Set single row mode for only first 2 SELECT queries */ + if (i < 2) + { + if (PQsetSingleRowMode(conn) != 1) + pg_fatal("PQsetSingleRowMode() failed for i=%d\n", i); + } + + /* Consume rows for this query */ + saw_ending_tuplesok = false; + while ((res = PQgetResult(conn)) != NULL) + { + ExecStatusType est = PQresultStatus(res); + + if (est == PGRES_PIPELINE_END) + { + fprintf(stderr, "end of pipeline reached\n"); + pipeline_ended = true; + PQclear(res); + if (i != 3) + pg_fatal("Expected three results, got %d\n", i); + break; + } + + /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */ + if (first) + { + if (i <= 1 && est != PGRES_SINGLE_TUPLE) + pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s\n", + i, PQresStatus(est)); + if (i >= 2 && est != PGRES_TUPLES_OK) + pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s\n", + i, PQresStatus(est)); + first = false; + } + + fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i); + switch (est) + { + case PGRES_TUPLES_OK: + fprintf(stderr, ", tuples: %d\n", PQntuples(res)); + saw_ending_tuplesok = true; + if (isSingleTuple) + { + if (PQntuples(res) == 0) + fprintf(stderr, "all tuples received in query %d\n", i); + else + pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, " + "but received PGRES_TUPLES_OK directly instead\n"); + } + break; + + case PGRES_SINGLE_TUPLE: + fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0)); + break; + + default: + 
pg_fatal("unexpected\n"); + } + PQclear(res); + } + if (!pipeline_ended && !saw_ending_tuplesok) + pg_fatal("didn't get expected terminating TUPLES_OK\n"); + } + + if (PQexitPipelineMode(conn) != 1) + pg_fatal("failed to end pipeline mode: %s\n", PQerrorMessage(conn)); +} + +static void +usage(const char *progname) +{ + fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname); + fprintf(stderr, "Usage:\n"); + fprintf(stderr, " %s testname [conninfo [number_of_rows]]\n", progname); + fprintf(stderr, "Tests:\n"); + fprintf(stderr, " disallowed_in_pipeline\n"); + fprintf(stderr, " simple_pipeline\n"); + fprintf(stderr, " multi_pipeline\n"); + fprintf(stderr, " pipeline_abort\n"); + fprintf(stderr, " singlerow\n"); + fprintf(stderr, " pipeline_insert\n"); +} + +int +main(int argc, char **argv) +{ + const char *conninfo = ""; + PGconn *conn; + int numrows = 10000; + + /* + * The testname parameter is mandatory; it can be followed by a conninfo + * string and number of rows. + */ + if (argc < 2 || argc > 4) + { + usage(argv[0]); + exit(1); + } + + if (argc >= 3) + conninfo = pg_strdup(argv[2]); + + if (argc >= 4) + { + errno = 0; + numrows = strtol(argv[3], NULL, 10); + if (errno != 0 || numrows <= 0) + { + fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]); + exit(1); + } + } + + /* Make a connection to the database */ + conn = PQconnectdb(conninfo); + if (PQstatus(conn) != CONNECTION_OK) + { + fprintf(stderr, "Connection to database failed: %s\n", + PQerrorMessage(conn)); + exit_nicely(conn); + } + + if (strcmp(argv[1], "disallowed_in_pipeline") == 0) + test_disallowed(conn); + else if (strcmp(argv[1], "simple_pipeline") == 0) + test_simple_pipeline(conn); + else if (strcmp(argv[1], "multi_pipeline") == 0) + test_multi_pipelines(conn); + else if (strcmp(argv[1], "pipeline_abort") == 0) + test_aborted_pipeline(conn); + else if (strcmp(argv[1], "pipeline_insert") == 0) + test_pipelined_insert(conn, numrows); + else if (strcmp(argv[1], 
"singlerow") == 0) + test_singlerowmode(conn); + else + { + fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]); + usage(argv[0]); + exit(1); + } + + /* close the connection to the database and cleanup */ + PQfinish(conn); + return 0; +} diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl b/src/test/modules/test_libpq/t/001_libpq_async.pl new file mode 100644 index 0000000000..211629db2d --- /dev/null +++ b/src/test/modules/test_libpq/t/001_libpq_async.pl @@ -0,0 +1,30 @@ +use strict; +use warnings; + +use Config; +use PostgresNode; +use TestLib; +use Test::More tests => 6; +use Cwd; + +my $node = get_new_node('main'); +$node->init; +$node->start; + +my $numrows = 10000; +my @tests = + qw(disallowed_in_pipeline + simple_pipeline + multi_pipeline + pipeline_abort + pipeline_insert + singlerow); +$ENV{PATH} = "$ENV{PATH}:" . getcwd(); +for my $testname (@tests) +{ + $node->command_ok( + [ 'pipeline', $testname, $node->connstr('postgres'), $numrows ], + "pipeline $testname"); +} + +$node->stop('fast'); diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 49614106dc..d1c019c283 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -50,7 +50,8 @@ my @contrib_excludes = ( 'pgcrypto', 'sepgsql', 'brin', 'test_extensions', 'test_misc', 'test_pg_dump', - 'snapshot_too_old', 'unsafe_tests'); + 'snapshot_too_old', 'unsafe_tests', + 'test_libpq'); # Set of variables for frontend modules my $frontend_defines = { 'initdb' => 'FRONTEND' }; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index bab4f3adb3..c433002c77 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1561,10 +1561,12 @@ PG_Locale_Strategy PG_Lock_Status PG_init_t PGcancel +PGcommandQueueEntry PGconn PGdataValue PGlobjfuncs PGnotify +PGpipelineStatus PGresAttDesc PGresAttValue PGresParamDesc