On Wed, Jan 10, 2024 at 03:40:36PM +0900, Michael Paquier wrote:
> Hence, as a whole, wouldn't it be more consistent if the new
> PQsendPipelineSync() and the existing PQpipelineSync() call an
> internal static routine (PQPipelineSyncInternal?) that can switch
> between both modes? Let's just make the extra argument a boolean.

Yeah, I'll go with that after a second look. Attached is what I am
finishing with, and I have reproduced some numbers with the pgbench
metacommand mentioned upthread, which is reeeaaally nice.

I have also made a few edits to the tests.
--
Michael

From b653759f8dfdfe83d0d8bc1d4a0ac9d4e272a061 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Mon, 15 Jan 2024 16:15:00 +0900
Subject: [PATCH v6] Add PQsendPipelineSync() to libpq

This new function is equivalent to PQpipelineSync(), except that it does
not flush anything to the server; the user must subsequently call
PQflush() instead. Its purpose is to reduce the system call overhead of
pipeline mode.
---
 src/interfaces/libpq/exports.txt              |  1 +
 src/interfaces/libpq/fe-exec.c                | 41 +++++++++++++++--
 src/interfaces/libpq/libpq-fe.h               |  1 +
 .../modules/libpq_pipeline/libpq_pipeline.c   | 43 ++++++++++++++++++
 .../traces/multi_pipelines.trace              | 11 +++++
 doc/src/sgml/libpq.sgml                       | 45 ++++++++++++++++---
 6 files changed, 131 insertions(+), 11 deletions(-)

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 28b861fd93..088592deb1 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -192,3 +192,4 @@ PQclosePortal             189
 PQsendClosePrepared       190
 PQsendClosePortal         191
 PQchangePassword          192
+PQsendPipelineSync        193
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 106d14e6ee..9e7e670921 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -81,6 +81,7 @@ static int	PQsendTypedCommand(PGconn *conn, char command, char type,
 							   const char *target);
 static int	check_field_number(const PGresult *res, int field_num);
 static void pqPipelineProcessQueue(PGconn *conn);
+static int	pqPipelineSyncInternal(PGconn *conn, bool immediate_flush);
 static int	pqPipelineFlush(PGconn *conn);
 
 
@@ -3224,6 +3225,25 @@ pqPipelineProcessQueue(PGconn *conn)
 /*
  * PQpipelineSync
  *		Send a Sync message as part of a pipeline, and flush to server
+ */
+int
+PQpipelineSync(PGconn *conn)
+{
+	return pqPipelineSyncInternal(conn, true);
+}
+
+/*
+ * PQsendPipelineSync
+ *		Send a Sync message as part of a pipeline, without flushing to server
+ */
+int
+PQsendPipelineSync(PGconn *conn)
+{
+	return pqPipelineSyncInternal(conn, false);
+}
+
+/*
+ * Wrapper for PQpipelineSync and PQsendPipelineSync.
  *
  * It's legal to start submitting more commands in the pipeline immediately,
  * without waiting for the results of the current pipeline. There's no need to
@@ -3240,9 +3260,12 @@ pqPipelineProcessQueue(PGconn *conn)
  * The connection will remain in pipeline mode and unavailable for new
  * synchronous command execution functions until all results from the pipeline
  * are processed by the client.
+ *
+ * immediate_flush controls if the flush happens immediately after sending the
+ * Sync message or not.
  */
-int
-PQpipelineSync(PGconn *conn)
+static int
+pqPipelineSyncInternal(PGconn *conn, bool immediate_flush)
 {
 	PGcmdQueueEntry *entry;
 
@@ -3288,9 +3311,19 @@ PQpipelineSync(PGconn *conn)
 	/*
 	 * Give the data a push. In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * If immediate_flush is disabled, the data is pushed if we are past the
+	 * size threshold.
 	 */
-	if (PQflush(conn) < 0)
-		goto sendFailed;
+	if (immediate_flush)
+	{
+		if (pqFlush(conn) < 0)
+			goto sendFailed;
+	}
+	else
+	{
+		if (pqPipelineFlush(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f0ec660cb6..defc415fa3 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -474,6 +474,7 @@ extern int	PQenterPipelineMode(PGconn *conn);
 extern int	PQexitPipelineMode(PGconn *conn);
 extern int	PQpipelineSync(PGconn *conn);
 extern int	PQsendFlushRequest(PGconn *conn);
+extern int	PQsendPipelineSync(PGconn *conn);
 
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71cd04c5f2..c68e20d0b5 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -162,6 +162,7 @@ test_multi_pipelines(PGconn *conn)
 	if (PQenterPipelineMode(conn) != 1)
 		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
 
+	/* first pipeline */
 	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
 						  dummy_params, NULL, NULL, 0) != 1)
 		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
@@ -169,6 +170,16 @@ test_multi_pipelines(PGconn *conn)
 	if (PQpipelineSync(conn) != 1)
 		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
 
+	/* second pipeline */
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+	/* Skip flushing once. */
+	if (PQsendPipelineSync(conn) != 1)
+		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+	/* third pipeline */
 	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
 						  dummy_params, NULL, NULL, 0) != 1)
 		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
@@ -177,6 +188,9 @@ test_multi_pipelines(PGconn *conn)
 		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
 
 	/* OK, start processing the results */
+
+	/* first pipeline */
+
 	res = PQgetResult(conn);
 	if (res == NULL)
 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
@@ -206,6 +220,35 @@ test_multi_pipelines(PGconn *conn)
 
 	/* second pipeline */
 
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result");
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+
+	/* third pipeline */
+
 	res = PQgetResult(conn);
 	if (res == NULL)
 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
index 4b9ab07ca4..1ee21f61dc 100644
--- a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
@@ -8,6 +8,17 @@ F	19	Bind	 "" "" 0 1 1 '1' 1 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
 F	4	Sync
+F	21	Parse	 "" "SELECT $1" 1 NNNN
+F	19	Bind	 "" "" 0 1 1 '1' 1 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Sync
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	5	ReadyForQuery	 I
 B	4	ParseComplete
 B	4	BindComplete
 B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 21195e0e72..6d1d477276 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3547,8 +3547,9 @@ ExecStatusType PQresultStatus(const PGresult *res);
          <listitem>
           <para>
            The <structname>PGresult</structname> represents a
-           synchronization point in pipeline mode, requested by
-           <xref linkend="libpq-PQpipelineSync"/>.
+           synchronization point in pipeline mode, requested by either
+           <xref linkend="libpq-PQpipelineSync"/> or
+           <xref linkend="libpq-PQsendPipelineSync"/>.
            This status occurs only when pipeline mode has been selected.
           </para>
          </listitem>
@@ -5122,7 +5123,8 @@ int PQsendClosePortal(PGconn *conn, const char *portalName);
        <xref linkend="libpq-PQsendDescribePrepared"/>,
        <xref linkend="libpq-PQsendDescribePortal"/>,
        <xref linkend="libpq-PQsendClosePrepared"/>,
-       <xref linkend="libpq-PQsendClosePortal"/>, or
+       <xref linkend="libpq-PQsendClosePortal"/>,
+       <xref linkend="libpq-PQsendPipelineSync"/>, or
        <xref linkend="libpq-PQpipelineSync"/>
        call, and returns it.
        A null pointer is returned when the command is complete and there
@@ -5507,8 +5509,9 @@ int PQflush(PGconn *conn);
     client sends them. The server will begin executing the commands in the
     pipeline immediately, not waiting for the end of the pipeline.
     Note that results are buffered on the server side; the server flushes
-    that buffer when a synchronization point is established with
-    <function>PQpipelineSync</function>, or when
+    that buffer when a synchronization point is established with either
+    <function>PQpipelineSync</function> or
+    <function>PQsendPipelineSync</function>, or when
     <function>PQsendFlushRequest</function> is called.
     If any statement encounters an error, the server aborts the current
     transaction and does not execute any subsequent command in the queue
@@ -5565,7 +5568,8 @@ int PQflush(PGconn *conn);
      <type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
      and <literal>PGRES_PIPELINE_ABORTED</literal>.
      <literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
-     <function>PQpipelineSync</function> at the corresponding point
+     <function>PQpipelineSync</function> or
+     <function>PQsendPipelineSync</function> at the corresponding point
      in the pipeline.
      <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
      query result for the first error and all subsequent results
@@ -5603,7 +5607,8 @@ int PQflush(PGconn *conn);
      <function>PQresultStatus</function> will report a
      <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
      operation in an aborted pipeline. The result for
-     <function>PQpipelineSync</function> is reported as
+     <function>PQpipelineSync</function> or
+     <function>PQsendPipelineSync</function> is reported as
      <literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
      and resumption of normal result processing.
     </para>
@@ -5835,6 +5840,32 @@ int PQsendFlushRequest(PGconn *conn);
       </para>
      </listitem>
     </varlistentry>
+
+    <varlistentry id="libpq-PQsendPipelineSync">
+     <term><function>PQsendPipelineSync</function><indexterm><primary>PQsendPipelineSync</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Marks a synchronization point in a pipeline by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       without flushing the send buffer. This serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-pipeline-errors"/>.
+
+<synopsis>
+int PQsendPipelineSync(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       pipeline mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed.
+       Note that the message is not itself flushed to the server automatically;
+       use <function>PQflush</function> if necessary.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
 
   </sect2>
-- 
2.43.0
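
For readers skimming the thread, here is a rough toy model (not libpq code) of why skipping the per-Sync flush should cut system calls. Everything in it is hypothetical: ToyConn, toy_flush(), toy_pipeline_sync(), toy_count_flushes() and TOY_FLUSH_THRESHOLD are made-up names, and the 65536-byte threshold is only an assumed stand-in for the size check that pqPipelineFlush() applies. It counts one "flush" per write of a non-empty buffer and ignores partial writes and nonblocking mode.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed stand-in for the output-buffer size threshold; not taken from libpq. */
#define TOY_FLUSH_THRESHOLD 65536

/* Toy connection: bytes waiting in the send buffer, and flushes done so far. */
typedef struct
{
	size_t		buffered;
	int			flush_calls;
} ToyConn;

/* Pretend to write the whole buffer out; counts as one system call. */
static void
toy_flush(ToyConn *conn)
{
	assert(conn != NULL);
	if (conn->buffered > 0)
	{
		conn->buffered = 0;
		conn->flush_calls++;
	}
}

/*
 * Queue one small pipeline (query plus Sync) of msg_bytes bytes.  With
 * immediate_flush the buffer is always pushed; without it, the buffer is
 * pushed only once it has grown past the threshold.
 */
static void
toy_pipeline_sync(ToyConn *conn, size_t msg_bytes, bool immediate_flush)
{
	conn->buffered += msg_bytes;
	if (immediate_flush || conn->buffered >= TOY_FLUSH_THRESHOLD)
		toy_flush(conn);
}

/* Flushes needed to send npipelines pipelines, including the final one. */
static int
toy_count_flushes(int npipelines, size_t msg_bytes, bool immediate_flush)
{
	ToyConn		conn = {0, 0};

	for (int i = 0; i < npipelines; i++)
		toy_pipeline_sync(&conn, msg_bytes, immediate_flush);

	/* The explicit flush the caller still owes in the non-flushing mode. */
	toy_flush(&conn);
	return conn.flush_calls;
}
```

In this model, 100 pipelines of 64 bytes each cost 100 flushes when every Sync flushes, and a single flush when the Syncs are only queued and flushed once at the end. Real numbers will depend on message sizes and on the actual threshold.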