On 2021-Jul-06, Boris Kolpackov wrote:

> Alvaro Herrera <alvaro.herr...@2ndquadrant.com> writes:
> 
> > Ah, yes it does.  I can reproduce this now.  I thought PQconsumeInput
> > was sufficient, but it's not: you have to have the PQgetResult in there
> > too.  Looking ...
> 
> Any progress on fixing this?

Can you please try with this patch?

Thanks

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
>From c84b66821249631ba1654b22866ca54c49f238c4 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 6 Jul 2021 12:46:42 -0400
Subject: [PATCH] fix libpq state machine

---
 src/interfaces/libpq/fe-exec.c | 46 ++++++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 11 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b13ddab393..1295a417c1 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
 
 /*
  * pqAppendCmdQueueEntry
- *		Append a caller-allocated command queue entry to the queue.
+ *		Append a caller-allocated command queue entry to the queue, and update
+ *		conn->asyncStatus as needed to account for it.
  *
  * The query itself must already have been put in the output buffer by the
  * caller.
@@ -1239,6 +1240,33 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 		conn->cmd_queue_tail->next = entry;
 
 	conn->cmd_queue_tail = entry;
+
+	switch (conn->pipelineStatus)
+	{
+		case PQ_PIPELINE_OFF:
+		case PQ_PIPELINE_ON:
+			/*
+			 * If there's a result ready to be consumed, let it be so (that is,
+			 * don't change away from READY or READY_MORE); otherwise we wait
+			 * for some result to arrive from the server.
+			 */
+			if (conn->asyncStatus == PGASYNC_IDLE)
+				conn->asyncStatus = PGASYNC_BUSY;
+			break;
+
+			/*
+			 * If in aborted pipeline state, we don't expect anything from the
+			 * server, so we have to let PQgetResult consume from the aborted
+			 * queue right away.
+			 */
+		case PQ_PIPELINE_ABORTED:
+			if (conn->asyncStatus == PGASYNC_IDLE)
+			{
+				resetPQExpBuffer(&conn->errorMessage);
+				pqPipelineProcessQueue(conn);
+			}
+			break;
+	}
 }
 
 /*
@@ -1375,7 +1403,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -1510,10 +1537,6 @@ PQsendPrepare(PGconn *conn,
 	/* if insufficient memory, query just winds up NULL */
 	entry->query = strdup(query);
 
-	pqAppendCmdQueueEntry(conn, entry);
-
-	conn->asyncStatus = PGASYNC_BUSY;
-
 	/*
 	 * Give the data a push (in pipeline mode, only if we're past the size
 	 * threshold).  In nonblock mode, don't complain if we're unable to send
@@ -1522,6 +1545,8 @@ PQsendPrepare(PGconn *conn,
 	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
+	pqAppendCmdQueueEntry(conn, entry);
+
 	return 1;
 
 sendFailed:
@@ -1815,7 +1840,7 @@ PQsendQueryGuts(PGconn *conn,
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -2445,7 +2470,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 
 sendFailed:
@@ -3072,15 +3097,14 @@ PQpipelineSync(PGconn *conn)
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	pqAppendCmdQueueEntry(conn, entry);
-
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
 	if (PQflush(conn) < 0)
 		goto sendFailed;
-	conn->asyncStatus = PGASYNC_BUSY;
+
+	pqAppendCmdQueueEntry(conn, entry);
 
 	return 1;
 
-- 
2.20.1

Reply via email to