Laurenz Albe wrote:

> Here is the code review for patch number 2:
> +static void
> +CloseGOutput(FILE *gfile_fout, bool is_pipe)
>
> It makes sense to factor out this code.
> But shouldn't these functions have a prototype at the beginning of the file?

Looking at the other static functions in psql/common.c, there are 22 of them, but only 3 have prototypes at the top of the file. Those 3 functions are called before being defined, so their prototypes are mandatory. The other static functions, which are defined before being called, happen not to have forward declarations, so SetupGOutput() and CloseGOutput() follow that model.

> Here is a suggestion for a consolidated comment:
>
>   Fetch the result in chunks if FETCH_COUNT is set. We don't enable chunking
>   if SHOW_ALL_RESULTS is false, since that requires us to accumulate all rows
>   before we can tell what should be displayed, which would counter the idea
>   of FETCH_COUNT. Chunk fetching is also disabled if \gset, \crosstab,
>   \gexec and \watch are used.

OK, done like that.

> +	if (fetch_count > 0 && result_status == PGRES_TUPLES_CHUNK)
>
> Could it be that result_status == PGRES_TUPLES_CHUNK, but fetch_count is 0?
> If not, perhaps there should be an Assert that verifies that, and the "if"
> statement should only check for the latter condition.

Good point. In fact the test can be simplified to
if (result_status == PGRES_TUPLES_CHUNK), and fetch_count can be removed as a
variable from the function. Done that way.

> > --- a/src/bin/psql/t/001_basic.pl
> > +++ b/src/bin/psql/t/001_basic.pl
> > @@ -184,10 +184,10 @@ like(
> > 	"\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
> > 	on_error_stop => 0))[2],
> > 	qr/\A^psql:<stdin>:2: ERROR: .*$
> > -^LINE 2: SELECT error;$
> > +^LINE 1: SELECT error;$
> > ^ *^.*$
> > ^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$
> > -^LINE 2: SELECT error;$
> > +^LINE 1: SELECT error;$
>
> Why does the output change? Perhaps there is a good and harmless
> explanation, but the naïve expectation would be that it doesn't.
Unpatched, psql builds this query:

  DECLARE _psql_cursor NO SCROLL CURSOR FOR
  <user-query>

so the user query starts at line 2. With the patch, the user query is sent
as-is, starting at line 1, hence the different error location.

> After fixing the problem manually, it builds without warning.
> The regression tests pass, and the feature works as expected.

Thanks for testing. Updated patches are attached.


