Pipeline mode and PQpipelineSync()
I am trying to add bulk operation support to ODB (a C++ ORM) using the
new pipeline mode added to libpq in PostgreSQL 14. However, things don't
seem to be working according to the documentation (or perhaps I am
misunderstanding something).

Specifically, the documentation[1] makes it sound like the use of
PQpipelineSync() is optional (34.5.1.1 "Issuing Queries"):

"After entering pipeline mode, the application dispatches requests using
PQsendQuery, PQsendQueryParams, or its prepared-query sibling
PQsendQueryPrepared. These requests are queued on the client-side until
flushed to the server; this occurs when PQpipelineSync is used to
establish a synchronization point in the pipeline, or when PQflush is
called. [...] The server executes statements, and returns results, in
the order the client sends them. The server will begin executing the
commands in the pipeline immediately, not waiting for the end of the
pipeline. [...]"

Based on this I expect to be able to queue a single prepared INSERT
statement with PQsendQueryPrepared() and then call PQflush() and
PQconsumeInput() to send/receive the data. This, however, does not work:
the client gets blocked because there is no data to read. Here is the
call sequence:

  select()              # socket is writable
  PQsendQueryPrepared() # success
  PQflush()             # returns 0 (queue is now empty)
  select()              # blocked here indefinitely

In contrast, if I add the PQpipelineSync() call after
PQsendQueryPrepared(), then everything starts functioning as expected:

  select()              # socket is writable
  PQsendQueryPrepared() # success
  PQpipelineSync()      # success
  PQflush()             # returns 0 (queue is now empty)
  select()              # socket is readable
  PQconsumeInput()      # success
  PQgetResult()         # INSERT result
  PQgetResult()         # NULL
  PQgetResult()         # PGRES_PIPELINE_SYNC

So to me it looks like, contrary to the documentation, the server does
not start executing the statements immediately, instead waiting for the
synchronization point. Or am I missing something here?
The above tests were performed using libpq from 14beta1 running against
a PostgreSQL server version 9.5. If you would like to take a look at the
actual code, you can find it here[2] (the PIPELINE_SYNC macro controls
whether PQpipelineSync() is used).

On a related note, I've been using libpq_pipeline.c[3] as a reference
and I believe it has a busy loop calling PQflush() repeatedly on line
721, since once everything has been sent and we are waiting for the
result, select() will keep returning with an indication that the socket
is writable (you can find one way to fix this in [2]).

[1] https://www.postgresql.org/docs/14/libpq-pipeline-mode.html
[2] https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n771
[3] https://doxygen.postgresql.org/libpq__pipeline_8c_source.html
Add version macro to libpq-fe.h
I am making use of the new pipeline mode added to libpq in PostgreSQL
14. At the same time I would still like to support older libpq versions
by not providing the extended functionality that depends on this mode.
The natural way to achieve this in C/C++ is to conditionally enable the
code that depends on the additional APIs based on a preprocessor macro.
And I could easily do this if libpq-fe.h provided a macro containing its
version.

Now, such a macro (PG_VERSION_NUM) is provided by pg_config.h, which
normally accompanies libpq-fe.h. However, I don't believe the presence
of this file is guaranteed. All the documentation says[1] about headers
is this:

"Client programs that use libpq must include the header file libpq-fe.h
and must link with the libpq library."

And there are good reasons why packagers of libpq may decide to omit
this header (in a nutshell, it embeds target architecture-specific
information; see this discussion for background[2]). And I may not want
to include it in my code (it defines a lot of free-named macros that may
clash with my names).

So I am wondering if it would make sense to provide a better way to
obtain the libpq version as a macro? To me, as a user, the simplest way
would be to have such a macro defined by libpq-fe.h. This would also
provide a reasonable fallback for previous versions: if this macro is
not defined, I know I am dealing with a version prior to 14, and if I
need to know which exactly, I can try to include pg_config.h (perhaps
with the help of __has_include if I am using C++).

If simply moving this macro to libpq-fe.h is not desirable (for example,
because it is auto-generated), then perhaps we could move this (and a
few other version-related macros[3]) to a separate header (for example,
libpq-version.h) and either include it from libpq-fe.h or define a macro
in libpq-fe.h that signals its presence (e.g., PG_HAS_VERSION or some
such). What do you think?
[1] https://www.postgresql.org/docs/9.3/libpq.html
[2] https://bugzilla.redhat.com/show_bug.cgi?id=828467
[3] PG_MAJORVERSION
    PG_MAJORVERSION_NUM
    PG_MINORVERSION_NUM
    PG_VERSION
    PG_VERSION_NUM
    PG_VERSION_STR (this one includes the target so maybe leave it in
    pg_config.h)
