On Thursday, December 04, 2014 10:30:46 PM Craig Ringer wrote:
> On 12/04/2014 05:08 PM, Heikki Linnakangas wrote:
> > A good API is crucial for this. It should make it easy to write an
> > application that does pipelining, and to handle all the error conditions
> > in a predictable way. I'd suggest that you write the documentation
> > first, before writing any code, so that we can discuss the API. It
> > doesn't have to be in SGML format yet, a plain-text description of the
> > API will do.
> 
> I strongly agree.
> 
First pass at the documentation changes attached, along with a new example 
that demonstrates pipelining 3 queries, with the middle one resulting in a 
PGRES_FATAL_ERROR response.

With the API I am proposing, only 2 new functions (PQgetFirstQuery, 
PQgetLastQuery) are required to be able to match each result to the query that 
caused it.  Another function, PQgetNextQuery, allows iterating through the 
pending queries, and PQgetQueryCommand permits getting the original query 
text.

Adding the ability to set a user supplied pointer on the PGquery struct might 
make it much easier for some frameworks, and other users might want a 
callback, but I don't think either are required.

> Applications need to be able to reliably predict what will happen if
> there's an error in the middle of a pipeline.
> 
Yes, the API I am proposing makes it easy to get results for each submitted 
query independently of the success or failure of previous queries in the 
pipeline.

> Consideration of implicit transactions (autocommit), the whole pipeline
> being one transaction, or multiple transactions is needed.
The more I think about this the more confident I am that no extra work is 
needed.

Unless we start doing some preliminary processing of the query inside of 
libpq, our hands are tied wrt sending a sync at the end of each query.  The 
reason for this is that we rely on the ReadyForQuery message to indicate the 
end of a query, so without the sync there is no way to tell if the next result 
is from another statement in the current query, or the first statement in the 
next query.

I also don't see a reason to need multiple queries without a sync statement.  
If the user wants all queries to succeed or fail together it should be no 
problem to start the pipeline with BEGIN and complete it with COMMIT.  But I 
may be missing some detail...


> 
> Apps need to be able to wait for the result of a query partway through a
> pipeline, e.g. scheduling four queries, then waiting for the result of
> the 2nd.
> 
Right.  With the API I am proposing the user does have to process each result 
until it gets to the one it wants, but it's no problem doing that.  It would 
also be trivial to add a function

PGresult * PQgetNextQueryResult(PQquery *query);

that discards all results from previous queries.  Very similar to how a PQexec 
disregards all results from previous async queries.

It would also be possible to queue the results and be able to retrieve them 
out of order, but I think that adds unnecessary complexity and might also make 
it easy for users to never retrieve and free some results.

> There are probably plenty of other wrinkly bits to think about.

Yup, I'm sure I'm still missing some significant things at this point...

Matt Newell
/*
 * src/test/examples/testlibpqpipeline2.c
 *
 *
 * testlibpqpipeline2.c
 *		This test program tests query pipelining.  It shows how to issue
 *		multiple pipelined queries and how to identify which query a result
 *		originated from.  It also demonstrates that the failure of one query
 *		does not impact subsequent queries when they are not part of the same
 *		transaction.
 *
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "libpq-fe.h"

/*
 * checkResult
 *		Verify that a result has the expected status, then free it.
 *
 * conn is only used for cleanup when the check fails.  query is the pipelined
 * query the result belongs to; it may be NULL (as it is for the results of
 * the PQexec calls in main), in which case the query text is reported as
 * unknown.  expectedResultStatus is the status PQresultStatus() must report.
 *
 * The result is always consumed: on success it is cleared and the function
 * returns; on a mismatch a diagnostic is printed, the result is cleared, the
 * test table is dropped, the connection is closed and the program exits with
 * status 1.  Callers must not touch result afterwards.
 */