Best regards,
--
Daniel Vérité
https://postgresql.verite.pro/
Twitter: @DanielVerite
From 0fefaee7d5b3003ad0d089ea9e92675c6f50245f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <dan...@manitou-mail.org> Date: Mon, 1 Apr 2024 19:46:20 +0200 Subject: [PATCH v7 1/2] Implement retrieval of results in chunks with libpq. This mode is similar to the single-row mode except that chunks of results contain up to N rows instead of a single row. It is meant to reduce the overhead of the row-by-row allocations for large result sets. The mode is selected with PQsetChunkedRowsMode(int maxRows) and results have the new status code PGRES_TUPLES_CHUNK. --- doc/src/sgml/libpq.sgml | 98 +++++++++++---- .../libpqwalreceiver/libpqwalreceiver.c | 1 + src/bin/pg_amcheck/pg_amcheck.c | 1 + src/interfaces/libpq/exports.txt | 1 + src/interfaces/libpq/fe-exec.c | 117 +++++++++++++++--- src/interfaces/libpq/libpq-fe.h | 4 +- src/interfaces/libpq/libpq-int.h | 7 +- 7 files changed, 185 insertions(+), 44 deletions(-) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index d3e87056f2..1814921d5a 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3545,7 +3545,20 @@ ExecStatusType PQresultStatus(const PGresult *res); The <structname>PGresult</structname> contains a single result tuple from the current command. This status occurs only when single-row mode has been selected for the query - (see <xref linkend="libpq-single-row-mode"/>). + (see <xref linkend="libpq-chunked-results-modes"/>). + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-pgres-tuples-chunk"> + <term><literal>PGRES_TUPLES_CHUNK</literal></term> + <listitem> + <para> + The <structname>PGresult</structname> contains several tuples + from the current command. The count of tuples cannot exceed + the maximum passed to <xref linkend="libpq-PQsetChunkedRowsMode"/>. + This status occurs only when the chunked mode has been selected + for the query (see <xref linkend="libpq-chunked-results-modes"/>). 
</para> </listitem> </varlistentry> @@ -5197,8 +5210,8 @@ PGresult *PQgetResult(PGconn *conn); <para> Another frequently-desired feature that can be obtained with <xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/> - is retrieving large query results a row at a time. This is discussed - in <xref linkend="libpq-single-row-mode"/>. + is retrieving large query results a limited number of rows at a time. This is discussed + in <xref linkend="libpq-chunked-results-modes"/>. </para> <para> @@ -5562,12 +5575,13 @@ int PQflush(PGconn *conn); </para> <para> - To enter single-row mode, call <function>PQsetSingleRowMode</function> - before retrieving results with <function>PQgetResult</function>. - This mode selection is effective only for the query currently - being processed. For more information on the use of - <function>PQsetSingleRowMode</function>, - refer to <xref linkend="libpq-single-row-mode"/>. + To enter single-row or chunked modes, call + respectively <function>PQsetSingleRowMode</function> + or <function>PQsetChunkedRowsMode</function> before retrieving results + with <function>PQgetResult</function>. This mode selection is effective + only for the query currently being processed. For more information on the + use of these functions refer + to <xref linkend="libpq-chunked-results-modes" />. </para> <para> @@ -5934,10 +5948,10 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; </sect2> </sect1> - <sect1 id="libpq-single-row-mode"> - <title>Retrieving Query Results Row-by-Row</title> + <sect1 id="libpq-chunked-results-modes"> + <title>Retrieving Query Results in chunks</title> - <indexterm zone="libpq-single-row-mode"> + <indexterm zone="libpq-chunked-results-modes"> <primary>libpq</primary> <secondary>single-row mode</secondary> </indexterm> @@ -5948,13 +5962,15 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; <structname>PGresult</structname>. This can be unworkable for commands that return a large number of rows. 
For such cases, applications can use <xref linkend="libpq-PQsendQuery"/> and <xref linkend="libpq-PQgetResult"/> in - <firstterm>single-row mode</firstterm>. In this mode, the result row(s) are - returned to the application one at a time, as they are received from the - server. + <firstterm>single-row mode</firstterm> or <firstterm>chunked mode</firstterm>. + In these modes, the result row(s) are returned to the application one at a + time for the single-row mode and by chunks for the chunked mode, as they + are received from the server. </para> <para> - To enter single-row mode, call <xref linkend="libpq-PQsetSingleRowMode"/> + To enter these modes, call <xref linkend="libpq-PQsetSingleRowMode"/> + or <xref linkend="libpq-PQsetChunkedRowsMode"/> immediately after a successful call of <xref linkend="libpq-PQsendQuery"/> (or a sibling function). This mode selection is effective only for the currently executing query. Then call <xref linkend="libpq-PQgetResult"/> @@ -5962,7 +5978,8 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; linkend="libpq-async"/>. If the query returns any rows, they are returned as individual <structname>PGresult</structname> objects, which look like normal query results except for having status code - <literal>PGRES_SINGLE_TUPLE</literal> instead of + <literal>PGRES_SINGLE_TUPLE</literal> for the single-row mode and + <literal>PGRES_TUPLES_CHUNK</literal> for the chunked mode, instead of <literal>PGRES_TUPLES_OK</literal>. After the last row, or immediately if the query returns zero rows, a zero-row object with status <literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no @@ -5975,9 +5992,9 @@ UPDATE mytable SET x = x + 1 WHERE id = 42; </para> <para> - When using pipeline mode, single-row mode needs to be activated for each - query in the pipeline before retrieving results for that query - with <function>PQgetResult</function>. 
+ When using pipeline mode, the single-row or chunked mode need to be + activated for each query in the pipeline before retrieving results for that + query with <function>PQgetResult</function>. See <xref linkend="libpq-pipeline-mode"/> for more information. </para> @@ -6011,14 +6028,49 @@ int PQsetSingleRowMode(PGconn *conn); </variablelist> </para> + <para> + <variablelist> + <varlistentry id="libpq-PQsetChunkedRowsMode"> + <term><function>PQsetChunkedRowsMode</function> + <indexterm><primary>PQsetChunkedRowsMode</primary></indexterm></term> + <listitem> + <para> + Select to receive the results for the currently-executing query in chunks. + +<synopsis> + int PQsetChunkedRowsMode(PGconn *conn, + int maxRows); +</synopsis> + </para> + + <para> + This function is similar to <xref linkend="libpq-PQsetSingleRowMode"/>, + except that it can retrieve <replaceable>maxRows</replaceable> rows + per call to <xref linkend="libpq-PQgetResult"/> instead of a single row. + This function can only be called immediately after + <xref linkend="libpq-PQsendQuery"/> or one of its sibling functions, + before any other operation on the connection such as + <xref linkend="libpq-PQconsumeInput"/> or + <xref linkend="libpq-PQgetResult"/>. If called at the correct time, + the function activates the chunked mode for the current query and + returns 1. Otherwise the mode stays unchanged and the function + returns 0. In any case, the mode reverts to normal after + completion of the current query. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + <caution> <para> While processing a query, the server may return some rows and then encounter an error, causing the query to be aborted. Ordinarily, <application>libpq</application> discards any such rows and reports only the - error. But in single-row mode, those rows will have already been - returned to the application. 
Hence, the application will see some - <literal>PGRES_SINGLE_TUPLE</literal> <structname>PGresult</structname> + error. But in the single-row or chunked modes, those rows will have already + been returned to the application. Hence, the application will see some + <literal>PGRES_SINGLE_TUPLE</literal> or <literal>PGRES_TUPLES_CHUNK</literal> + <structname>PGresult</structname> objects followed by a <literal>PGRES_FATAL_ERROR</literal> object. For proper transactional behavior, the application must be designed to discard or undo whatever has been done with the previously-processed diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 761bf0f677..83a465a390 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -1249,6 +1249,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, switch (PQresultStatus(pgres)) { case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: case PGRES_TUPLES_OK: walres->status = WALRCV_OK_TUPLES; libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes); diff --git a/src/bin/pg_amcheck/pg_amcheck.c b/src/bin/pg_amcheck/pg_amcheck.c index e5f9eedc47..728305a7cf 100644 --- a/src/bin/pg_amcheck/pg_amcheck.c +++ b/src/bin/pg_amcheck/pg_amcheck.c @@ -989,6 +989,7 @@ should_processing_continue(PGresult *res) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: case PGRES_PIPELINE_SYNC: case PGRES_PIPELINE_ABORTED: return false; diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 9fbd3d3407..c7d01958ab 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -202,3 +202,4 @@ PQcancelSocket 199 PQcancelErrorMessage 200 PQcancelReset 201 PQcancelFinish 202 +PQsetChunkedRowsMode 203 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 
c02a9180b2..b9a73b583e 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -41,7 +41,8 @@ char *const pgresStatus[] = { "PGRES_COPY_BOTH", "PGRES_SINGLE_TUPLE", "PGRES_PIPELINE_SYNC", - "PGRES_PIPELINE_ABORTED" + "PGRES_PIPELINE_ABORTED", + "PGRES_TUPLES_CHUNK" }; /* We return this if we're unable to make a PGresult at all */ @@ -83,7 +84,7 @@ static int check_field_number(const PGresult *res, int field_num); static void pqPipelineProcessQueue(PGconn *conn); static int pqPipelineSyncInternal(PGconn *conn, bool immediate_flush); static int pqPipelineFlush(PGconn *conn); - +static bool canChangeRowMode(PGconn *conn); /* ---------------- * Space management for PGresult. @@ -200,6 +201,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) case PGRES_COPY_IN: case PGRES_COPY_BOTH: case PGRES_SINGLE_TUPLE: + case PGRES_TUPLES_CHUNK: /* non-error cases */ break; default: @@ -913,8 +915,9 @@ pqPrepareAsyncResult(PGconn *conn) /* * Replace conn->result with next_result, if any. In the normal case * there isn't a next result and we're just dropping ownership of the - * current result. In single-row mode this restores the situation to what - * it was before we created the current single-row result. + * current result. In single-row and chunked modes this restores the + * situation to what it was before we created the current single-row or + * chunk-of-rows result. */ conn->result = conn->next_result; conn->error_result = false; /* next_result is never an error */ @@ -1200,10 +1203,11 @@ pqSaveParameterStatus(PGconn *conn, const char *name, const char *value) * (Such a string should already be translated via libpq_gettext().) * If it is left NULL, the error is presumed to be "out of memory". * - * In single-row mode, we create a new result holding just the current row, - * stashing the previous result in conn->next_result so that it becomes - * active again after pqPrepareAsyncResult(). 
This allows the result metadata - * (column descriptions) to be carried forward to each result row. + * In single-row or chunked mode, we create a new result holding just the + * current set of rows, stashing the previous result in conn->next_result so + * that it becomes active again after pqPrepareAsyncResult(). This allows the + * result metadata (column descriptions) to be carried forward to each result + * row. */ int pqRowProcessor(PGconn *conn, const char **errmsgp) @@ -1228,6 +1232,28 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) if (!res) return 0; } + else if (conn->rowsChunkSize > 0) + { + /* + * In chunked mode, make a new PGresult that will hold N rows; the + * original conn->result is left unchanged, as in the single-row mode. + */ + if (!conn->chunk_result) + { + /* Allocate and initialize the result to hold a chunk of rows */ + res = PQcopyResult(res, + PG_COPYRES_ATTRS | PG_COPYRES_EVENTS | + PG_COPYRES_NOTICEHOOKS); + if (!res) + return 0; + /* Change result status to special chunk-of-rows value */ + res->resultStatus = PGRES_TUPLES_CHUNK; + /* Keep this result to reuse for the next rows of the chunk */ + conn->chunk_result = res; + } + else + res = conn->chunk_result; /* Use the current chunk */ + } /* * Basically we just allocate space in the PGresult for each field and @@ -1290,6 +1316,21 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->asyncStatus = PGASYNC_READY_MORE; } + /* + * In chunked mode, if the count has reached the requested limit, make the + * rows of the current chunk available immediately. 
+ */ + else if (conn->rowsChunkSize > 0 && res->ntups >= conn->rowsChunkSize) + { + /* Stash old result for re-use later */ + conn->next_result = conn->result; + conn->result = res; + /* Do not reuse that chunk of results */ + conn->chunk_result = NULL; + /* And mark the result ready to return */ + conn->asyncStatus = PGASYNC_READY_MORE; + } + return 1; fail: @@ -1745,8 +1786,9 @@ PQsendQueryStart(PGconn *conn, bool newQuery) */ pqClearAsyncResult(conn); - /* reset single-row processing mode */ + /* reset row-by-row and chunked processing modes */ conn->singleRowMode = false; + conn->rowsChunkSize = 0; } /* ready to send command message */ @@ -1930,25 +1972,51 @@ sendFailed: */ int PQsetSingleRowMode(PGconn *conn) +{ + if (canChangeRowMode(conn)) + { + conn->singleRowMode = true; + return 1; + } + else + return 0; +} + +/* + * Select chunked results processing mode + */ +int +PQsetChunkedRowsMode(PGconn *conn, int chunkSize) +{ + if (chunkSize >= 0 && canChangeRowMode(conn)) + { + conn->rowsChunkSize = chunkSize; + return 1; + } + else + return 0; +} + +static +bool +canChangeRowMode(PGconn *conn) { /* - * Only allow setting the flag when we have launched a query and not yet - * received any results. + * Only allow setting the row-by-row or by-chunks modes when we have + * launched a query and not yet received any results. 
*/ if (!conn) - return 0; + return false; if (conn->asyncStatus != PGASYNC_BUSY) - return 0; + return false; if (!conn->cmd_queue_head || (conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE && conn->cmd_queue_head->queryclass != PGQUERY_EXTENDED)) - return 0; + return false; if (pgHavePendingResult(conn)) - return 0; + return false; - /* OK, set flag */ - conn->singleRowMode = true; - return 1; + return true; } /* @@ -2115,6 +2183,16 @@ PQgetResult(PGconn *conn) break; case PGASYNC_READY: + /* + * If there is a pending chunk of results, return it + */ + if (conn->chunk_result != NULL) + { + res = conn->chunk_result; + conn->chunk_result = NULL; + break; + } + res = pqPrepareAsyncResult(conn); /* Advance the queue as appropriate */ @@ -3173,10 +3251,11 @@ pqPipelineProcessQueue(PGconn *conn) } /* - * Reset single-row processing mode. (Client has to set it up for each + * Reset to full result sets mode. (Client has to set it up for each * query, if desired.) */ conn->singleRowMode = false; + conn->rowsChunkSize = 0; /* * If there are no further commands to process in the queue, get us in diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 09b485bd2b..0cea4f6b5b 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -112,8 +112,9 @@ typedef enum PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ PGRES_PIPELINE_SYNC, /* pipeline synchronization point */ - PGRES_PIPELINE_ABORTED /* Command didn't run because of an abort + PGRES_PIPELINE_ABORTED, /* Command didn't run because of an abort * earlier in a pipeline */ + PGRES_TUPLES_CHUNK /* set of tuples from larger resultset */ } ExecStatusType; typedef enum @@ -489,6 +490,7 @@ extern int PQsendQueryPrepared(PGconn *conn, const int *paramFormats, int resultFormat); extern int PQsetSingleRowMode(PGconn *conn); +extern int PQsetChunkedRowsMode(PGconn *conn, int chunkSize); extern PGresult 
*PQgetResult(PGconn *conn); /* Routines for managing an asynchronous query */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 9c05f11a6e..23c7a399ab 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -435,6 +435,8 @@ struct pg_conn * sending semantics */ PGpipelineStatus pipelineStatus; /* status of pipeline mode */ bool singleRowMode; /* return current query result row-by-row? */ + int rowsChunkSize; /* non-zero to return query results by chunks + * not exceeding that number of rows */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ PGnotify *notifyHead; /* oldest unreported Notify msg */ @@ -540,7 +542,10 @@ struct pg_conn */ PGresult *result; /* result being constructed */ bool error_result; /* do we need to make an ERROR result? */ - PGresult *next_result; /* next result (used in single-row mode) */ + PGresult *next_result; /* next result (used in single-row and + * by-chunks modes) */ + PGresult *chunk_result; /* current chunk of results (limited to + * rowsChunkSize) */ /* Assorted state for SASL, SSL, GSS, etc */ const pg_fe_sasl_mech *sasl; -- 2.34.1
From 33f043aeaf3969e66f6c80af6ef6ea27499b4740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20V=C3=A9rit=C3=A9?= <dan...@manitou-mail.org> Date: Mon, 1 Apr 2024 19:46:42 +0200 Subject: [PATCH v7 2/2] Reimplement FETCH_COUNT with the chunked mode in libpq. Cursors were used only when the command starts with the keyword "SELECT", excluding queries that start with "WITH" or "UPDATE" or "INSERT" that may also return large result sets. --- src/bin/psql/common.c | 538 ++++++++++------------------- src/bin/psql/t/001_basic.pl | 6 +- src/test/regress/expected/psql.out | 9 +- src/test/regress/sql/psql.sql | 4 +- 4 files changed, 189 insertions(+), 368 deletions(-) diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 2830bde495..2112e1a423 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -31,7 +31,6 @@ #include "settings.h" static bool DescribeQuery(const char *query, double *elapsed_msec); -static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec); static int ExecQueryAndProcessResults(const char *query, double *elapsed_msec, bool *svpt_gone_p, @@ -40,8 +39,6 @@ static int ExecQueryAndProcessResults(const char *query, const printQueryOpt *opt, FILE *printQueryFout); static bool command_no_begin(const char *query); -static bool is_select_command(const char *query); - /* * openQueryOutputFile --- attempt to open a query output file @@ -373,6 +370,7 @@ AcceptResult(const PGresult *result, bool show_error) { case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: + case PGRES_TUPLES_CHUNK: case PGRES_EMPTY_QUERY: case PGRES_COPY_IN: case PGRES_COPY_OUT: @@ -1135,16 +1133,10 @@ SendQuery(const char *query) /* Describe query's result columns, without executing it */ OK = DescribeQuery(query, &elapsed_msec); } - else if (pset.fetch_count <= 0 || pset.gexec_flag || - pset.crosstab_flag || !is_select_command(query)) - { - /* Default fetch-it-all-and-print mode */ - OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, 
NULL) > 0); - } else { - /* Fetch-in-segments mode */ - OK = ExecQueryUsingCursor(query, &elapsed_msec); + /* Default fetch-and-print mode */ + OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0); } if (!OK && pset.echo == PSQL_ECHO_ERRORS) @@ -1396,6 +1388,47 @@ DescribeQuery(const char *query, double *elapsed_msec) return OK; } +/* + * Check if an output stream for \g needs to be opened, and if + * yes, open it. + * Return false if an error occurred, true otherwise. + */ +static bool +SetupGOutput(PGresult *result, FILE **gfile_fout, bool *is_pipe) +{ + ExecStatusType status = PQresultStatus(result); + if (pset.gfname != NULL && /* there is a \g file or program */ + *gfile_fout == NULL && /* and it's not already opened */ + (status == PGRES_TUPLES_OK || + status == PGRES_TUPLES_CHUNK || + status == PGRES_COPY_OUT)) + { + if (openQueryOutputFile(pset.gfname, gfile_fout, is_pipe)) + { + if (is_pipe) + disable_sigpipe_trap(); + } + else + return false; + } + return true; +} + +static void +CloseGOutput(FILE *gfile_fout, bool is_pipe) +{ + /* close \g file if we opened it */ + if (gfile_fout) + { + if (is_pipe) + { + SetShellResultVariables(pclose(gfile_fout)); + restore_sigpipe_trap(); + } + else + fclose(gfile_fout); + } +} /* * ExecQueryAndProcessResults: utility function for use by SendQuery() @@ -1429,9 +1462,14 @@ ExecQueryAndProcessResults(const char *query, instr_time before, after; PGresult *result; + FILE *gfile_fout = NULL; bool gfile_is_pipe = false; + int64 total_tuples = 0; + int flush_error = 0; + bool is_pager = false; + if (timing) INSTR_TIME_SET_CURRENT(before); else @@ -1454,6 +1492,23 @@ ExecQueryAndProcessResults(const char *query, return -1; } + /* + * Fetch the result in chunks if FETCH_COUNT is set. + * We don't enable chunking if SHOW_ALL_RESULTS is false, since that + * requires us to accumulate all rows before we can tell what should be + * displayed, which would counter the idea of FETCH_COUNT. 
+ * Chunk fetching is also disabled if \gset, \crosstab, \gexec and \watch + * are used. + */ + if (pset.fetch_count > 0 && !pset.crosstab_flag && !pset.gexec_flag && !is_watch + && !pset.gset_prefix && pset.show_all_results) + { + if (!PQsetChunkedRowsMode(pset.db, pset.fetch_count)) + { + pg_log_warning("fetching results in chunks mode is unavailable"); + } + } + /* * If SIGINT is sent while the query is processing, the interrupt will be * consumed. The user's intention, though, is to cancel the entire watch @@ -1477,6 +1532,8 @@ ExecQueryAndProcessResults(const char *query, ExecStatusType result_status; PGresult *next_result; bool last; + /* whether the output starts before results are fully fetched */ + bool partial_display = false; if (!AcceptResult(result, false)) { @@ -1572,20 +1629,9 @@ ExecQueryAndProcessResults(const char *query, } else if (pset.gfname) { - /* send to \g file, which we may have opened already */ - if (gfile_fout == NULL) - { - if (openQueryOutputFile(pset.gfname, - &gfile_fout, &gfile_is_pipe)) - { - if (gfile_is_pipe) - disable_sigpipe_trap(); - copy_stream = gfile_fout; - } - else - success = false; - } - else + /* COPY followed by \g filename or \g |program */ + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (success) copy_stream = gfile_fout; } else @@ -1603,6 +1649,90 @@ ExecQueryAndProcessResults(const char *query, success &= HandleCopyResult(&result, copy_stream); } + if (result_status == PGRES_TUPLES_CHUNK) + { + FILE *tuples_fout = printQueryFout ? 
printQueryFout : stdout; + printQueryOpt my_popt = pset.popt; + + total_tuples = 0; + partial_display = true; + + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (gfile_fout) + tuples_fout = gfile_fout; + + /* initialize print options for partial table output */ + my_popt.topt.start_table = true; + my_popt.topt.stop_table = false; + my_popt.topt.prior_records = 0; + + while (success) + { + /* pager: open at most once per resultset */ + if (tuples_fout == stdout && !is_pager) + { + tuples_fout = PageOutput(INT_MAX, &(my_popt.topt)); + is_pager = true; + } + /* display the current chunk of results unless the output stream is not working */ + if (!flush_error) + { + printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile); + flush_error = fflush(tuples_fout); + } + + /* after the first result set, disallow header decoration */ + my_popt.topt.start_table = false; + my_popt.topt.prior_records += PQntuples(result); + total_tuples += PQntuples(result); + + ClearOrSaveResult(result); + + result = PQgetResult(pset.db); + if (result == NULL) + { + /* + * Error. We expect a PGRES_TUPLES_OK result with + * zero tuple in it to finish the fetch sequence. + */ + success = false; + if (is_pager) + ClosePager(tuples_fout); + break; + } + else if (PQresultStatus(result) == PGRES_TUPLES_OK) + { + /* + * The last row has been read. Display the footer. + */ + my_popt.topt.stop_table = true; + printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile); + total_tuples += PQntuples(result); + + if (is_pager) + ClosePager(tuples_fout); + ClearOrSaveResult(result); + result = NULL; + break; + } + else if (PQresultStatus(result) != PGRES_TUPLES_CHUNK) + { + /* + * Error. We expect either PGRES_TUPLES_CHUNK or + * PGRES_TUPLES_OK. 
+ */ + if (is_pager) + ClosePager(tuples_fout); + success = false; + AcceptResult(result, true); /* display error whenever appropriate */ + SetResultVariables(result, success); + break; + } + } + } + else + partial_display = false; + /* * Check PQgetResult() again. In the typical case of a single-command * string, it will return NULL. Otherwise, we'll have other results @@ -1631,7 +1761,7 @@ ExecQueryAndProcessResults(const char *query, } /* this may or may not print something depending on settings */ - if (result != NULL) + if (result != NULL && !partial_display) { /* * If results need to be printed into the file specified by \g, @@ -1640,32 +1770,33 @@ ExecQueryAndProcessResults(const char *query, * tuple output, but it's still used for status output. */ FILE *tuples_fout = printQueryFout; - bool do_print = true; - - if (PQresultStatus(result) == PGRES_TUPLES_OK && - pset.gfname) - { - if (gfile_fout == NULL) - { - if (openQueryOutputFile(pset.gfname, - &gfile_fout, &gfile_is_pipe)) - { - if (gfile_is_pipe) - disable_sigpipe_trap(); - } - else - success = do_print = false; - } + success = SetupGOutput(result, &gfile_fout, &gfile_is_pipe); + if (gfile_fout) tuples_fout = gfile_fout; - } - if (do_print) + if (success) success &= PrintQueryResult(result, last, opt, tuples_fout, printQueryFout); } /* set variables from last result */ if (!is_watch && last) - SetResultVariables(result, success); + { + if (!partial_display) + SetResultVariables(result, success); + else if (success) + { + /* + * fake SetResultVariables(). If an error occurred when + * retrieving chunks, these variables have been set already. 
+         */
+        char        buf[32];
+
+        SetVariable(pset.vars, "ERROR", "false");
+        SetVariable(pset.vars, "SQLSTATE", "00000");
+        snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
+        SetVariable(pset.vars, "ROW_COUNT", buf);
+        }
+    }
 
     ClearOrSaveResult(result);
     result = next_result;
