So I wrote some more test scenarios for this and, as I mentioned in another thread, realized that the problems go beyond the spurious NOTICE. For instance, if you send a query, read its result but not the terminating NULL, and then send another query, everything gets confused: the next thing you read is the result of the second query, without ever having read the NULL that terminates the results of the first. Any application that expects the usual flow of results will be confused. Kyotaro's patch to add PIPELINE_IDLE fixes this bit too, as far as I can tell.
However, another problem case, not fixed by PIPELINE_IDLE, occurs if you exit pipeline mode after PQsendQuery() and then immediately use PQexec(). The CloseComplete is received at the wrong time, and a notice is emitted nevertheless.

I spent a lot of time trying to understand how to fix this last bit. The solution I came up with is that PQsendQuery() must add a second entry to the command queue, after the PGQUERY_EXTENDED one, to match the CloseComplete message being expected. With that entry in the queue, PQgetResult will only really get to IDLE mode after the Close has been seen, which is what we want. I named it PGQUERY_CLOSE.

Sadly, some hacks are needed to make this work fully:

1. The client never expects PQgetResult() to return anything for the
   CloseComplete, so something needs to consume the CloseComplete
   silently (including the queue entry for it) when it is received; I
   chose to do this directly in pqParseInput3. I tried to make
   PQgetResult itself do it, but it became a pile of hacks until I was
   no longer sure what was going on. Putting it in fe-protocol3.c ends
   up a lot cleaner. However, we still need PQgetResult to invoke
   parseInput() at the point where Close is expected.

2. If an error occurs while executing the query, the CloseComplete will
   of course never arrive, so we need to erase it from the queue
   silently if we're returning an error.

I toyed with the idea of having parseInput() produce a PGresult that encodes the Close message, and having PQgetResult consume and discard that, then read some further message to have something to return. But it seemed inefficient and equally ugly, and I'm not sure the flow control would be any simpler.

I think another possibility would be to make PQexitPipelineMode responsible for /something/, but I'm not sure what that would be. Checking the queue and seeing if the next message is CloseComplete, then eating that message before exiting pipeline mode? That seems way too magical. I didn't attempt this.
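To picture the PGQUERY_CLOSE idea, here is a minimal toy model of the command queue. It is only a sketch and not libpq code: every name in it (ToyConn, toy_send_query and friends) is hypothetical, the states are reduced to busy/idle, and "stray" merely counts messages that arrive while the connection believes itself idle, which is the situation that produces the notice.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_QUEUE_MAX 8

/* Hypothetical stand-ins for PGQueryClass and PGAsyncStatusType. */
typedef enum { TOY_EXTENDED, TOY_CLOSE } ToyQueryClass;
typedef enum { TOY_IDLE, TOY_BUSY } ToyState;

typedef struct ToyConn
{
	ToyQueryClass queue[TOY_QUEUE_MAX];
	int			head;		/* index of the next expected entry */
	int			tail;		/* one past the last queued entry */
	ToyState	state;
	int			stray;		/* messages that arrived while idle */
} ToyConn;

/* Drop the head entry; we only become idle once the queue is empty. */
void
toy_advance(ToyConn *c)
{
	assert(c->head < c->tail);
	c->head++;
	if (c->head == c->tail)
		c->state = TOY_IDLE;
}

/* Queue a query; optionally add the extra entry for CloseComplete. */
void
toy_send_query(ToyConn *c, bool with_close_entry)
{
	assert(c->tail + 2 <= TOY_QUEUE_MAX);
	c->queue[c->tail++] = TOY_EXTENDED;
	if (with_close_entry)
		c->queue[c->tail++] = TOY_CLOSE;
	c->state = TOY_BUSY;
}

/* The query's results are complete: consume its queue entry. */
void
toy_command_complete(ToyConn *c)
{
	assert(c->head < c->tail && c->queue[c->head] == TOY_EXTENDED);
	toy_advance(c);
}

/* CloseComplete: silently consume its entry, or count it as stray. */
void
toy_close_complete(ToyConn *c)
{
	if (c->state == TOY_IDLE)
		c->stray++;
	else if (c->queue[c->head] == TOY_CLOSE)
		toy_advance(c);
}

/* Error: CloseComplete will never arrive, so drop its entry as well. */
void
toy_error(ToyConn *c)
{
	toy_advance(c);
	if (c->head < c->tail && c->queue[c->head] == TOY_CLOSE)
		toy_advance(c);
}
```

Without the extra entry, toy_command_complete() leaves the model idle and the following toy_close_complete() is counted as stray. With it, the model stays busy until the CloseComplete has been consumed, and toy_error() removes both entries in one go.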
I attach a patch series that implements the proposed fix (again for REL_14_STABLE) in steps, to make it easy to read. I intend to squash the whole lot into a single commit before pushing. But while writing this email it occurred to me that I need to add at least one more test, to receive a WARNING while waiting for CloseComplete. AFAICT it should work, but better make sure.

I produced the pipeline_idle.trace file by running the test in the fully fixed tree, then used it to verify that all tests fail in different ways when run without the fixes. The first fix with PIPELINE_IDLE fixes some of these failures, and the PGQUERY_CLOSE one fixes the remaining one. Reading the trace file, it looks correct to me.

-- 
Álvaro Herrera        Breisgau, Deutschland  —  https://www.EnterpriseDB.com/
"Doing what he did amounts to sticking his fingers under the hood of the
implementation; if he gets his fingers burnt, it's his problem."  (Tom Lane)
From 64fc6f56f88cf3d5e6c3eaada32887939ad3b49f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 15 Jun 2022 19:42:23 +0200
Subject: [PATCH v7 1/6] Use Test::Differences if available

---
 src/test/modules/libpq_pipeline/README              |  3 +++
 .../modules/libpq_pipeline/t/001_libpq_pipeline.pl  | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
index d8174dd579..59c6ea8109 100644
--- a/src/test/modules/libpq_pipeline/README
+++ b/src/test/modules/libpq_pipeline/README
@@ -1 +1,4 @@
 Test programs and libraries for libpq
+
+If you have Test::Differences installed, any differences in the trace files
+are displayed in a format that's easier to read than the standard format.
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index d8d496c995..49eec8a63a 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -9,6 +9,17 @@ use PostgresNode;
 use TestLib;
 use Test::More;
 
+# Use Test::Differences if installed, and select unified diff output.
+# No decent way to select a context line count with this;
+# we could use a sub ref to allow that.
+BEGIN
+{
+	if (!eval q{ use Test::Differences; unified_diff(); 1 })
+	{
+		*eq_or_diff = \&is;
+	}
+}
+
 my $node = get_new_node('main');
 $node->init;
 $node->start;
@@ -54,7 +65,7 @@ for my $testname (@tests)
 		$result = slurp_file_eval($traceout);
 		next unless $result ne "";
 
-		is($result, $expected, "$testname trace match");
+		eq_or_diff($result, $expected, "$testname trace match");
 	}
 }
 
-- 
2.30.2
From e0b67f7b20938ebc9d37ef0b1c34af1e536d978c Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 21 Jun 2022 19:55:12 +0200
Subject: [PATCH v7 2/6] Allow tracefile to be stdout

---
 src/test/modules/libpq_pipeline/libpq_pipeline.c | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c27c4e0ada..8f6f2d4b4b 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -1630,7 +1630,10 @@ main(int argc, char **argv)
 	/* Set the trace file, if requested */
 	if (tracefile != NULL)
 	{
-		trace = fopen(tracefile, "w");
+		if (strcmp(tracefile, "-") == 0)
+			trace = stdout;
+		else
+			trace = fopen(tracefile, "w");
 		if (trace == NULL)
 			pg_fatal("could not open file \"%s\": %m", tracefile);
 
-- 
2.30.2
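The "-" convention in the patch above can be isolated into a pair of small helpers. This is only a sketch: open_trace() and close_trace() are hypothetical names that do not exist in libpq_pipeline.c, and the closing helper is an addition the patch itself does not include.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: "-" selects stdout, anything else is opened for writing. */
FILE *
open_trace(const char *path)
{
	assert(path != NULL);
	if (strcmp(path, "-") == 0)
		return stdout;
	return fopen(path, "w");	/* NULL on failure; the caller must check */
}

/* Hypothetical counterpart: only close streams that open_trace() opened. */
int
close_trace(FILE *fp)
{
	if (fp == NULL || fp == stdout)
		return 0;
	return fclose(fp);
}
```

Keeping the two together makes it harder to fclose() stdout by accident when the trace was redirected there.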
From 435c04917e5081375b25f9f1ea638a3c9c5b1996 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 21 Jun 2022 19:35:26 +0200
Subject: [PATCH v7 3/6] if the queue becomes empty, reset the tail too

---
 src/interfaces/libpq/fe-exec.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4180683194..ed26bab033 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2988,6 +2988,10 @@ pqCommandQueueAdvance(PGconn *conn)
 	prevquery = conn->cmd_queue_head;
 	conn->cmd_queue_head = conn->cmd_queue_head->next;
 
+	/* If the queue is now empty, reset the tail too */
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_tail = NULL;
+
 	/* and make it recyclable */
 	prevquery->next = NULL;
 	pqRecycleCmdQueueEntry(conn, prevquery);
-- 
2.30.2
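The invariant this patch restores (an empty queue has both ends NULL) can be shown on a stand-alone singly linked queue. This is a toy sketch: ToyEntry, ToyQueue and the two functions are hypothetical stand-ins for PGcmdQueueEntry, the cmd_queue_head/cmd_queue_tail fields and the libpq routines, not the real code.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for PGcmdQueueEntry. */
typedef struct ToyEntry
{
	int			id;
	struct ToyEntry *next;
} ToyEntry;

/* Hypothetical stand-in for the head/tail pointers kept in PGconn. */
typedef struct ToyQueue
{
	ToyEntry   *head;
	ToyEntry   *tail;
} ToyQueue;

/* Append at the tail; an empty queue is recognised by head == NULL. */
void
toy_queue_append(ToyQueue *q, ToyEntry *e)
{
	assert(e != NULL);
	e->next = NULL;
	if (q->head == NULL)
		q->head = e;
	else
		q->tail->next = e;
	q->tail = e;
}

/* Remove and return the head; clear the tail when the queue runs dry. */
ToyEntry *
toy_queue_advance(ToyQueue *q)
{
	ToyEntry   *prev = q->head;

	if (prev == NULL)
		return NULL;
	q->head = prev->next;
	if (q->head == NULL)
		q->tail = NULL;		/* otherwise tail keeps pointing at prev */
	prev->next = NULL;
	return prev;
}
```

Without the reset, the tail would still reference an entry that has been handed back for recycling, so any code inspecting the tail of an empty queue would see stale data.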
From 25943c6a5e0df502839851e7f525f37a73182ac6 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 24 Jun 2022 17:43:16 +0200
Subject: [PATCH v7 4/6] add some tests, maybe wrong

---
 .../modules/libpq_pipeline/libpq_pipeline.c   | 197 ++++++++++++++++++
 .../libpq_pipeline/traces/pipeline_idle.trace |  38 ++++
 2 files changed, 235 insertions(+)
 create mode 100644 src/test/modules/libpq_pipeline/traces/pipeline_idle.trace

diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 8f6f2d4b4b..a3ac10ae4a 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -968,6 +968,198 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/* Notice processor: print notices, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+	int *n_notices = (int *) arg;
+
+	(*n_notices)++;
+	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
+/* Verify behavior in "idle" state */
+static void
+test_pipeline_idle(PGconn *conn)
+{
+	PGresult *res;
+	int n_notices = 0;
+
+	fprintf(stderr, "\npipeline idle...\n");
+
+	PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
+	/*
+	 * Cause a Close message to be sent to the server, and watch libpq's
+	 * reaction to the resulting CloseComplete. libpq must not get in IDLE
+	 * state until that has been received.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("expected NULL result");
+
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/*
+	 * Must not have got any notices here; note bug as described in
+	 * https://postgr.es/m/ca+mi_8bvd0_cw3sumgwpvwdnzxy32itog_16tdyru_1s2gv...@mail.gmail.com
+	 */
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 1\n");
+
+	/*
+	 * Verify that we can send a query using simple query protocol after one
+	 * in pipeline mode.
+	 */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	fprintf(stderr, "going to expect null-terminating result\n");
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("got unexpected non-null result");
+	fprintf(stderr, "received null-terminating result, exiting pipeline mode\n");
+	/* We can exit pipeline mode now */
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+	res = PQexec(conn, "SELECT 2");
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	if (res == NULL)
+		pg_fatal("PQexec returned NULL");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from non-pipeline query",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 2\n");
+
+	/*
+	 * Case 2: exiting pipeline mode is not OK if a second command is sent.
+	 */
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	/* read terminating null from first query */
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+				 PQerrorMessage(conn));
+
+	/* Try to exit pipeline mode in pipeline-idle state */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQuery(conn, "SELECT 1") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQsendQuery(conn, "SELECT 2") != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQexitPipelineMode(conn) == 1)
+		pg_fatal("exiting pipeline succeeded when it shouldn't");
+	if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
+				strlen("cannot exit pipeline mode")) != 0)
+		pg_fatal("did not get expected error; got: %s",
+				 PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from second pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	if (PQexitPipelineMode(conn) == 1)
+		pg_fatal("exiting pipeline succeeded when it shouldn't");
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+	if (n_notices > 0)
+		pg_fatal("got %d notice(s)", n_notices);
+	fprintf(stderr, "ok - 3\n");
+}
+
+
+
 static void
 test_simple_pipeline(PGconn *conn)
 {
@@ -1160,6 +1352,8 @@ test_singlerowmode(PGconn *conn)
 
 	if (PQexitPipelineMode(conn) != 1)
 		pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
 }
 
 /*
@@ -1549,6 +1743,7 @@ print_test_list(void)
 	printf("multi_pipelines\n");
 	printf("nosync\n");
 	printf("pipeline_abort\n");
+	printf("pipeline_idle\n");
 	printf("pipelined_insert\n");
 	printf("prepared\n");
 	printf("simple_pipeline\n");
@@ -1653,6 +1848,8 @@ main(int argc, char **argv)
 		test_nosync(conn);
 	else if (strcmp(testname, "pipeline_abort") == 0)
 		test_pipeline_abort(conn);
+	else if (strcmp(testname, "pipeline_idle") == 0)
+		test_pipeline_idle(conn);
 	else if (strcmp(testname, "pipelined_insert") == 0)
 		test_pipelined_insert(conn, numrows);
 	else if (strcmp(testname, "prepared") == 0)
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
new file mode 100644
index 0000000000..7d07c296f7
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
@@ -0,0 +1,38 @@
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Sync
+B	4	CloseComplete
+B	5	ReadyForQuery	 I
+F	16	Parse	 "" "SELECT 1" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+F	16	Parse	 "" "SELECT 2" 0
+F	12	Bind	 "" "" 0 0 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	6	Close	 P ""
+F	4	Flush
+B	4	CloseComplete
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '2'
+B	13	CommandComplete	 "SELECT 1"
+F	4	Terminate
-- 
2.30.2
From 0630f5d00e21e91c0457b6442c6fa23827241d86 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 24 Jun 2022 18:13:58 +0200
Subject: [PATCH v7 5/6] add PIPELINE_IDLE state

---
 src/interfaces/libpq/fe-connect.c   |  1 +
 src/interfaces/libpq/fe-exec.c      | 58 ++++++++++++++++++++---------
 src/interfaces/libpq/fe-protocol3.c | 12 ------
 src/interfaces/libpq/libpq-int.h    |  3 +-
 4 files changed, 44 insertions(+), 30 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 709ba15220..afd0bc809a 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -6751,6 +6751,7 @@ PQtransactionStatus(const PGconn *conn)
 {
 	if (!conn || conn->status != CONNECTION_OK)
 		return PQTRANS_UNKNOWN;
+	/* XXX what should we do here if status is PGASYNC_PIPELINE_IDLE? */
 	if (conn->asyncStatus != PGASYNC_IDLE)
 		return PQTRANS_ACTIVE;
 	return conn->xactStatus;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ed26bab033..7cb803de94 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
 			 * itself consume commands from the queue; if we're in any other
 			 * state, we don't have to do anything.
 			 */
-			if (conn->asyncStatus == PGASYNC_IDLE)
+			if (conn->asyncStatus == PGASYNC_IDLE ||
+				conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
 			{
 				resetPQExpBuffer(&conn->errorMessage);
 				pqPipelineProcessQueue(conn);
@@ -1667,11 +1668,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
 		switch (conn->asyncStatus)
 		{
 			case PGASYNC_IDLE:
+			case PGASYNC_PIPELINE_IDLE:
 			case PGASYNC_READY:
 			case PGASYNC_READY_MORE:
 			case PGASYNC_BUSY:
 				/* ok to queue */
 				break;
+
 			case PGASYNC_COPY_IN:
 			case PGASYNC_COPY_OUT:
 			case PGASYNC_COPY_BOTH:
@@ -2047,19 +2050,22 @@ PQgetResult(PGconn *conn)
 	{
 		case PGASYNC_IDLE:
 			res = NULL;	/* query is complete */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF)
-			{
-				/*
-				 * We're about to return the NULL that terminates the round of
-				 * results from the current query; prepare to send the results
-				 * of the next query when we're called next. Also, since this
-				 * is the start of the results of the next query, clear any
-				 * prior error message.
-				 */
-				resetPQExpBuffer(&conn->errorMessage);
-				pqPipelineProcessQueue(conn);
-			}
 			break;
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+			/*
+			 * We're about to return the NULL that terminates the round of
+			 * results from the current query; prepare to send the results
+			 * of the next query, if any, when we're called next. If there's
+			 * no next element in the command queue, this gets us in IDLE
+			 * state.
+			 */
+			resetPQExpBuffer(&conn->errorMessage);
+			pqPipelineProcessQueue(conn);
+			res = NULL;	/* query is complete */
+			break;
+
 		case PGASYNC_READY:
 
 			/*
@@ -2080,7 +2086,7 @@ PQgetResult(PGconn *conn)
 				 * We're about to send the results of the current query. Set
 				 * us idle now, and ...
 				 */
-				conn->asyncStatus = PGASYNC_IDLE;
+				conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
 
 				/*
 				 * ... in cases when we're sending a pipeline-sync result,
@@ -2939,6 +2945,7 @@ PQexitPipelineMode(PGconn *conn)
 	{
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* there are some uncollected results */
 			appendPQExpBufferStr(&conn->errorMessage,
 								 libpq_gettext("cannot exit pipeline mode with uncollected results\n"));
@@ -3014,15 +3021,31 @@ pqPipelineProcessQueue(PGconn *conn)
 		case PGASYNC_BUSY:
 			/* client still has to process current query or results */
 			return;
+
 		case PGASYNC_IDLE:
+			/*
+			 * When in IDLE mode, there are no further commands to process,
+			 * and no further action to take on the queue, since it must
+			 * be empty.
+			 */
+			Assert(conn->cmd_queue_head == NULL);
+			return;
+
+		case PGASYNC_PIPELINE_IDLE:
+			Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
 			/* next query please */
 			break;
 	}
 
-	/* Nothing to do if not in pipeline mode, or queue is empty */
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
-		conn->cmd_queue_head == NULL)
+	/*
+	 * If there are no further commands to process in the queue, get us in
+	 * "real idle" mode now.
+	 */
+	if (conn->cmd_queue_head == NULL)
+	{
+		conn->asyncStatus = PGASYNC_IDLE;
 		return;
+	}
 
 	/* Initialize async result-accumulation state */
 	pqClearAsyncResult(conn);
@@ -3109,6 +3132,7 @@ PQpipelineSync(PGconn *conn)
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
 		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
 			/* OK to send sync */
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 9ab3bf1fcb..bab8926a63 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
 			if (conn->asyncStatus != PGASYNC_IDLE)
 				return;
 
-			/*
-			 * We're also notionally not-IDLE when in pipeline mode the state
-			 * says "idle" (so we have completed receiving the results of one
-			 * query from the server and dispatched them to the application)
-			 * but another query is queued; yield back control to caller so
-			 * that they can initiate processing of the next query in the
-			 * queue.
-			 */
-			if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
-				conn->cmd_queue_head != NULL)
-				return;
-
 			/*
 			 * Unexpected message in IDLE state; need to recover somehow.
 			 * ERROR messages are handled using the notice processor;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 334aea4b6e..e40a657f55 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -224,7 +224,8 @@ typedef enum
 								 * query */
 	PGASYNC_COPY_IN,	/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,	/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH	/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,	/* Copy In/Out data transfer in progress */
+	PGASYNC_PIPELINE_IDLE,	/* "Idle" between commands in pipeline mode */
 } PGAsyncStatusType;
 
 /* Target server type (decoded value of target_session_attrs) */
-- 
2.30.2
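The state transitions introduced by the patch above can be summarised as a small pure function. This is a toy sketch under stated assumptions: the enum and both functions are hypothetical, the COPY and READY_MORE states are left out, and the step where a non-empty queue leads to BUSY is an assumption about code the diff does not show.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy copy of the states relevant here; not libpq's real enum. */
typedef enum
{
	TOY_IDLE,
	TOY_READY,
	TOY_BUSY,
	TOY_PIPELINE_IDLE
} ToyAsyncStatus;

/*
 * Sketch of the decision made when the queue is processed: given the
 * current status and whether the command queue is empty, return the
 * status we end up in.
 */
ToyAsyncStatus
toy_process_queue(ToyAsyncStatus status, bool queue_empty)
{
	switch (status)
	{
		case TOY_READY:
		case TOY_BUSY:
			/* client still has to process current query or results */
			return status;
		case TOY_IDLE:
			/* "real idle": the queue must already be empty */
			assert(queue_empty);
			return TOY_IDLE;
		case TOY_PIPELINE_IDLE:
			break;
	}

	if (queue_empty)
		return TOY_IDLE;
	/* Assumption: starting the next queued command makes us busy. */
	return TOY_BUSY;
}

/* Simplified exit check: only "real idle" may leave pipeline mode. */
bool
toy_can_exit_pipeline(ToyAsyncStatus status)
{
	return status == TOY_IDLE;
}
```

The point of the extra state is visible in the second function: a connection that has handed out a result but not yet the terminating NULL is in the pipeline-idle state, so an attempt to exit pipeline mode is refused until the NULL has been read.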
From f50dca0ee1f0a71111b583c7e3220997ad950ea2 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Sun, 26 Jun 2022 19:53:35 +0200
Subject: [PATCH v7 6/6] Add the CLOSE message fix

---
 src/interfaces/libpq/fe-exec.c      | 37 +++++++++++++++++++++++++++++
 src/interfaces/libpq/fe-protocol3.c | 18 +++++++++++++-
 src/interfaces/libpq/libpq-int.h    |  3 ++-
 3 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 7cb803de94..b1a3378b8f 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1339,6 +1339,7 @@ static int
 PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 {
 	PGcmdQueueEntry *entry = NULL;
+	PGcmdQueueEntry *entry2 = NULL;
 
 	if (!PQsendQueryStart(conn, newQuery))
 		return 0;
@@ -1354,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 	entry = pqAllocCmdQueueEntry(conn);
 	if (entry == NULL)
 		return 0;	/* error msg already set */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2 = pqAllocCmdQueueEntry(conn);
+		if (entry2 == NULL)
+			goto sendFailed;
+	}
 
 	/* Send the query message(s) */
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1423,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
+
+	/*
+	 * When pipeline mode is in use, we need a second entry in the command
+	 * queue to represent Close Portal message. This allows us later to wait
+	 * for the CloseComplete message to be received before getting in IDLE
+	 * state.
+	 */
+	if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+	{
+		entry2->queryclass = PGQUERY_CLOSE;
+		entry2->query = NULL;
+		pqAppendCmdQueueEntry(conn, entry2);
+	}
+
 	return 1;
 
 sendFailed:
@@ -2130,6 +2151,22 @@ PQgetResult(PGconn *conn)
 			break;
 	}
 
+	/* If the next command we expect is CLOSE, read and consume it */
+	if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+		conn->cmd_queue_head &&
+		conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+	{
+		if (res && res->resultStatus != PGRES_FATAL_ERROR)
+		{
+			conn->asyncStatus = PGASYNC_BUSY;
+			parseInput(conn);
+			conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+		}
+		else
+			/* we won't ever see the Close */
+			pqCommandQueueAdvance(conn);
+	}
+
 	if (res)
 	{
 		int i;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index bab8926a63..c33f904db4 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -284,8 +284,24 @@ pqParseInput3(PGconn *conn)
 					}
 					break;
 				case '2':	/* Bind Complete */
+					/* Nothing to do for this message type */
+					break;
 				case '3':	/* Close Complete */
-					/* Nothing to do for these message types */
+					/*
+					 * If we get CloseComplete when waiting for it, consume
+					 * the queue element and keep going. A result is not
+					 * expected from this message; it is just there so that
+					 * we know to wait for it when PQsendQuery is used in
+					 * pipeline mode, before going in IDLE state. Failing to
+					 * do this makes us receive CloseComplete when IDLE, which
+					 * creates problems.
+					 */
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+					{
+						pqCommandQueueAdvance(conn);
+					}
+
 					break;
 				case 'S':	/* parameter status */
 					if (getParameterStatus(conn))
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index e40a657f55..df2f17721c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -311,7 +311,8 @@ typedef enum
 	PGQUERY_EXTENDED,	/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,	/* Parse only (PQprepare) */
 	PGQUERY_DESCRIBE,	/* Describe Statement or Portal */
-	PGQUERY_SYNC	/* Sync (at end of a pipeline) */
+	PGQUERY_SYNC,	/* Sync (at end of a pipeline) */
+	PGQUERY_CLOSE
 } PGQueryClass;
 
 /*
-- 
2.30.2