static void checkResult(PGconn *conn, PGresult *result, PGquery *query, int expectedResultStatus)
{
	if (PQresultStatus(result) != expectedResultStatus)
	{
		/*
		 * PQgetQueryCommand returns NULL for a NULL query, and the stored
		 * command text may itself be NULL.  Passing NULL to a %s conversion
		 * is undefined behavior, so substitute a placeholder.
		 */
		const char *command = PQgetQueryCommand(query);

		printf( "Got unexpected result status '%s', expected '%s'\nQuery:%s\n", 
			PQresStatus(PQresultStatus(result)), PQresStatus(expectedResultStatus),
			command ? command : "(unknown)");
		PQclear(result);
		/* Best-effort cleanup of the test table before bailing out */
		PQclear(PQexec(conn,"DROP TABLE test"));
		PQfinish(conn);
		exit(1);
	}
	PQclear(result);
}

/*
 * main
 *		Dispatch three pipelined queries and check each one's result status.
 *
 * The test table is (re)created, then three queries are sent back to back
 * without waiting for results.  After each dispatch PQgetLastQuery is used
 * to obtain the handle that later identifies which query a result belongs
 * to.  The second query is expected to fail (duplicate primary key); the
 * first and third are expected to return tuples.
 *
 * Returns 0 on success and 1 if sending a query or consuming input fails.
 * A failed connection or an unexpected result status terminates the program
 * with exit(1) (the latter from within checkResult).
 */
int
main(int argc, char **argv)
{
	PGconn * conn;
	PGquery * query1;
	PGquery * query2;
	PGquery * query3;
	PGquery * curQuery;
	PGresult * result;

	/* command-line arguments are not used by this example */
	(void) argc;
	(void) argv;

	conn = NULL;
	query1 = query2 = query3 = curQuery = NULL;
	result = NULL;

	/* make a connection to the database */
	conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);

	/* check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "Connection to database failed: %s",
				PQerrorMessage(conn));
		/* release the PGconn even though the connection attempt failed */
		PQfinish(conn);
		exit(1);
	}

	/* checkResult takes ownership of (and clears) each result */
	checkResult(conn,PQexec(conn,"DROP TABLE IF EXISTS test"),NULL,PGRES_COMMAND_OK);
	checkResult(conn,PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"),NULL,PGRES_COMMAND_OK);

	/*
	 * Dispatch the three queries.  Each PQsendQuery must succeed before
	 * PQgetLastQuery is consulted, otherwise the handle would refer to an
	 * earlier query (or be NULL).
	 */
	if (!PQsendQuery(conn, "INSERT INTO test(id) VALUES (DEFAULT),(DEFAULT) RETURNING id"))
		goto send_failed;
	query1 = PQgetLastQuery(conn);

	/* Duplicate primary key error */
	if (!PQsendQuery(conn, "UPDATE test SET id=2 WHERE id=1"))
		goto send_failed;
	query2 = PQgetLastQuery(conn);

	if (!PQsendQuery(conn, "SELECT * FROM test"))
		goto send_failed;
	query3 = PQgetLastQuery(conn);

	/*
	 * Collect results.  NOTE: this loop polls the connection without
	 * sleeping; it is kept that way to keep the example short.
	 */
	while( 1 )
	{
		if( PQconsumeInput(conn) == 0 ) {
			printf( "PQconsumeInput ERROR: %s\n", PQerrorMessage(conn) );
			PQfinish(conn);
			return 1;
		}

		if (PQisBusy(conn))
			continue;

		/*
		 * Ask which query the next result belongs to before fetching it;
		 * the connection is not busy here, so the answer applies to the
		 * PQgetResult call that follows.
		 */
		curQuery = PQgetFirstQuery(conn);
		result = PQgetResult(conn);

		/* Finished processing results? */
		if (result == NULL)
			break;

		/* checkResult clears the result in every branch */
		if (curQuery == query1)
			checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
		else if (curQuery == query2)
			checkResult(conn,result,curQuery,PGRES_FATAL_ERROR);
		else if (curQuery == query3)
			checkResult(conn,result,curQuery,PGRES_TUPLES_OK);
		else
			PQclear(result);	/* not one of ours: don't leak it */
	}

	/* clean up the test table and close the connection */
	PQclear(PQexec(conn,"DROP TABLE test"));
	PQfinish(conn);

	return 0;

send_failed:
	fprintf(stderr, "PQsendQuery failed: %s", PQerrorMessage(conn));
	PQfinish(conn);
	return 1;
}
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d829a4b..96e3b2a 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3947,9 +3947,13 @@ int PQsendQuery(PGconn *conn, const char *command);
 
        After successfully calling <function>PQsendQuery</function>, call
        <function>PQgetResult</function> one or more times to obtain the