@@ -1677,17 +1808,7 @@ ExecQueryAndProcessResults(const char *query,
         }
     }
 
-    /* close \g file if we opened it */
-    if (gfile_fout)
-    {
-        if (gfile_is_pipe)
-        {
-            SetShellResultVariables(pclose(gfile_fout));
-            restore_sigpipe_trap();
-        }
-        else
-            fclose(gfile_fout);
-    }
+    CloseGOutput(gfile_fout, gfile_is_pipe);
 
     /* may need this to recover from conn loss during COPY */
     if (!CheckConnection())
@@ -1700,274 +1821,6 @@ ExecQueryAndProcessResults(const char *query,
 }
 
 
-/*
- * ExecQueryUsingCursor: run a SELECT-like query using a cursor
- *
- * This feature allows result sets larger than RAM to be dealt with.
- *
- * Returns true if the query executed successfully, false otherwise.
- *
- * If pset.timing is on, total query time (exclusive of result-printing) is
- * stored into *elapsed_msec.
- */
-static bool
-ExecQueryUsingCursor(const char *query, double *elapsed_msec)
-{
-    bool        OK = true;
-    PGresult   *result;
-    PQExpBufferData buf;
-    printQueryOpt my_popt = pset.popt;
-    bool        timing = pset.timing;
-    FILE       *fout;
-    bool        is_pipe;
-    bool        is_pager = false;
-    bool        started_txn = false;
-    int64       total_tuples = 0;
-    int         ntuples;
-    int         fetch_count;
-    char        fetch_cmd[64];
-    instr_time  before,
-                after;
-    int         flush_error;
-
-    *elapsed_msec = 0;
-
-    /* initialize print options for partial table output */
-    my_popt.topt.start_table = true;
-    my_popt.topt.stop_table = false;
-    my_popt.topt.prior_records = 0;
-
-    if (timing)
-        INSTR_TIME_SET_CURRENT(before);
-    else
-        INSTR_TIME_SET_ZERO(before);
-
-    /* if we're not in a transaction, start one */
-    if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
-    {
-        result = PQexec(pset.db, "BEGIN");
-        OK = AcceptResult(result, true) &&
-            (PQresultStatus(result) == PGRES_COMMAND_OK);
-        ClearOrSaveResult(result);
-        if (!OK)
-            return false;
-        started_txn = true;
-    }
-
-    /* Send DECLARE CURSOR */
-    initPQExpBuffer(&buf);
-    appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
-                      query);
-
-    result = PQexec(pset.db, buf.data);
-    OK = AcceptResult(result, true) &&
-        (PQresultStatus(result) == PGRES_COMMAND_OK);
-    if (!OK)
-        SetResultVariables(result, OK);
-    ClearOrSaveResult(result);
-    termPQExpBuffer(&buf);
-    if (!OK)
-        goto cleanup;
-
-    if (timing)
-    {
-        INSTR_TIME_SET_CURRENT(after);
-        INSTR_TIME_SUBTRACT(after, before);
-        *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
-    }
-
-    /*
-     * In \gset mode, we force the fetch count to be 2, so that we will throw
-     * the appropriate error if the query returns more than one row.
-     */
-    if (pset.gset_prefix)
-        fetch_count = 2;
-    else
-        fetch_count = pset.fetch_count;
-
-    snprintf(fetch_cmd, sizeof(fetch_cmd),
-             "FETCH FORWARD %d FROM _psql_cursor",
-             fetch_count);
-
-    /* prepare to write output to \g argument, if any */
-    if (pset.gfname)
-    {
-        if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
-        {
-            OK = false;
-            goto cleanup;
-        }
-        if (is_pipe)
-            disable_sigpipe_trap();
-    }
-    else
-    {
-        fout = pset.queryFout;
-        is_pipe = false;        /* doesn't matter */
-    }
-
-    /* clear any pre-existing error indication on the output stream */
-    clearerr(fout);
-
-    for (;;)
-    {
-        if (timing)
-            INSTR_TIME_SET_CURRENT(before);
-
-        /* get fetch_count tuples at a time */
-        result = PQexec(pset.db, fetch_cmd);
-
-        if (timing)
-        {
-            INSTR_TIME_SET_CURRENT(after);
-            INSTR_TIME_SUBTRACT(after, before);
-            *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
-        }
-
-        if (PQresultStatus(result) != PGRES_TUPLES_OK)
-        {
-            /* shut down pager before printing error message */
-            if (is_pager)
-            {
-                ClosePager(fout);
-                is_pager = false;
-            }
-
-            OK = AcceptResult(result, true);
-            Assert(!OK);
-            SetResultVariables(result, OK);
-            ClearOrSaveResult(result);
-            break;
-        }
-
-        if (pset.gset_prefix)
-        {
-            /* StoreQueryTuple will complain if not exactly one row */
-            OK = StoreQueryTuple(result);
-            ClearOrSaveResult(result);
-            break;
-        }
-
-        /*
-         * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
-         */
-
-        ntuples = PQntuples(result);
-        total_tuples += ntuples;
-
-        if (ntuples < fetch_count)
-        {
-            /* this is the last result set, so allow footer decoration */
-            my_popt.topt.stop_table = true;
-        }
-        else if (fout == stdout && !is_pager)
-        {
-            /*
-             * If query requires multiple result sets, hack to ensure that
-             * only one pager instance is used for the whole mess
-             */
-            fout = PageOutput(INT_MAX, &(my_popt.topt));
-            is_pager = true;
-        }
-
-        printQuery(result, &my_popt, fout, is_pager, pset.logfile);
-
-        ClearOrSaveResult(result);
-
-        /* after the first result set, disallow header decoration */
-        my_popt.topt.start_table = false;
-        my_popt.topt.prior_records += ntuples;
-
-        /*
-         * Make sure to flush the output stream, so intermediate results are
-         * visible to the client immediately. We check the results because if
-         * the pager dies/exits/etc, there's no sense throwing more data at
-         * it.
-         */
-        flush_error = fflush(fout);
-
-        /*
-         * Check if we are at the end, if a cancel was pressed, or if there
-         * were any errors either trying to flush out the results, or more
-         * generally on the output stream at all. If we hit any errors
-         * writing things to the stream, we presume $PAGER has disappeared and
-         * stop bothering to pull down more data.
-         */
-        if (ntuples < fetch_count || cancel_pressed || flush_error ||
-            ferror(fout))
-            break;
-    }
-
-    if (pset.gfname)
-    {
-        /* close \g argument file/pipe */
-        if (is_pipe)
-        {
-            SetShellResultVariables(pclose(fout));
-            restore_sigpipe_trap();
-        }
-        else
-            fclose(fout);
-    }
-    else if (is_pager)
-    {
-        /* close transient pager */
-        ClosePager(fout);
-    }
-
-    if (OK)
-    {
-        /*
-         * We don't have a PGresult here, and even if we did it wouldn't have
-         * the right row count, so fake SetResultVariables().  In error cases,
-         * we already set the result variables above.
-         */
-        char        buf[32];
-
-        SetVariable(pset.vars, "ERROR", "false");
-        SetVariable(pset.vars, "SQLSTATE", "00000");
-        snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
-        SetVariable(pset.vars, "ROW_COUNT", buf);
-    }
-
-cleanup:
-    if (timing)
-        INSTR_TIME_SET_CURRENT(before);
-
-    /*
-     * We try to close the cursor on either success or failure, but on failure
-     * ignore the result (it's probably just a bleat about being in an aborted
-     * transaction)
-     */
-    result = PQexec(pset.db, "CLOSE _psql_cursor");
-    if (OK)
-    {
-        OK = AcceptResult(result, true) &&
-            (PQresultStatus(result) == PGRES_COMMAND_OK);
-        ClearOrSaveResult(result);
-    }
-    else
-        PQclear(result);
-
-    if (started_txn)
-    {
-        result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
-        OK &= AcceptResult(result, true) &&
-            (PQresultStatus(result) == PGRES_COMMAND_OK);
-        ClearOrSaveResult(result);
-    }
-
-    if (timing)
-    {
-        INSTR_TIME_SET_CURRENT(after);
-        INSTR_TIME_SUBTRACT(after, before);
-        *elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
-    }
-
-    return OK;
-}
-
-
 /*
  * Advance the given char pointer over white space and SQL comments.
  */
