On Friday, December 05, 2014 12:22:38 PM Heikki Linnakangas wrote:
> Oh, that's what the PQgetLastQuery/PQgetNextQuery functions work! I
> didn't understand that before. I'd suggest renaming them to something
> like PQgetSentQuery() and PQgetResultQuery(). The first/last/next names
> made me think that they're used to iterate a list of queries, but in
> fact they're supposed to be used at very different stages.
>
> - Heikki

Okay, I have renamed them as you suggested and added PQsetPipelining/PQisPipelining, with pipelining off by default. There should be no behavior change unless pipelining is enabled.

Documentation should be mostly complete, except for the possible addition of an example and maybe a general pipelining overview paragraph.

I have implemented async query support (which takes advantage of pipelining) in Qt, along with a couple of test cases. This led me to discover a bug in my last patch where a PGquery object could be reused twice in a row. I have fixed that. I contemplated not reusing the PGquery objects at all, but that wouldn't solve the problem, because malloc may well return a recently freed block of the same size anyway. Guaranteeing that a PGquery won't be reused twice in a row should be sufficient. The only alternative is to add a unique id, but that would add further complexity that I don't think is warranted.

Feedback is very welcome and appreciated.

Thanks,
Matt Newell

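For anyone who wants to look at the reuse rule in isolation, here is a minimal standalone sketch of the idea, not the patch code itself. The `Query` and `Pool` types and the `pool_acquire`/`pool_release` names are hypothetical stand-ins for PGquery and the queryFree/queryLast fields of PGconn. The point is that an application identifies queries by comparing pointers, so the object retired most recently must not be the next one handed out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for the patch's PGquery; only the link matters here. */
typedef struct Query
{
    struct Query *next;
} Query;

/* Hypothetical pool mirroring conn->queryFree and conn->queryLast. */
typedef struct Pool
{
    Query *free_list;   /* stack of retired objects available for reuse */
    Query *last;        /* most recently retired object, or NULL */
} Pool;

/* Hand out an object that is never the one retired most recently. */
Query *
pool_acquire(Pool *pool)
{
    Query *q;

    assert(pool != NULL);
    if (pool->free_list && pool->free_list != pool->last)
    {
        /* Top of the stack is safe to reuse. */
        q = pool->free_list;
        pool->free_list = q->next;
    }
    else if (pool->free_list && pool->free_list->next)
    {
        /* Top is the just-retired object: skip it and take the second. */
        q = pool->free_list->next;
        pool->free_list->next = q->next;
        pool->last = NULL;  /* another object goes out first, so the top is safe next time */
    }
    else
    {
        /* Nothing safe to recycle: allocate a fresh object. */
        q = malloc(sizeof(*q));
        if (q == NULL)
            return NULL;
    }
    q->next = NULL;
    return q;
}

/* Retire an object: remember it as the latest and push it on the free stack. */
void
pool_release(Pool *pool, Query *q)
{
    assert(pool != NULL && q != NULL);
    pool->last = q;
    q->next = pool->free_list;
    pool->free_list = q;
}
```

Starting from an empty pool, acquiring, releasing and acquiring again yields a different pointer the second time, so a stale pointer held by the caller cannot be confused with the query that follows it immediately. Objects do come back after one other object has been handed out in between, which is the trade-off accepted above in place of a unique id.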
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index d829a4b..4e0431e 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3947,9 +3947,14 @@ int PQsendQuery(PGconn *conn, const char *command); After successfully calling <function>PQsendQuery</function>, call <function>PQgetResult</function> one or more times to obtain the - results. <function>PQsendQuery</function> cannot be called again - (on the same connection) until <function>PQgetResult</function> - has returned a null pointer, indicating that the command is done. + results. If pipelining is enabled <function>PQsendQuery</function> + may be called multiple times before reading the results. See + <function>PQsetPipelining</function> and <function>PQisPipelining</function>. + Call <function>PQgetSentQuery</function> to get a <structname>PGquery</structname> + which can be used to identify which results obtained from + <function>PQgetResult</function> belong to each pipelined query. + If only one query is dispatched at a time, you can call <function>PQgetResult</function> + until a NULL value is returned to indicate the end of the query. </para> </listitem> </varlistentry> @@ -4133,8 +4138,8 @@ PGresult *PQgetResult(PGconn *conn); <para> <function>PQgetResult</function> must be called repeatedly until - it returns a null pointer, indicating that the command is done. - (If called when no command is active, + it returns a null pointer, indicating that all dispatched commands + are done. (If called when no command is active, <function>PQgetResult</function> will just return a null pointer at once.) Each non-null result from <function>PQgetResult</function> should be processed using the @@ -4144,14 +4149,17 @@ PGresult *PQgetResult(PGconn *conn); <function>PQgetResult</function> will block only if a command is active and the necessary response data has not yet been read by <function>PQconsumeInput</function>. 
+ If query pipelining is being used, <function>PQgetResultQuery</function> + can be called after <function>PQgetResult</function> to match the result to the query. </para> <note> <para> Even when <function>PQresultStatus</function> indicates a fatal - error, <function>PQgetResult</function> should be called until it - returns a null pointer, to allow <application>libpq</> to - process the error information completely. + error, <function>PQgetResult</function> should be called until the + query has no more results (null pointer return if not using query + pipelining, otherwise see <function>PQgetResultQuery</function>), + to allow <application>libpq</> to process the error information completely. </para> </note> </listitem> @@ -4385,6 +4393,158 @@ int PQflush(PGconn *conn); read-ready and then read the response as described above. </para> + <variablelist> + <varlistentry id="libpq-pqsetpipelining"> + <term> + <function>PQsetPipelining</function> + <indexterm> + <primary>PQsetPipelining</primary> + </indexterm> + </term> + + <listitem> + <para> + Enables or disables query pipelining. +<synopsis> +int PQsetPipelining(PGconn *conn, int arg); +</synopsis> + </para> + + <para> + Enables pipelining for the connection if arg is 1, or disables it + if arg is 0. When pipelining is enabled multiple async queries can + be sent before processing the results of the first. If pipelining + is disabled an error will be raised if an async query is attempted + while another is active. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-pqispipelining"> + <term> + <function>PQisPipelining</function> + <indexterm> + <primary>PQisPipelining</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns the pipelining status of the connection. +<synopsis> +int PQisPipelining(PGconn *conn); +</synopsis> + </para> + + <para> + Returns 1 if pipelining is enabled, or 0 if pipelining is disabled. + Query pipelining is disabled unless enabled with a call to + <function>PQsetPipelining</function>. 
+ </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-pqgetquerycommand"> + <term> + <function>PQgetQueryCommand</function> + <indexterm> + <primary>PQgetQueryCommand</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns the query string associated with the <structname>PGquery</structname>. +<synopsis> +const char * PQgetQueryCommand(PGquery *query); +</synopsis> + </para> + + <para> + When using query pipelining this function can be used to retrieve the command + that created the query object. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-pqgetresultquery"> + <term> + <function>PQgetResultQuery</function> + <indexterm> + <primary>PQgetResultQuery</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns the first async query to receive results, or NULL if no + async queries are active. +<synopsis> +PGquery * PQgetResultQuery(PGconn *conn); +</synopsis> + </para> + + <para> + When pipelining queries this function indicates which query the + result of <function>PQgetResult</function> belongs to. + Call this function immediately after calling + <function>PQgetResult</function>, or immediately before if a result + is ready to read, indicated by <function>PQisBusy</function> + being false. The <structname>PGquery</structname> remains valid + until the next libpq call that consumes input. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-pqgetnextquery"> + <term> + <function>PQgetNextQuery</function> + <indexterm> + <primary>PQgetNextQuery</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns the next <structname>PGquery</structname> in the list of + pipelined queries. +<synopsis> +PGquery * PQgetNextQuery(PGquery *query); +</synopsis> + </para> + + <para> + This function can be used to iterate over each pending async query, + starting with <function>PQgetResultQuery</function> + and ending with <function>PQgetSentQuery</function>. 
+ </para> + </varlistentry> + + <varlistentry> + <term> + <function>PQgetSentQuery</function> + <indexterm> + <primary>PQgetSentQuery</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns the last <structname>PGquery</structname> in the list of + dispatched async queries waiting for results. +<synopsis> +PGquery * PQgetSentQuery(PGconn *conn); +</synopsis> + </para> + + <para> + Call this function after dispatching an async query to get + a <structname>PGquery</structname> that can be used to identify + the originating query for each result obtained by + <function>PQgetResult</function>. + </para> + </varlistentry> + </variablelist> </sect1> <sect1 id="libpq-single-row-mode"> @@ -4411,7 +4571,7 @@ int PQflush(PGconn *conn); immediately after a successful call of <function>PQsendQuery</function> (or a sibling function). This mode selection is effective only for the currently executing query. Then call <function>PQgetResult</function> - repeatedly, until it returns null, as documented in <xref + repeatedly, until the last query result is returned, as documented in <xref linkend="libpq-async">. If the query returns any rows, they are returned as individual <structname>PGresult</structname> objects, which look like normal query results except for having status code @@ -4420,8 +4580,8 @@ int PQflush(PGconn *conn); the query returns zero rows, a zero-row object with status <literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no more rows will arrive. (But note that it is still necessary to continue - calling <function>PQgetResult</function> until it returns null.) All of - these <structname>PGresult</structname> objects will contain the same row + calling <function>PQgetResult</function> until the last query result is returned.) + All of these <structname>PGresult</structname> objects will contain the same row description data (column names, types, etc) that an ordinary <structname>PGresult</structname> object for the query would have. 
Each object should be freed with <function>PQclear</function> as usual.
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 93da50d..050bf05 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -165,3 +165,9 @@ lo_lseek64 162 lo_tell64 163 lo_truncate64 164 PQconninfo 165 +PQgetResultQuery 166 +PQgetSentQuery 167 +PQgetNextQuery 168 +PQgetQueryCommand 169 +PQsetPipelining 170 +PQisPipelining 171 \ No newline at end of file diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 3af222b..fc72605 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn) free(conn->gsslib); #endif /* Note that conn->Pfdebug is not ours to close or free */ - if (conn->last_query) - free(conn->last_query); if (conn->inBuffer) free(conn->inBuffer); if (conn->outBuffer) @@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn) * absent */ conn->asyncStatus = PGASYNC_IDLE; pqClearAsyncResult(conn); /* deallocate result */ + + /* + * Link active queries into the free list so we can free them + */ + if (conn->queryTail) + { + conn->queryTail->next = conn->queryFree; + conn->queryFree = conn->queryHead; + } + conn->queryHead = conn->queryTail = NULL; + + /* + * Free all query objects + */ + while (conn->queryFree) + { + PGquery * prev = conn->queryFree; + conn->queryFree = prev->next; + if (prev->querycmd) + free(prev->querycmd); + free(prev); + } + resetPQExpBuffer(&conn->errorMessage); pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); conn->addrlist = NULL; @@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn) } /* - * PQcancelGet: get a PGcancel structure corresponding to a connection. + * PQgetCancel: get a PGcancel structure corresponding to a connection. * * A copy is needed to be able to cancel a running query from a different * thread. 
If the same structure is used all structure members would have diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 4075e51..379c38c 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) * row; the original conn->result is left unchanged so that it can be used * again as the template for future rows. */ - if (conn->singleRowMode) + if (conn->queryHead && conn->queryHead->singleRowMode) { /* Copy everything that should be in the result at this point */ res = PQcopyResult(res, @@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) * Success. In single-row mode, make the result available to the client * immediately. */ - if (conn->singleRowMode) + if (conn->queryHead && conn->queryHead->singleRowMode) { /* Change result status to special single-row value */ res->resultStatus = PGRES_SINGLE_TUPLE; @@ -1132,13 +1132,11 @@ PQsendQuery(PGconn *conn, const char *query) } /* remember we are using simple query protocol */ - conn->queryclass = PGQUERY_SIMPLE; + conn->queryTail->queryclass = PGQUERY_SIMPLE; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + /* if insufficient memory, querycmd just winds up NULL */ + conn->queryTail->querycmd = strdup(query); /* * Give the data a push. In nonblock mode, don't complain if we're unable @@ -1151,7 +1149,9 @@ PQsendQuery(PGconn *conn, const char *query) } /* OK, it's launched! 
*/ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->asyncStatus == PGASYNC_IDLE) + conn->asyncStatus = PGASYNC_BUSY; + return 1; } @@ -1272,13 +1272,11 @@ PQsendPrepare(PGconn *conn, goto sendFailed; /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; + conn->queryTail->queryclass = PGQUERY_PREPARE; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + /* if insufficient memory, querycmd just winds up NULL */ + conn->queryTail->querycmd = strdup(query); /* * Give the data a push. In nonblock mode, don't complain if we're unable @@ -1288,7 +1286,9 @@ PQsendPrepare(PGconn *conn, goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->asyncStatus == PGASYNC_IDLE) + conn->asyncStatus = PGASYNC_BUSY; + return 1; sendFailed: @@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn, static bool PQsendQueryStart(PGconn *conn) { + PGquery * query; + if (!conn) return false; @@ -1357,20 +1359,59 @@ PQsendQueryStart(PGconn *conn) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. 
*/ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Check if we are in a valid state to send an async query */ + switch (conn->asyncStatus) { - printfPQExpBuffer(&conn->errorMessage, + case PGASYNC_IDLE: + break; + /* Can only send a query during busy or ready state if + * pipelining is enabled */ + case PGASYNC_BUSY: + case PGASYNC_READY: + if (conn->pipelining) + break; + /* Fall through to error */ + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); - return false; + return false; } - /* initialize async result-accumulation state */ - conn->result = NULL; - conn->next_result = NULL; + query = 0; - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* Check if we have a free PGquery to use if not we create one + * We have to make sure we don't use the same PGquery twice + * in a row, so we will try both the first and second free + * entries, if not create a new one. 
*/ + if (conn->queryFree && conn->queryFree != conn->queryLast) + { + query = conn->queryFree; + conn->queryFree = query->next; + query->next = NULL; + } + else if(conn->queryFree && conn->queryFree->next) + { + query = conn->queryFree->next; + conn->queryFree->next = query->next; + query->next = NULL; + conn->queryLast = NULL; /* First is fine to use again now */ + } else + { + query = (PGquery*) malloc(sizeof(PGquery)); + query->querycmd = 0; + query->singleRowMode = false; + query->next = 0; + } + + if( conn->queryTail ) + conn->queryTail->next = query; + else + conn->queryHead = query; + + conn->queryTail = query; /* ready to send command message */ return true; @@ -1522,16 +1563,12 @@ PQsendQueryGuts(PGconn *conn, goto sendFailed; /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + conn->queryTail->queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ - /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + /* if insufficient memory, querycmd just winds up NULL */ if (command) - conn->last_query = strdup(command); - else - conn->last_query = NULL; + conn->queryTail->querycmd = strdup(command); /* * Give the data a push. In nonblock mode, don't complain if we're unable @@ -1541,7 +1578,9 @@ PQsendQueryGuts(PGconn *conn, goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->asyncStatus == PGASYNC_IDLE) + conn->asyncStatus = PGASYNC_BUSY; + return 1; sendFailed: @@ -1576,7 +1615,7 @@ pqHandleSendFailure(PGconn *conn) } /* - * Select row-by-row processing mode + * Select row-by-row processing mode for the last sent query */ int PQsetSingleRowMode(PGconn *conn) @@ -1585,18 +1624,16 @@ PQsetSingleRowMode(PGconn *conn) * Only allow setting the flag when we have launched a query and not yet * received any results. 
*/ - if (!conn) + if (!conn || !conn->queryTail) return 0; - if (conn->asyncStatus != PGASYNC_BUSY) + if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead) return 0; - if (conn->queryclass != PGQUERY_SIMPLE && - conn->queryclass != PGQUERY_EXTENDED) - return 0; - if (conn->result) + if (conn->queryTail->queryclass != PGQUERY_SIMPLE && + conn->queryTail->queryclass != PGQUERY_EXTENDED) return 0; /* OK, set flag */ - conn->singleRowMode = true; + conn->queryTail->singleRowMode = true; return 1; } @@ -1670,6 +1707,50 @@ PQisBusy(PGconn *conn) /* + * PQgetQueryCommand + */ +const char * +PQgetQueryCommand(PGquery *query) +{ + if (!query) + return NULL; + return query->querycmd; +} + +/* + * PQgetFirstQuery + */ +PGquery * +PQgetResultQuery(PGconn *conn) +{ + if (!conn) + return 0; + return conn->queryHead; +} + +/* + * PQgetLastQuery + */ +PGquery * +PQgetSentQuery(PGconn *conn) +{ + if (!conn) + return 0; + return conn->queryTail; +} + +/* + * PQgetNextQuery + */ +PGquery * +PQgetNextQuery(PGquery *query) +{ + if (!query) + return 0; + return query->next; +} + +/* * PQgetResult * Get the next PGresult produced by a query. Returns NULL if no * query work remains or an error has occurred (e.g. out of @@ -2132,14 +2213,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; - - /* reset last-query string (not relevant now) */ - if (conn->last_query) - { - free(conn->last_query); - conn->last_query = NULL; - } + conn->queryTail->queryclass = PGQUERY_DESCRIBE; /* * Give the data a push. In nonblock mode, don't complain if we're unable @@ -2301,7 +2375,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. 
*/ - if (conn->queryclass != PGQUERY_SIMPLE) + if (conn->queryHead->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', false, conn) < 0 || pqPutMsgEnd(conn) < 0) @@ -3112,6 +3186,31 @@ PQisnonblocking(const PGconn *conn) return pqIsnonblocking(conn); } +int +PQsetPipelining(PGconn *conn, int arg) +{ + bool barg; + + if (!conn) + return -1; + + barg = (arg ? TRUE : FALSE); + + /* Return error if they are trying to turn pipelining off and + * multiple queries are pending */ + if (!barg && conn->queryHead && conn->queryHead != conn->queryTail) + return -1; + + conn->pipelining = barg; + return 0; +} + +int +PQisPipelining(PGconn *conn) +{ + return conn->pipelining ? 1 : 0; +} + /* libpq is thread-safe? */ int PQisthreadsafe(void) diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index c514ca5..d0c5110 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -55,7 +55,29 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query, int loc, int encoding); static int build_startup_packet(const PGconn *conn, char *packet, const PQEnvironmentOption *options); +static void pqQueryAdvance(PGconn *conn); +void +pqQueryAdvance(PGconn *conn) +{ + PGquery * query; + + query = conn->queryHead; + if (query == NULL) + return; + + conn->queryLast = query; + /* Advance queryHead */ + conn->queryHead = query->next; + /* Push last query onto free stack */ + query->next = conn->queryFree; + conn->queryFree = query; + free(query->querycmd); + query->querycmd = NULL; + + if (conn->queryHead == NULL) + conn->queryTail = NULL; +} /* * parseInput: if appropriate, parse input data from backend @@ -218,7 +240,15 @@ pqParseInput3(PGconn *conn) case 'Z': /* backend is ready for new query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + + pqQueryAdvance(conn); + /* initialize async result-accumulation state */ + conn->result = NULL; + conn->next_result = NULL; + if (conn->queryHead 
!= NULL) + conn->asyncStatus = PGASYNC_BUSY; + else + conn->asyncStatus = PGASYNC_IDLE; break; case 'I': /* empty query */ if (conn->result == NULL) @@ -232,7 +262,7 @@ pqParseInput3(PGconn *conn) break; case '1': /* Parse Complete */ /* If we're doing PQprepare, we're done; else ignore */ - if (conn->queryclass == PGQUERY_PREPARE) + if (conn->queryHead->queryclass == PGQUERY_PREPARE) { if (conn->result == NULL) { @@ -266,7 +296,7 @@ pqParseInput3(PGconn *conn) break; case 'T': /* Row Description */ if (conn->result == NULL || - conn->queryclass == PGQUERY_DESCRIBE) + conn->queryHead->queryclass == PGQUERY_DESCRIBE) { /* First 'T' in a query sequence */ if (getRowDescriptions(conn, msgLength)) @@ -299,7 +329,7 @@ pqParseInput3(PGconn *conn) * instead of TUPLES_OK. Otherwise we can just ignore * this message. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE) { if (conn->result == NULL) { @@ -422,6 +452,8 @@ pqParseInput3(PGconn *conn) static void handleSyncLoss(PGconn *conn, char id, int msgLength) { + PGquery * query; + printfPQExpBuffer(&conn->errorMessage, libpq_gettext( "lost synchronization with server: got message type \"%c\", length %d\n"), @@ -430,6 +462,15 @@ handleSyncLoss(PGconn *conn, char id, int msgLength) pqSaveErrorResult(conn); conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */ + /* All queries are canceled, move them to the free list and free the query commands */ + while ((query = conn->queryHead) != NULL) + { + free(query->querycmd); + query->querycmd = NULL; + conn->queryHead = query->next; + query->next = conn->queryFree; + } + pqDropConnection(conn); conn->status = CONNECTION_BAD; /* No more connection to backend */ } @@ -455,7 +496,7 @@ getRowDescriptions(PGconn *conn, int msgLength) * PGresult created by getParamDescriptions, and we should fill data into * that. Otherwise, create a new, empty PGresult. 
*/ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE) { if (conn->result) result = conn->result; @@ -562,7 +603,7 @@ getRowDescriptions(PGconn *conn, int msgLength) * If we're doing a Describe, we're done, and ready to pass the result * back to the client. */ - if (conn->queryclass == PGQUERY_DESCRIBE) + if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE) { conn->asyncStatus = PGASYNC_READY; return 0; @@ -865,10 +906,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError) val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION); if (val) { - if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL) + if (conn->verbosity != PQERRORS_TERSE && conn->queryHead && conn->queryHead->querycmd != NULL) { /* emit position as a syntax cursor display */ - querytext = conn->last_query; + querytext = conn->queryHead->querycmd; querypos = atoi(val); } else @@ -1696,7 +1737,7 @@ pqEndcopy3(PGconn *conn) * If we sent the COPY command in extended-query mode, we must issue a * Sync as well. */ - if (conn->queryclass != PGQUERY_SIMPLE) + if (conn->queryHead && conn->queryHead->queryclass != PGQUERY_SIMPLE) { if (pqPutMsgStart('S', false, conn) < 0 || pqPutMsgEnd(conn) < 0) diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index b81dc16..ca54116 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -141,6 +141,13 @@ typedef struct pg_result PGresult; */ typedef struct pg_cancel PGcancel; +/* PGquery encapsulates the progress of a single query command issued + * to the async api functions + * The contents of this struct are not supposed to be known to applications. + */ +typedef struct pg_query PGquery; + + /* PGnotify represents the occurrence of a NOTIFY message. * Ideally this would be an opaque typedef, but it's so simple that it's * unlikely to change. 
@@ -404,6 +411,14 @@ extern PGresult *PQgetResult(PGconn *conn); extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); +extern int PQsetPipelining(PGconn *conn, int arg); +extern int PQisPipelining(PGconn *conn); + +extern const char * PQgetQueryCommand(PGquery *query); +extern PGquery *PQgetResultQuery(PGconn *conn); +extern PGquery *PQgetSentQuery(PGconn *conn); +extern PGquery *PQgetNextQuery(PGquery *query); + /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 4ef46ff..7d84d89 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -291,6 +291,16 @@ typedef struct pgDataValue const char *value; /* data value, without zero-termination */ } PGdataValue; +typedef struct pg_query +{ + PGQueryClass queryclass; + char *querycmd; /* last SQL command, or NULL if unknown */ + bool singleRowMode; /* return query result row-by-row? */ + struct pg_query * next; + void *userptr; /* convenience for the user */ +} PGquery; + + /* * PGconn stores all the state data associated with a single connection * to a backend. @@ -350,13 +360,20 @@ struct pg_conn ConnStatusType status; PGAsyncStatusType asyncStatus; PGTransactionStatusType xactStatus; /* never changes to ACTIVE */ - PGQueryClass queryclass; - char *last_query; /* last SQL command, or NULL if unknown */ + + /* queryHead and queryTail form a FIFO representing queries sent + * to the backend. 
queryHead is the first query sent, and is the + * query we are receiving results from, or have received results from */ + bool pipelining; + PGquery *queryHead; + PGquery *queryTail; + PGquery *queryFree; /* Reuse PGQuery allocations */ + PGquery *queryLast; /* Ensure we never use a query twice in a row */ + char last_sqlstate[6]; /* last reported SQLSTATE */ bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ - bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY * OUT */ diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile index aee5c04..3996760 100644 --- a/src/test/examples/Makefile +++ b/src/test/examples/Makefile @@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) override LDLIBS := $(libpq_pgport) $(LDLIBS) -PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 +PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline testlibpqpipeline2 all: $(PROGS)
/*
 * src/test/examples/testlibpqpipeline2.c
 *
 *
 * testlibpqpipeline2.c
 *      this test program tests query pipelining.  It shows how to issue multiple
 *      pipelined queries, and identify from which query a result originated.  It
 *      also demonstrates how failure of one query does not impact subsequent queries
 *      when they are not part of the same transaction.
 *
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "libpq-fe.h"

static void
checkResult(PGconn *conn, PGresult *result, PGquery *query, int expectedResultStatus)
{
    if (PQresultStatus(result) != expectedResultStatus)
    {
        /* query is NULL for the PQexec calls, so guard the %s argument */
        const char *cmd = PQgetQueryCommand(query);

        printf( "Got unexpected result status '%s', expected '%s'\nQuery:%s\n",
            PQresStatus(PQresultStatus(result)), PQresStatus(expectedResultStatus),
            cmd ? cmd : "(unknown)");
        PQclear(result);
        PQclear(PQexec(conn,"DROP TABLE test"));
        PQfinish(conn);
        exit(1);
    }
    PQclear(result);
}

int
main(int argc, char **argv)
{
    PGconn * conn;
    PGquery * query1;
    PGquery * query2;
    PGquery * query3;
    PGquery * curQuery;
    PGresult * result;

    conn = NULL;
    query1 = query2 = query3 = curQuery = NULL;
    result = NULL;

    /* make a connection to the database */
    conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);

    /* check to see that the backend connection was successfully made */
    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "Connection to database failed: %s",
                PQerrorMessage(conn));
        exit(1);
    }

    PQsetPipelining(conn,1);

    checkResult(conn,PQexec(conn,"DROP TABLE IF EXISTS test"),NULL,PGRES_COMMAND_OK);
    checkResult(conn,PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"),NULL,PGRES_COMMAND_OK);

    PQsendQuery(conn, "INSERT INTO test(id) VALUES (DEFAULT),(DEFAULT) RETURNING id");
    query1 = PQgetSentQuery(conn);

    /* Duplicate primary key error */
    PQsendQuery(conn, "UPDATE test SET id=2 WHERE id=1");
    query2 = PQgetSentQuery(conn);

    PQsendQuery(conn, "SELECT * FROM test");
    query3 = PQgetSentQuery(conn);

    while( (result = PQgetResult(conn)) != NULL )
    {
        curQuery = PQgetResultQuery(conn);

        if (curQuery == query1)
            checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
        if (curQuery == query2)
            checkResult(conn,result,curQuery,PGRES_FATAL_ERROR);
        if (curQuery == query3)
            checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
    }

    PQclear(PQexec(conn,"DROP TABLE test"));
    PQfinish(conn);

    return 0;
}

/*
 * src/test/examples/testlibpqpipeline.c
 *
 *
 * testlibpqpipeline.c
 *      this test program tests query pipelining and its performance impact
 *
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "libpq-fe.h"

// If defined we won't issue more sql commands if the socket's
// write buffer is full
//#define MIN_LOCAL_Q
//#define PRINT_QUERY_PROGRESS

static int testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql );
static int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs );

int
testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql )
{
    int nQueriesQueued;
    int nQueriesTotal;
    PGresult * result;
    PGquery * firstQuery;
    PGquery * curQuery;

    nQueriesQueued = nQueriesTotal = 0;
    result = NULL;
    firstQuery = curQuery = NULL;

    while( nQueriesQueued > 0 || nQueriesTotal < totalQueries )
    {
        if( PQconsumeInput(conn) == 0 )
        {
            printf( "PQconsumeInput ERROR: %s\n", PQerrorMessage(conn) );
            return 1;
        }

        do
        {
            curQuery = PQgetResultQuery(conn);

            /* firstQuery is finished */
            if( firstQuery != curQuery )
            {
                //printf( "%p done, curQuery=%p\n", firstQuery, curQuery );
#ifdef PRINT_QUERY_PROGRESS
                printf("-");
#endif
                firstQuery = curQuery;
                nQueriesQueued--;
            }

            /* Break if no queries are ready */
            if( !firstQuery || PQisBusy(conn) )
                break;

            if( (result = PQgetResult(conn)) != 0 )
                PQclear(result);
        }
        while(1);

        if( nQueriesTotal < totalQueries && nQueriesQueued < totalQueued )
        {
#ifdef MIN_LOCAL_Q
            int flushResult = PQflush(conn);
            if( flushResult == -1 )
            {
                printf( "PQflush ERROR: %s\n", PQerrorMessage(conn) );
                return 1;
            }
            else if ( flushResult == 1 )
                continue;
#endif
            if( !PQsendQuery(conn,sql) ){
                printf( "PQsendQuery failed with error: %s\n", PQerrorMessage(conn) );
                return 1;
            }
            if( firstQuery == NULL )
                firstQuery = PQgetSentQuery(conn);
            nQueriesTotal++;
            nQueriesQueued++;
#ifdef PRINT_QUERY_PROGRESS
            printf( "+" );
#endif
        }
    }
#ifdef PRINT_QUERY_PROGRESS
    printf( "\n" );
#endif
    return 0;
}

int
testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs )
{
    int result;
    struct timeval tv1, tv2;
    int secs, usecs;

    gettimeofday(&tv1,NULL);
#define TEST_P(q) \
    if( (result = testPipelined(conn,totalQueries,totalQueued,q)) != 0 ) \
        return result;
    TEST_P("INSERT INTO test(id) VALUES (DEFAULT)");
    TEST_P("SELECT * FROM test LIMIT 1");
    TEST_P("SELECT * FROM test");
    TEST_P("DELETE FROM test");
    gettimeofday(&tv2,NULL);

    /* usecs is the total elapsed time; split it for the seconds.micros display */
    secs = tv2.tv_sec - tv1.tv_sec;
    usecs = secs * 1000000 + tv2.tv_usec - tv1.tv_usec;
    printf("testPipelinedSeries(%i,%i) took %i.%06i",totalQueries,totalQueued,usecs / 1000000,usecs % 1000000);
    if (baseline_usecs == 0)
        printf("\n");
    else
        printf(", speedup %.2f\n", (double)baseline_usecs / usecs );
    return usecs;
}

int
main(int argc, char **argv)
{
    PGconn * conn;
    int baseline;

    conn = NULL;

    /* make a connection to the database */
    conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);

    /* check to see that the backend connection was successfully made */
    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "Connection to database failed: %s",
                PQerrorMessage(conn));
        exit(1);
    }

    PQsetPipelining(conn,1);
    PQsetnonblocking(conn,1);

    PQclear(PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"));

    baseline = testPipelinedSeries(conn,10,1,0);
    testPipelinedSeries(conn,10,3,baseline);
    testPipelinedSeries(conn,10,10,baseline);

    baseline = testPipelinedSeries(conn,100,1,0);
    testPipelinedSeries(conn,100,3,baseline);
    testPipelinedSeries(conn,100,10,baseline);
    testPipelinedSeries(conn,100,25,baseline);
    testPipelinedSeries(conn,100,50,baseline);
    testPipelinedSeries(conn,100,100,baseline);

    PQclear(PQexec(conn,"DROP TABLE test"));
    PQfinish(conn);

    return 0;
}