-       results.  <function>PQsendQuery</function> cannot be called again
-       (on the same connection) until <function>PQgetResult</function>
-       has returned a null pointer, indicating that the command is done.
+       results.  <function>PQsendQuery</function> may be called multiple
+       times (on the same connection), allowing query pipelining. Call
+       <function>PQgetLastQuery</function> to get a <structname>PGquery</structname>
+       which can be used to identify which results obtained from
+       <function>PQgetResult</function> belong to each pipelined query dispatch.
+       If only one query is dispatched at a time, you can call <function>PQgetResult</function>
+       until a NULL value is returned to indicate the end of the query.
       </para>
      </listitem>
     </varlistentry>
@@ -4133,8 +4137,8 @@ PGresult *PQgetResult(PGconn *conn);
 
       <para>
        <function>PQgetResult</function> must be called repeatedly until
-       it returns a null pointer, indicating that the command is done.
-       (If called when no command is active,
+       it returns a null pointer, indicating that all dispatched commands
+       are done. (If called when no command is active,
        <function>PQgetResult</function> will just return a null pointer
        at once.) Each non-null result from
        <function>PQgetResult</function> should be processed using the
@@ -4144,14 +4148,19 @@ PGresult *PQgetResult(PGconn *conn);
        <function>PQgetResult</function> will block only if a command is
        active and the necessary response data has not yet been read by
        <function>PQconsumeInput</function>.
+       If query pipelining is being used, call 
+       <function>PQgetFirstQuery</function> on a non-busy connection
+       (<function>PQisBusy</function> returns false) to determine which
+       query the next result belongs to.
       </para>
 
       <note>
        <para>
         Even when <function>PQresultStatus</function> indicates a fatal
-        error, <function>PQgetResult</function> should be called until it
-        returns a null pointer, to allow <application>libpq</> to
-        process the error information completely.
+        error, <function>PQgetResult</function> should be called until the
+        query has no more results (null pointer return if not using query
+        pipelining, otherwise see <function>PQgetFirstQuery</function>),
+        to allow <application>libpq</> to process the error information completely.
        </para>
       </note>
      </listitem>
@@ -4385,6 +4394,109 @@ int PQflush(PGconn *conn);
    read-ready and then read the response as described above.
   </para>
 
+ <variablelist>
+  <varlistentry id="libpq-pqgetquerycommand">
+   <term>
+    <function>PQgetQueryCommand</function>
+    <indexterm>
+     <primary>PQgetQueryCommand</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the query string associated with the <structure>PGquery</structure>.
+<synopsis>
+const char * PQgetQueryCommand(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     When using query pipelining this function can be used to retrieve the command
+     that created the query object.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetfirstquery">
+   <term>
+    <function>PQgetFirstQuery</function>
+    <indexterm>
+     <primary>PQgetFirstQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the first async query to receive results, or NULL if no
+     async queries are active.
+<synopsis>
+PGquery * PQgetFirstQuery(PGconn *conn);
+</synopsis>
+    </para>
+
+    <para>
+     When using query pipelining this function can be used to indicate
+     which query the next result of <function>PQgetResult</function>
+     belongs to. The result is only guaranteed to be valid for the next call to
+     <function>PQgetResult</function> if <function>PQisBusy</function>
+     is false, otherwise the returned <structure>PGquery</structure>
+     may or may not have more results.  The <structure>PGquery</structure>
+     remains valid after <function>PQgetResult</function> until the next
+     libpq call that consumes input.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry id="libpq-pqgetnextquery">
+   <term>
+    <function>PQgetNextQuery</function>
+    <indexterm>
+     <primary>PQgetNextQuery</primary>
+    </indexterm>
+   </term>
+
+   <listitem>
+    <para>
+     Returns the next <structure>PGquery</structure> in the list of
+     dispatched async queries waiting for results.
+<synopsis>
+PGquery * PQgetNextQuery(PGquery *query);
+</synopsis>
+    </para>
+
+    <para>
+     This function can be used to iterate each pending async query starting
+     with the result of <function>PQgetFirstQuery</function>, and ending
+     with the result of <function>PQgetLastQuery</function>, then null.
+    </para>
+   </varlistentry>
+
+   <varlistentry>
+    <term>
+     <function>PQgetLastQuery</function>
+     <indexterm>
+      <primary>PQgetLastQuery</primary>
+     </indexterm>
+    </term>
+
+    <listitem>
+     <para>
+      Returns the last <structure>PGquery</structure> in the list of
+      dispatched async queries waiting for results.
+<synopsis>
+PGquery * PQgetLastQuery(PGconn *conn);
+</synopsis>
+     </para>
+
+     <para>
+      Call this function after dispatching an async query to get
+      a <structure>PGquery</structure> that can be used to identify
+      the originating query for each result obtained by
+      <function>PQgetResult</function>.
+     </para>
+   </varlistentry>
+  </variablelist>
  </sect1>
 
  <sect1 id="libpq-single-row-mode">
@@ -4411,7 +4523,7 @@ int PQflush(PGconn *conn);
    immediately after a successful call of <function>PQsendQuery</function>
    (or a sibling function).  This mode selection is effective only for the
    currently executing query.  Then call <function>PQgetResult</function>
-   repeatedly, until it returns null, as documented in <xref
+   repeatedly, until the last query result is returned, as documented in <xref
    linkend="libpq-async">.  If the query returns any rows, they are returned
    as individual <structname>PGresult</structname> objects, which look like
    normal query results except for having status code
@@ -4420,8 +4532,8 @@ int PQflush(PGconn *conn);
    the query returns zero rows, a zero-row object with status
    <literal>PGRES_TUPLES_OK</literal> is returned; this is the signal that no
    more rows will arrive.  (But note that it is still necessary to continue
-   calling <function>PQgetResult</function> until it returns null.)  All of
-   these <structname>PGresult</structname> objects will contain the same row
+   calling <function>PQgetResult</function> until the last query result is returned.)
+   All of these <structname>PGresult</structname> objects will contain the same row
    description data (column names, types, etc) that an ordinary
    <structname>PGresult</structname> object for the query would have.
    Each object should be freed with <function>PQclear</function> as usual.
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 93da50d..74d76f3 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -165,3 +165,7 @@ lo_lseek64                162
 lo_tell64                 163
 lo_truncate64             164
 PQconninfo                165
+PQgetFirstQuery           166
+PQgetLastQuery            167
+PQgetNextQuery            168
+PQgetQueryCommand         169
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3af222b..31fa437 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn)
 		free(conn->gsslib);
 #endif
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->inBuffer)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
@@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn)
 										 * absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