@@ -2247,43 +2100,6 @@ command_no_begin(const char *query)
 }
 
 
-/*
- * Check whether the specified command is a SELECT (or VALUES).
- */
-static bool
-is_select_command(const char *query)
-{
-    int         wordlen;
-
-    /*
-     * First advance over any whitespace, comments and left parentheses.
-     */
-    for (;;)
-    {
-        query = skip_white_space(query);
-        if (query[0] == '(')
-            query++;
-        else
-            break;
-    }
-
-    /*
-     * Check word length (since "selectx" is not "select").
-     */
-    wordlen = 0;
-    while (isalpha((unsigned char) query[wordlen]))
-        wordlen += PQmblenBounded(&query[wordlen], pset.encoding);
-
-    if (wordlen == 6 && pg_strncasecmp(query, "select", 6) == 0)
-        return true;
-
-    if (wordlen == 6 && pg_strncasecmp(query, "values", 6) == 0)
-        return true;
-
-    return false;
-}
-
-
 /*
  * Test if the current user is a database superuser.
  */
diff --git a/src/bin/psql/t/001_basic.pl b/src/bin/psql/t/001_basic.pl
index 9f0b6cf8ca..b5fedbc091 100644
--- a/src/bin/psql/t/001_basic.pl
+++ b/src/bin/psql/t/001_basic.pl
@@ -161,7 +161,7 @@ psql_like(
 	'\errverbose with no previous error');
 
 # There are three main ways to run a query that might affect