Re: Add version macro to libpq-fe.h
Tom Lane writes:

> I think putting a version number as such in there is a truly
> horrid idea. However, I could get behind adding a boolean flag
> that says specifically whether the pipeline feature exists.
> Then you'd do something like
>
> #ifdef LIBPQ_HAS_PIPELINING
>
> rather than embedding knowledge of exactly which release
> added that.

That would be even better, but I agree with what others have said: we
would have to keep adding such feature test macros going forward. I
think ideally you would want to have both, since the version macro could
still be helpful in dealing with "features" that you did not plan to add
(aka bugs).

> Comparing v13 and v14 libpq-fe.h, I see that there is a solution
> available now: "#ifdef PQ_QUERY_PARAM_MAX_LIMIT".

Hm, it must have been added recently since I don't see it in 14beta1.
But thanks for the pointer; if nothing better comes up, this will have
to do.
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> I think I should rephrase this to say that PQpipelineSync() is needed
> where the user needs the server to start executing commands; and
> separately indicate that it is possible (but not promised) that the
> server would start executing commands ahead of time because $reasons.

I think always requiring PQpipelineSync() is fine since it also serves
as an error recovery boundary. But the fact that the server waits until
the sync message to start executing the pipeline is surprising. To me
this seems to go contrary to the idea of a "pipeline".

In fact, I see the following ways the server could behave:

1. The server starts executing queries and sending their results before
   receiving the sync message.

2. The server starts executing queries before receiving the sync
   message but buffers the results until it receives the sync message.

3. The server buffers the queries and only starts executing them and
   sending the results after receiving the sync message.

My observations suggest that the server behaves as (3) but it could
also be (2). While it can be tempting to say that this is an
implementation detail, this affects the way one writes a client. For
example, I currently have the following comment in my code:

  // Send queries until we get blocked. This feels like a better
  // overall strategy to keep the server busy compared to sending one
  // query at a time and then re-checking if there is anything to read
  // because the results of INSERT/UPDATE/DELETE are presumably small
  // and quite a few of them can get buffered before the server gets
  // blocked.

This would be a good strategy for behavior (1) but not (3) (where it
would make more sense to queue the queries on the client side). So I
think it would be useful to clarify the server behavior and specify it
in the documentation.

> Do I have it right that other than this documentation problem, you've
> been able to use pipeline mode successfully?
So far I've only tried it in a simple prototype (single INSERT statement). But I am busy plugging it into ODB's bulk operation support (that we already have for Oracle and MSSQL) and once that's done I should be able to exercise things in more meaningful ways.
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> > I think always requiring PQpipelineSync() is fine since it also serves
> > as an error recovery boundary. But the fact that the server waits until
> > the sync message to start executing the pipeline is surprising. To me
> > this seems to go contrary to the idea of a "pipeline".
>
> But does that actually happen? There's a very easy test we can do by
> sending queries that sleep. If my libpq program sends a "SELECT
> pg_sleep(2)", then PQflush(), then sleep in the client program two more
> seconds without sending the sync; and *then* send the sync, I find that
> the program takes 2 seconds, not four. This shows that both client and
> server slept in parallel, even though I didn't send the Sync until after
> the client was done sleeping.

Thanks for looking into it. My experiments were with INSERT and I was
now able to try things with larger pipelines. I can now see that the
server starts sending results after ~400 statements. So I think you are
right: the server does start executing the pipeline before receiving
the sync message, though there is still something strange going on (but
probably on the client side):

I have a pipeline of, say, 500 INSERTs. If I "execute" this pipeline by
first sending all the statements and then reading the results, then
everything works as expected. This is the call sequence I am talking
about:

  PQsendQueryPrepared() # INSERT #1
  PQflush()
  PQsendQueryPrepared() # INSERT #2
  PQflush()
  ...
  PQsendQueryPrepared() # INSERT #500
  PQpipelineSync()
  PQflush()
  PQconsumeInput()
  PQgetResult()         # INSERT #1
  PQgetResult()         # NULL
  PQgetResult()         # INSERT #2
  PQgetResult()         # NULL
  ...
  PQgetResult()         # INSERT #500
  PQgetResult()         # NULL
  PQgetResult()         # PGRES_PIPELINE_SYNC