+	
+	/*
+	 * Link active queries into the free list so we can free them
+	 */
+	if (conn->queryTail)
+	{
+		conn->queryTail->next = conn->queryFree;
+		conn->queryFree = conn->queryHead;
+	}
+	conn->queryHead = conn->queryTail = NULL;
+	
+	/*
+	 * Free all query objects
+	 */
+	while (conn->queryFree)
+	{
+		PGquery * prev = conn->queryFree;
+		conn->queryFree = prev->next;
+		if (prev->querycmd)
+			free(prev->querycmd);
+		free(prev);
+	}
+
 	resetPQExpBuffer(&conn->errorMessage);
 	pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
 	conn->addrlist = NULL;
@@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn)
 }
 
 /*
- * PQcancelGet: get a PGcancel structure corresponding to a connection.
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
  *
  * A copy is needed to be able to cancel a running query from a different
  * thread. If the same structure is used all structure members would have
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4075e51..2a421a8 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * row; the original conn->result is left unchanged so that it can be used
 	 * again as the template for future rows.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Copy everything that should be in the result at this point */
 		res = PQcopyResult(res,
@@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * Success.  In single-row mode, make the result available to the client
 	 * immediately.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Change result status to special single-row value */
 		res->resultStatus = PGRES_SINGLE_TUPLE;
@@ -1088,6 +1088,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
+		/* TODO: Still correct ? */
 		conn->asyncStatus = PGASYNC_READY;
 	}
 
