On 2021-Jul-06, Boris Kolpackov wrote:

> Alvaro Herrera <alvaro.herr...@2ndquadrant.com> writes:
>
> > Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
> > was sufficient, but it's not: you have to have the PQgetResult in there
> > too. Looking ...
>
> Any progress on fixing this?

Can you please try with this patch?

Thanks

-- 
Álvaro Herrera              39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/

>From c84b66821249631ba1654b22866ca54c49f238c4 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Tue, 6 Jul 2021 12:46:42 -0400 Subject: [PATCH] fix libpq state machine --- src/interfaces/libpq/fe-exec.c | 46 ++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b13ddab393..1295a417c1 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn) /* * pqAppendCmdQueueEntry - * Append a caller-allocated command queue entry to the queue. + * Append a caller-allocated command queue entry to the queue, and update + * conn->asyncStatus as needed to account for it. * * The query itself must already have been put in the output buffer by the * caller. @@ -1239,6 +1240,33 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) conn->cmd_queue_tail->next = entry; conn->cmd_queue_tail = entry; + + switch (conn->pipelineStatus) + { + case PQ_PIPELINE_OFF: + case PQ_PIPELINE_ON: + /* + * If there's a result ready to be consumed, let it be so (that is, + * don't change away from READY or READY_MORE); otherwise we wait + * for some result to arrive from the server. + */ + if (conn->asyncStatus == PGASYNC_IDLE) + conn->asyncStatus = PGASYNC_BUSY; + break; + + /* + * If in aborted pipeline state, we don't expect anything from the + * server, so we have to let PQgetResult consume from the aborted + * queue right away. + */ + case PQ_PIPELINE_ABORTED: + if (conn->asyncStatus == PGASYNC_IDLE) + { + resetPQExpBuffer(&conn->errorMessage); + pqPipelineProcessQueue(conn); + } + break; + } } /* @@ -1375,7 +1403,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) /* OK, it's launched! 
*/ pqAppendCmdQueueEntry(conn, entry); - conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: @@ -1510,10 +1537,6 @@ PQsendPrepare(PGconn *conn, /* if insufficient memory, query just winds up NULL */ entry->query = strdup(query); - pqAppendCmdQueueEntry(conn, entry); - - conn->asyncStatus = PGASYNC_BUSY; - /* * Give the data a push (in pipeline mode, only if we're past the size * threshold). In nonblock mode, don't complain if we're unable to send @@ -1522,6 +1545,8 @@ PQsendPrepare(PGconn *conn, if (pqPipelineFlush(conn) < 0) goto sendFailed; + pqAppendCmdQueueEntry(conn, entry); + return 1; sendFailed: @@ -1815,7 +1840,7 @@ PQsendQueryGuts(PGconn *conn, /* OK, it's launched! */ pqAppendCmdQueueEntry(conn, entry); - conn->asyncStatus = PGASYNC_BUSY; + return 1; sendFailed: @@ -2445,7 +2470,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) /* OK, it's launched! */ pqAppendCmdQueueEntry(conn, entry); - conn->asyncStatus = PGASYNC_BUSY; + return 1; sendFailed: @@ -3072,15 +3097,14 @@ PQpipelineSync(PGconn *conn) pqPutMsgEnd(conn) < 0) goto sendFailed; - pqAppendCmdQueueEntry(conn, entry); - /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ if (PQflush(conn) < 0) goto sendFailed; - conn->asyncStatus = PGASYNC_BUSY; + + pqAppendCmdQueueEntry(conn, entry); return 1; -- 2.20.1