-# \errverbose: The normal way, using a cursor by setting FETCH_COUNT,
+# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT,
 # and using \gdesc.  Test them all.
 
 like(
@@ -184,10 +184,10 @@ like(
 			"\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
 			on_error_stop => 0))[2],
 	qr/\A^psql:<stdin>:2: ERROR:  .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
 ^ *^.*$
 ^psql:<stdin>:3: error: ERROR:  [0-9A-Z]{5}: .*$
-^LINE 2: SELECT error;$
+^LINE 1: SELECT error;$
 ^ *^.*$
 ^LOCATION: .*$/m,
 	'\errverbose after FETCH_COUNT query with error');
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 69060fe3c0..8580db7c00 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -4755,7 +4755,7 @@ number of rows: 0
 last error message: syntax error at end of input
 \echo 'last error code:' :LAST_ERROR_SQLSTATE
 last error code: 42601
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
 \set FETCH_COUNT 10
 select unique2 from tenk1 order by unique2 limit 19;
  unique2
@@ -4787,7 +4787,7 @@ error: false
 error code: 00000
 \echo 'number of rows:' :ROW_COUNT
 number of rows: 19
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
 select 1/(15-unique2) from tenk1 order by unique2 limit 19;
  ?column?
 ----------
@@ -4801,6 +4801,11 @@ select 1/(15-unique2) from tenk1 order by unique2 limit 19;
         0
         0
         0
+        0
+        0
+        0
+        0
+        1
 ERROR:  division by zero
 \echo 'error:' :ERROR
 error: true
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index 129f853353..33076cad79 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -1161,14 +1161,14 @@ SELECT 4 AS \gdesc
 \echo 'last error message:' :LAST_ERROR_MESSAGE
 \echo 'last error code:' :LAST_ERROR_SQLSTATE
 
--- check row count for a cursor-fetched query
+-- check row count for a query with chunked results
 \set FETCH_COUNT 10
 select unique2 from tenk1 order by unique2 limit 19;
 \echo 'error:' :ERROR
 \echo 'error code:' :SQLSTATE
 \echo 'number of rows:' :ROW_COUNT
 
--- cursor-fetched query with an error after the first group
+-- chunked results with an error after the first chunk
 select 1/(15-unique2) from tenk1 order by unique2 limit 19;
 \echo 'error:' :ERROR
 \echo 'error code:' :SQLSTATE
-- 
2.34.1