@@ -1132,14 +1133,12 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
-
+	conn->queryTail->queryclass = PGQUERY_SIMPLE;
+	
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
-
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
+	
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
@@ -1151,7 +1150,9 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if( conn->asyncStatus == PGASYNC_IDLE )
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 }
 
@@ -1272,13 +1273,11 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	conn->queryTail->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1288,6 +1287,7 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn,
 static bool
 PQsendQueryStart(PGconn *conn)
 {
+	PGquery * query;
+	
 	if (!conn)
 		return false;
 
@@ -1357,21 +1359,46 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while in copy mode, either. */
+	switch (conn->asyncStatus)
 	{
-		printfPQExpBuffer(&conn->errorMessage,
+		case PGASYNC_IDLE:
+		case PGASYNC_BUSY:
+		case PGASYNC_READY:
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
-		return false;
+			return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
-
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
-
+	if( !conn->queryFree )
+	{
+		query = (PGquery*) malloc(sizeof(PGquery));
+		query->querycmd = 0;
+		query->singleRowMode = false;
+		query->next = 0;
+	}
+	else
+	{
+		query = conn->queryFree;
+		conn->queryFree = query->next;
+		if (query->querycmd)
+			free(query->querycmd);
+		query->querycmd = NULL;
+		query->next = NULL;
+	}
+	
+	if( conn->queryTail )
+		conn->queryTail->next = query;
+	else
+		conn->queryHead = query;
+	
+	conn->queryTail = query;
+	
 	/* ready to send command message */
 	return true;
 }
@@ -1522,16 +1549,12 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	conn->queryTail->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, querycmd just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		conn->queryTail->querycmd = strdup(command);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1541,6 +1564,7 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1576,7 +1600,7 @@ pqHandleSendFailure(PGconn *conn)
 }
 
 /*
- * Select row-by-row processing mode
+ * Select row-by-row processing mode for the last launched query
  */
 int
 PQsetSingleRowMode(PGconn *conn)
@@ -1585,18 +1609,16 @@ PQsetSingleRowMode(PGconn *conn)
 	 * Only allow setting the flag when we have launched a query and not yet
 	 * received any results.
 	 */
-	if (!conn)
-		return 0;
-	if (conn->asyncStatus != PGASYNC_BUSY)
+	if (!conn || !conn->queryTail)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
+	if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead)
 		return 0;
-	if (conn->result)
+	if (conn->queryTail->queryclass != PGQUERY_SIMPLE &&
+		conn->queryTail->queryclass != PGQUERY_EXTENDED)
 		return 0;
 
 	/* OK, set flag */
-	conn->singleRowMode = true;
+	conn->queryTail->singleRowMode = true;
 	return 1;
 }
 