If, however, I execute it by checking for results before sending the
next INSERT, I get the following call sequence:

  PQsendQueryPrepared() # INSERT #1
  PQflush()
  PQsendQueryPrepared() # INSERT #2
  PQflush()
  ...
  PQsendQueryPrepared() # INSERT #~400
  PQflush()
  PQconsumeInput()      # At this point select() indicates we can read.
  PQgetResult()         # NULL (???)
  PQgetResult()         # INSERT #1
  PQgetResult()         # NULL
  PQgetResult()         # INSERT #2
  PQgetResult()         # NULL
  ...

What's strange here is that the first PQgetResult() call (marked with
???) returns NULL instead of the result for INSERT #1 as in the first
call sequence. Interestingly, if I skip it, the rest seems to progress
as expected. Any idea what might be going on here?

My hunch is that there is an issue with libpq's state machine. In
particular, in the second case, PQgetResult() is called before the sync
message is sent. Did you have a chance to test such a scenario (i.e., a
large pipeline where the first result is processed before the
PQpipelineSync() call)? Of course, this could very well be a bug on my
side or me misunderstanding something.
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> On 2021-Jun-22, Alvaro Herrera wrote:
>
> > > So I think it would be useful to clarify the server behavior and
> > > specify it in the documentation.
> >
> > I'll see about improving the docs on these points.
>
> So I started to modify the second paragraph to indicate that the client
> would send data on PQflush/buffer full/PQpipelineSync, only to realize
> that the first paragraph already explains this. So I'm not sure if any
> changes are needed.
>
> Maybe your complaint is only based on disagreement about what does libpq
> do regarding queueing commands; and as far as I can tell in quick
> experimentation with libpq, it works as the docs state already.

I think one change that is definitely needed is to make it clear that
the PQpipelineSync() call is not optional. I would also add a note
saying that while the server starts processing the pipeline
immediately, it may buffer the results and the only way to flush them
out is to call PQpipelineSync().
Re: Pipeline mode and PQpipelineSync()
Boris Kolpackov writes:

> What's strange here is that the first PQgetResult() call (marked with ???)
> returns NULL instead of result for INSERT #1 as in the first call sequence.

I've hit another similar case, except now an unexpected NULL result is
returned in the middle of a PGRES_PIPELINE_ABORTED result sequence. The
call sequence is as follows:

  PQsendQueryPrepared() # INSERT #1
  PQflush()
  PQsendQueryPrepared() # INSERT #2
  PQflush()
  ...
  PQsendQueryPrepared() # INSERT #251 -- insert duplicate PK
  PQflush()
  ...
  PQsendQueryPrepared() # INSERT #343
  PQflush()
  PQconsumeInput()      # At this point select() indicates we can read.
  PQgetResult()         # NULL -- unexpected but skipped (see prev. email)
  PQgetResult()         # INSERT #1
  PQgetResult()         # NULL
  PQgetResult()         # INSERT #2
  PQgetResult()         # NULL
  ...
  PQgetResult()         # INSERT #251 error result, SQLSTATE 23505
  PQgetResult()         # NULL
  PQgetResult()         # INSERT #252 PGRES_PIPELINE_ABORTED
  PQgetResult()         # NULL
  PQgetResult()         # INSERT #253 PGRES_PIPELINE_ABORTED
  PQgetResult()         # NULL
  ...
  PQgetResult()         # INSERT #343 NULL (???)

Notice that result #343 corresponds to the last PQsendQueryPrepared()
call made before the socket became readable (it's not always 343 but
around there).

For completeness, the statement in question is:

  INSERT INTO pgsql_bulk_object (id, idata, sdata) VALUES ($1, $2, $3)

The table:

  CREATE TABLE pgsql_bulk_object (
    id BIGINT NOT NULL PRIMARY KEY,
    idata BIGINT NOT NULL,
    sdata TEXT NOT NULL);

And the data inserted is in the form:

  1, 1, "1"
  2, 2, "2"
  ...
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> Curious -- I just noticed that the server understands a message 'H' that
> requests a flush of the server buffer. However, libpq has no way to
> generate that message as far as I can see. I think you could use that
> to request results from the pipeline, without the sync point.
>
> I wonder if it's worth adding an entry point to libpq to allow access to
> this.

Yes, I think this can be useful. For example, an application may wish
to receive a result as soon as possible in case it's used as input to
some further computation.

> PQrequestFlush() or something like that ...