@@ -1670,6 +1692,51 @@ PQisBusy(PGconn *conn)
 
 
 /*
+ * PQgetQueryCommand
+ */
+const char *
+PQgetQueryCommand(PGquery *query)
+{
+	if (!query)
+		return NULL;
+	return query->querycmd;
+}
+
+/*
+ * PQgetFirstQuery
+ */
+PGquery *
+PQgetFirstQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	
+	return conn->queryHead;
+}
+
+/*
+ * PQgetLastQuery
+ */
+PGquery *
+PQgetLastQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryTail;
+}
+
+/*
+ * PQgetNextQuery
+ */
+PGquery *
+PQgetNextQuery(PGquery *query)
+{
+	if (!query)
+		return 0;
+	return query->next;
+}
+
+/*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
@@ -2132,14 +2199,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
-	{
-		free(conn->last_query);
-		conn->last_query = NULL;
-	}
+	conn->queryTail->queryclass = PGQUERY_DESCRIBE;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -2301,7 +2361,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c514ca5..f8a262c 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -55,7 +55,26 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
 static int build_startup_packet(const PGconn *conn, char *packet,
 					 const PQEnvironmentOption *options);
+static void pqQueryAdvance(PGconn *conn);
 
+void
+pqQueryAdvance(PGconn *conn)
+{
+	PGquery * query;
+	
+	query = conn->queryHead;
+	if (query == NULL)
+		return;
+	
+	/* Advance queryHead */
+	conn->queryHead = query->next;
+	/* Push last query onto free stack */
+	query->next = conn->queryFree;
+	conn->queryFree = query;
+	
+	if (conn->queryHead == NULL)
+		conn->queryTail = NULL;
+}
 
 /*
  * parseInput: if appropriate, parse input data from backend
@@ -68,7 +87,7 @@ pqParseInput3(PGconn *conn)
 	char		id;
 	int			msgLength;
 	int			avail;
-
+	
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
@@ -218,7 +237,15 @@ pqParseInput3(PGconn *conn)
 				case 'Z':		/* backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+
+					pqQueryAdvance(conn);
+					/* initialize async result-accumulation state */
+					conn->result = NULL;
+					conn->next_result = NULL;
+					if( conn->queryHead != NULL )
+						conn->asyncStatus = PGASYNC_BUSY;
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -232,7 +259,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->queryHead->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -266,7 +293,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case 'T':		/* Row Description */
 					if (conn->result == NULL ||
-						conn->queryclass == PGQUERY_DESCRIBE)
+						conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -299,7 +326,7 @@ pqParseInput3(PGconn *conn)
 					 * instead of TUPLES_OK.  Otherwise we can just ignore
 					 * this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -422,6 +449,8 @@ pqParseInput3(PGconn *conn)
 static void
 handleSyncLoss(PGconn *conn, char id, int msgLength)
 {
+	PGquery * query;
+	
 	printfPQExpBuffer(&conn->errorMessage,
 					  libpq_gettext(
 	"lost synchronization with server: got message type \"%c\", length %d\n"),
@@ -430,6 +459,15 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 	pqSaveErrorResult(conn);
 	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
 
+	/* All queries are canceled, move them to the free list and free the query commands */
+	while ((query = conn->queryHead) != NULL)
+	{
+		free(query->querycmd);
+		query->querycmd = NULL;
+		conn->queryHead = query->next;
+		query->next = conn->queryFree;
+	}
+	
 	pqDropConnection(conn);
 	conn->status = CONNECTION_BAD;		/* No more connection to backend */
 }
@@ -455,7 +493,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		if (conn->result)
 			result = conn->result;
@@ -562,7 +600,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -865,10 +903,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
 	if (val)
 	{
-		if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+		if (conn->verbosity != PQERRORS_TERSE && conn->queryHead->querycmd != NULL)
 		{
 			/* emit position as a syntax cursor display */
-			querytext = conn->last_query;
+			querytext = conn->queryHead->querycmd;
 			querypos = atoi(val);
 		}
 		else
@@ -1696,7 +1734,7 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b81dc16..163a5e0 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -141,6 +141,13 @@ typedef struct pg_result PGresult;
  */
 typedef struct pg_cancel PGcancel;
 
+/* PGquery encapsulates the progress of a single query command issued
+ * to the async api functions
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_query PGquery;
+
+
 /* PGnotify represents the occurrence of a NOTIFY message.
  * Ideally this would be an opaque typedef, but it's so simple that it's
  * unlikely to change.
@@ -404,6 +411,11 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+extern const char * PQgetQueryCommand(PGquery *query);
+extern PGquery *PQgetFirstQuery(PGconn *conn);
+extern PGquery *PQgetLastQuery(PGconn *conn);
+extern PGquery *PQgetNextQuery(PGquery *query);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4ef46ff..fb9bd61 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -291,6 +291,16 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+typedef struct pg_query
+{
+	PGQueryClass queryclass;
+	char	   *querycmd;		/* last SQL command, or NULL if unknown */
+	bool		singleRowMode;	/* return query result row-by-row? */
+	struct pg_query * next;
+	void	   *userptr;        /* convenience for the user */
+} PGquery;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -350,13 +360,19 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
+	
+	/* queryHead and queryTail form a FIFO representing queries sent
+	 * to the backend.  queryHead is the first query sent, and is the
+	 * query we are receiving results from, or have received results from */
+	PGquery *queryHead;
+	PGquery *queryTail;
+	PGquery *queryFree; /* Reuse PGQuery allocations */
+	int nQueries;
+	
 	char		last_sqlstate[6];		/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
-	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
 										 * OUT */
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index aee5c04..3996760 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline testlibpqpipeline2
 
 all: $(PROGS)
 
diff --git a/src/test/examples/testlibpqpipeline.c b/src/test/examples/testlibpqpipeline.c
index 725aad5..da0466f 100644
--- a/src/test/examples/testlibpqpipeline.c
+++ b/src/test/examples/testlibpqpipeline.c
@@ -101,7 +101,7 @@ int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int b
 #define TEST_P(q) \
 	if( (result = testPipelined(conn,totalQueries,totalQueued,q)) != 0 ) \
 		return result;
-	TEST_P("INSERT INTO test() VALUES ()");
+	TEST_P("INSERT INTO test(id) VALUES (DEFAULT)");
 	TEST_P("SELECT * FROM test LIMIT 1");
 	TEST_P("SELECT * FROM test");
 	TEST_P("DELETE FROM test");
@@ -137,7 +137,7 @@ main(int argc, char **argv)
 
 	PQsetnonblocking(conn,1);
 	
-	PQexec(conn,"CREATE TABLE test ( id PRIMARY KEY AUTOINCREMENT )");
+	PQclear(PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"));
 
 	baseline = testPipelinedSeries(conn,10,1,0);
 	testPipelinedSeries(conn,10,3,baseline);
@@ -150,7 +150,7 @@ main(int argc, char **argv)
 	testPipelinedSeries(conn,100,50,baseline);
 	testPipelinedSeries(conn,100,100,baseline);
 	
-	PQexec(conn,"DROP TABLE test");
+	PQclear(PQexec(conn,"DROP TABLE test"));
 	
 	return 0;
 }
\ No newline at end of file
/*
 * src/test/examples/testlibpqpipeline.c
 *
 *
 * testlibpqpipeline.c
 *		this test program tests query pipelining and its performance impact
 *
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "libpq-fe.h"

// If defined we won't issue more sql commands if the socket's
// write buffer is full
//#define MIN_LOCAL_Q

//#define PRINT_QUERY_PROGRESS

/*
 * Forward declarations of the file-local helpers defined below.
 *
 * testPipelined runs one SQL statement totalQueries times with at most
 * totalQueued of them outstanding; testPipelinedSeries times a fixed series
 * of such runs.
 */
static int testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql );
static int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs );


/*
 * testPipelined
 *		Send the statement 'sql' totalQueries times over 'conn', keeping at
 *		most totalQueued statements outstanding at any one time, and discard
 *		every result that comes back.
 *
 * conn			connection to issue the statements on
 * totalQueries	total number of times to send 'sql'
 * totalQueued	maximum number of sent-but-unfinished statements
 * sql			text of the statement to send
 *
 * Returns 0 once every statement has been sent and its query has been seen
 * to leave the head of the connection's query queue, or 1 (after printing
 * the libpq error message) if PQconsumeInput, PQflush or PQsendQuery fails.
 *
 * The loop never blocks on the socket; it polls libpq until work is
 * available.
 */
static int
testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql )
{
	int nQueriesQueued = 0;		/* sent, not yet seen to finish */
	int nQueriesTotal = 0;		/* sent so far */
	PGresult * result = NULL;
	PGquery * firstQuery = NULL;	/* head of the queue as last observed */
	PGquery * curQuery = NULL;

	while( nQueriesQueued > 0 || nQueriesTotal < totalQueries ) {

		if( PQconsumeInput(conn) == 0 ) {
			printf( "PQconsumeInput ERROR: %s\n", PQerrorMessage(conn) );
			return 1;
		}

		/* Drain every result that can be read without blocking */
		for(;;) {
			curQuery = PQgetFirstQuery(conn);

			/*
			 * The head of the queue changed since we last looked, so the
			 * query we were tracking as firstQuery is finished.
			 */
			if( firstQuery != curQuery )
			{
#ifdef PRINT_QUERY_PROGRESS
				printf("-");
#endif
				firstQuery = curQuery;
				nQueriesQueued--;
			}

			/* Break if no queries are ready */
			if( firstQuery == NULL || PQisBusy(conn) )
				break;

			if( (result = PQgetResult(conn)) != NULL )
				PQclear(result);
		}

		if( nQueriesTotal < totalQueries && nQueriesQueued < totalQueued ) {
#ifdef MIN_LOCAL_Q
			int flushResult = PQflush(conn);
			if( flushResult == -1 ) {
				printf( "PQflush ERROR: %s\n", PQerrorMessage(conn) );
				return 1;
			} else if ( flushResult == 1 )
				continue;
#endif
			/*
			 * A failed send must not be counted as queued: no result would
			 * ever arrive for it and the outer loop would never terminate.
			 */
			if( PQsendQuery(conn,sql) == 0 ) {
				printf( "PQsendQuery ERROR: %s\n", PQerrorMessage(conn) );
				return 1;
			}
			if( firstQuery == NULL )
				firstQuery = PQgetFirstQuery(conn);
			nQueriesTotal++;
			nQueriesQueued++;
#ifdef PRINT_QUERY_PROGRESS
			printf( "+" );
#endif
		}
	}
#ifdef PRINT_QUERY_PROGRESS
	printf( "\n" );
#endif
	return 0;
}