I think I would prefer PQflushResult() or something along these lines
("request" is easy to misinterpret as "client request").
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> Subject: [PATCH] Clarify that pipeline sync is mandatory
>
> ---
>  doc/src/sgml/libpq.sgml | 6 ++++--
>  1 file changed, 4 insertions(+), 2 deletions(-)
>
> diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
> index 441cc0da3a..0217f8d8c7 100644
> --- a/doc/src/sgml/libpq.sgml
> +++ b/doc/src/sgml/libpq.sgml
> @@ -5103,10 +5103,12 @@ int PQflush(PGconn *conn);
>     The server executes statements, and returns results, in the order the
>     client sends them. The server will begin executing the commands in the
>     pipeline immediately, not waiting for the end of the pipeline.
> +   Do note that results are buffered on the server side; a synchronization
> +   point, establshied with PQpipelineSync, is necessary
> +   in order for all results to be flushed to the client.

s/establshied/established/

Otherwise, LGTM, thanks!
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> IIUC the problem is that PQgetResult is indeed not prepared to deal with
> a result the first time until after the queue has been "prepared", and
> this happens on calling PQpipelineSync. But I think the formulation in
> the attached patch works too, and the resulting code is less surprising.
>
> I wrote a test case that works as you describe, and indeed with the
> original code it gets a NULL initially; that disappears with the
> attached patch. Can you give it a try?

Yes, I can confirm this appears to have addressed the first issue,
thanks! The second issue [1], however, is still there even with this
patch.

[1] https://www.postgresql.org/message-id/boris.20210624103805%40codesynthesis.com
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> I forgot to mention:
>
> > +	/* XXX useless without a flush ...? */
> > +	pqFlush(conn);
> > +
> > +	return 1;
> > +}
>
> I'm not sure if it's a good idea for PQrequestFlush to itself flush
> libpq's buffer. We can just document that PQflush is required ...
> opinions?

Yes, I think not calling PQflush() gives more flexibility. For example,
an application may "insert" flush requests periodically after a certain
number of queries but call PQflush() at different intervals.
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> No luck reproducing any problems with this sequence as yet.

Can you try to recreate the call flow as implemented here (it's pretty
much plain old C if you ignore error handling):

https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n789

Except replacing `continue` on line 966 with `break` (that will make the
code read-biased, which I find triggers the error more readily, though I
was able to trigger it both ways).

Then, in an explicit transaction, send 500 prepared insert statements
(see previous email for details) with the 250th having a duplicate
primary key.
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> Ah, yes it does. I can reproduce this now. I thought PQconsumeInput
> was sufficient, but it's not: you have to have the PQgetResult in there
> too. Looking ...

Any progress on fixing this?
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> Can you please try with this patch?

I don't get any difference in behavior with this patch. That is, I
still get the unexpected NULL result. Does it make a difference for
your reproducer?
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> On 2021-Jul-07, Boris Kolpackov wrote:
>
> > I don't get any difference in behavior with this patch. That is, I
> > still get the unexpected NULL result. Does it make a difference for
> > your reproducer?
>
> Yes, the behavior changes for my repro. Is it possible for you to
> share a full program I can compile and run, please?

Here is the test sans the connection setup:

---
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <sys/select.h>

#include <libpq-fe.h>

// Note: hack.
//
#include <arpa/inet.h>
#define htonll(x) ((((long long)htonl(x)) << 32) + htonl((x) >> 32))

static const size_t columns = 3;

struct data
{
  long long id;
  long long idata;
  const char* sdata;
};

static char* values[columns];
static int lengths[columns];
static int formats[columns] = {1, 1, 1};

static const unsigned int types[columns] = {
  20, // int8
  20, // int8
  25  // text
};

static void
init (const struct data* d)
{
  values[0] = (char*)&d->id;
  lengths[0] = sizeof (d->id);

  values[1] = (char*)&d->idata;
  lengths[1] = sizeof (d->idata);

  values[2] = (char*)d->sdata;
  lengths[2] = strlen (d->sdata);
}

static void
execute (PGconn* conn, const struct data* ds, size_t n)
{
  int sock = PQsocket (conn);
  assert (sock != -1);

  if (PQsetnonblocking (conn, 1) == -1 ||
      PQenterPipelineMode (conn) == 0)
    assert (false);

  // True if we've written and read everything, respectively.
  //
  bool wdone = false;
  bool rdone = false;

  size_t wn = 0;
  size_t rn = 0;

  while (!rdone)
  {
    fd_set wds;
    if (!wdone)
    {
      FD_ZERO (&wds);
      FD_SET (sock, &wds);
    }

    fd_set rds;
    FD_ZERO (&rds);
    FD_SET (sock, &rds);

    if (select (sock + 1, &rds, wdone ? NULL : &wds, NULL, NULL) == -1)
    {
      if (errno == EINTR)
        continue;

      assert (false);
    }

    // Try to minimize the chance of blocking the server by first
    // processing the result and then sending more queries.
    //
    if (FD_ISSET (sock, &rds))
    {
      if (PQconsumeInput (conn) == 0)
        assert (false);

      while (PQisBusy (conn) == 0)
      {
        //fprintf (stderr, "PQgetResult %zu\n", rn);

        PGresult* res = PQgetResult (conn);
        assert (res != NULL);
        ExecStatusType stat = PQresultStatus (res);

        if (stat == PGRES_PIPELINE_SYNC)
        {
          assert (wdone && rn == n);
          PQclear (res);
          rdone = true;
          break;
        }

        if (stat == PGRES_FATAL_ERROR)
        {
          const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);

          if (strcmp (s, "23505") == 0)
            fprintf (stderr, "duplicate id at %zu\n", rn);
        }

        PQclear (res);
        assert (rn != n);
        ++rn;

        // We get a NULL result after each query result.
        //
        {
          PGresult* end = PQgetResult (conn);
          assert (end == NULL);
        }
      }
    }

    if (!wdone && FD_ISSET (sock, &wds))
    {
      // Send queries until we get blocked (write-biased). This feels
      // like a better overall strategy to keep the server busy compared
      // to sending one query at a time and then re-checking if there is
      // anything to read because the results of INSERT/UPDATE/DELETE
      // are presumably small and quite a few of them can get buffered
      // before the server gets blocked.
      //
      for (;;)
      {
        if (wn != n)
        {
          //fprintf (stderr, "PQsendQueryPrepared %zu\n", wn);

          init (ds + wn);

          if (PQsendQueryPrepared (conn,
                                   "persist_object",
                                   (int)(columns),
                                   values,
                                   lengths,
                                   formats,
                                   1) == 0)
            assert (false);

          if (++wn == n)
          {
            if (PQpipelineSync (conn) == 0)
              assert (false);
          }
        }

        // PQflush() result:
        //
        //  0 -- success (queue is now empty)
        //  1 -- blocked
        // -1 -- error
        //
        int r = PQflush (conn);
        assert (r != -1);

        if (r == 0)
        {
          if (wn != n)
          {
            // If we continue here, then we are write-biased. And if we
            // break, then we are read-biased.
            //
#if 1
            break;
#else
            continue;
#endif
          }

          wdone = true;
        }

        break; // Blocked or done.
      }
    }
  }

  if (PQexitPipelineMode (conn) == 0 ||
      PQsetnonblocking (conn, 0) == -1)
    assert (false);
}

static void
test (PGconn* conn)
{
  const size_t batch = 500;

  struct
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> Hmm ... aren't you trying to read more results than you sent queries?

Hm, but should I be able to? Or, to put it another way, should
PQisBusy() indicate there is a result available without me sending a
query for it? That sounds very counter-intuitive to me.
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> On 2021-Jul-08, Boris Kolpackov wrote:
>
> > Alvaro Herrera writes:
> >
> > > Hmm ... aren't you trying to read more results than you sent queries?
> >
> > Hm, but should I be able to? Or, to put another way, should PQisBusy()
> > indicate there is a result available without me sending a query for it?
> > That sounds very counter-intuitive to me.
>
> That seems a fair complaint, but I think PQisBusy is doing the right
> thing per its charter. It is documented as "would PQgetResult block?"
> and it is returning correctly that PQgetResult would not block in that
> situation, because no queries are pending.

Well, that's one way to view it. But in this case one can say that the
entire pipeline is still "busy" since we haven't seen the result of the
PQpipelineSync() call. So maybe we could change the charter only for
this special situation (that is, inside the pipeline)?

But I agree, it may not be worth the trouble, and a note in the
documentation may be an acceptable "solution". I am happy to go either
way, just let me know what it will be. And also whether the latest
patch to libpq that you have shared[1] is still necessary.

[1] https://www.postgresql.org/message-id/202107061747.tlss7f2somqf%40alvherre.pgsql
Re: Pipeline mode and PQpipelineSync()
Alvaro Herrera writes:

> To be honest, I am hesitant to changing the charter in that way; I fear
> it may have consequences I don't foresee. I think the workaround is not
> *that* bad.

Ok, fair enough. I've updated my code to account for this and it seems
to be working fine now.

> I'm having a bit of trouble documenting this. I modified the paragraph
> in the pipeline mode docs to read:
>
>   PQisBusy, PQconsumeInput, etc operate as normal when processing
>   pipeline results. Note that if no queries are pending receipt of the
>   corresponding results, PQisBusy returns 0.

How about the following for the second sentence: "In particular, a call
to PQisBusy in the middle of a pipeline returns 0 if all the results
for queries issued so far have been consumed."