/*
 * testPipelinedSeries
 *		Run a fixed series of statements (INSERT, two SELECTs, DELETE)
 *		through testPipelined with the given pipeline depth, time the whole
 *		series with gettimeofday, and print the elapsed time.
 *
 * conn				connection to run the statements on
 * totalQueries		number of times each statement is sent
 * totalQueued		maximum number of outstanding statements
 * baseline_usecs	elapsed microseconds of a reference run; when nonzero the
 *					ratio baseline_usecs / elapsed is printed as "speedup"
 *
 * Returns the elapsed time in microseconds.  If any testPipelined call
 * fails, its nonzero result is returned immediately instead and nothing is
 * printed.
 * NOTE(review): callers cannot tell such an error code apart from a
 * (very short) elapsed time.
 */
static int
testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs )
{
	static const char *const queries[] = {
		"INSERT INTO test(id) VALUES (DEFAULT)",
		"SELECT * FROM test LIMIT 1",
		"SELECT * FROM test",
		"DELETE FROM test"
	};
	struct timeval tv1, tv2;
	size_t i;
	int result;
	int usecs;

	gettimeofday(&tv1,NULL);
	for( i = 0; i < sizeof queries / sizeof queries[0]; i++ ) {
		result = testPipelined(conn,totalQueries,totalQueued,queries[i]);
		if( result != 0 )
			return result;
	}
	gettimeofday(&tv2,NULL);

	/* Total elapsed microseconds; tv_usec difference may be negative */
	usecs = (int)(tv2.tv_sec - tv1.tv_sec) * 1000000
		+ (int)(tv2.tv_usec - tv1.tv_usec);

	/*
	 * Split the total into whole seconds and the sub-second remainder so the
	 * value printed after the decimal point is always six digits.
	 */
	printf("testPipelinedSeries(%i,%i) took %i.%06i",
		   totalQueries,totalQueued,usecs / 1000000,usecs % 1000000);
	if (baseline_usecs == 0 || usecs <= 0)
		printf("\n");
	else
		printf(", speedup %.2f\n", (double)baseline_usecs / usecs );
	return usecs;
}

int
main(int argc, char **argv)
{
	PGconn * conn;
	int baseline;
	
	conn = NULL;
	
	/* make a connection to the database */
	conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);

	/* check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "Connection to database failed: %s",
				PQerrorMessage(conn));
		exit(1);
	}

	PQsetnonblocking(conn,1);
	
	PQclear(PQexec(conn,"CREATE TABLE test ( id SERIAL PRIMARY KEY )"));

	baseline = testPipelinedSeries(conn,10,1,0);
	testPipelinedSeries(conn,10,3,baseline);
	testPipelinedSeries(conn,10,10,baseline);
	
	baseline = testPipelinedSeries(conn,100,1,0);
	testPipelinedSeries(conn,100,3,baseline);
	testPipelinedSeries(conn,100,10,baseline);
	testPipelinedSeries(conn,100,25,baseline);
	testPipelinedSeries(conn,100,50,baseline);
	testPipelinedSeries(conn,100,100,baseline);
	
	PQclear(PQexec(conn,"DROP TABLE test"));
	
	return 0;
}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to