Pipeline mode and PQpipelineSync()

2021-06-16 Thread Boris Kolpackov
I am trying to add bulk operation support to ODB (a C++ ORM) using
the new pipeline mode added to libpq in PostgreSQL 14. However, things
don't seem to be working according to the documentation (or perhaps I
am misunderstanding something). Specifically, the documentation[1]
makes it sound like the use of PQpipelineSync() is optional (34.5.1.1
"Issuing Queries"):

"After entering pipeline mode, the application dispatches requests using
PQsendQuery, PQsendQueryParams, or its prepared-query sibling
PQsendQueryPrepared. These requests are queued on the client-side until
flushed to the server; this occurs when PQpipelineSync is used to establish a
synchronization point in the pipeline, or when PQflush is called. [...]

The server executes statements, and returns results, in the order the client
sends them. The server will begin executing the commands in the pipeline
immediately, not waiting for the end of the pipeline. [...]"

Based on this I expect to be able to queue a single prepared INSERT
statement with PQsendQueryPrepared() and then call PQflush() and
PQconsumeInput() to send/receive the data. This, however, does not
work: the client gets blocked because there is no data to read. Here
is the call sequence:

select()   # socket is writable
PQsendQueryPrepared()  # success
PQflush()  # returns 0 (queue is now empty)
select()   # blocked here indefinitely

In contrast, if I add the PQpipelineSync() call after PQsendQueryPrepared(),
then everything starts functioning as expected:

select()   # socket is writable
PQsendQueryPrepared()  # success
PQpipelineSync()   # success
PQflush()  # returns 0 (queue is now empty)
select()   # socket is readable
PQconsumeInput()   # success
PQgetResult()  # INSERT result
PQgetResult()  # NULL
PQgetResult()  # PGRES_PIPELINE_SYNC

So to me it looks like, contrary to the documentation, the server does
not start executing the statements immediately, instead waiting for the
synchronization point. Or am I missing something here?

The above tests were performed using libpq from 14beta1 running against
PostgreSQL server version 9.5. If you would like to take a look at the
actual code, you can find it here[2] (the PIPELINE_SYNC macro controls
whether PQpipelineSync() is used).

On a related note, I've been using libpq_pipeline.c[3] as a reference
and I believe it has a busy loop that calls PQflush() repeatedly on
line 721: once everything has been sent and we are waiting for the
result, select() will keep returning with an indication that the socket
is writable (you can find one way to fix this in [2]).

[1] https://www.postgresql.org/docs/14/libpq-pipeline-mode.html
[2] https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n771
[3] https://doxygen.postgresql.org/libpq__pipeline_8c_source.html




Add version macro to libpq-fe.h

2021-06-17 Thread Boris Kolpackov
I am making use of the new pipeline mode added to libpq in
PostgreSQL 14. At the same time I would still like to support
older libpq versions by not providing the extended functionality
that depends on this mode.

The natural way to achieve this in C/C++ is to conditionally
enable code that depends on the additional APIs based on the
preprocessor macro. And I could easily do this if libpq-fe.h
provided a macro containing its version.

Now, such a macro (PG_VERSION_NUM) is provided by pg_config.h
that normally accompanies libpq-fe.h. However, I don't believe
the presence of this file is guaranteed. All the documentation
says[1] about headers is this:

"Client programs that use libpq must include the header file 
libpq-fe.h and must link with the libpq library."

And there are good reasons why packagers of libpq may decide to
omit this header (in a nutshell, it embeds target architecture-
specific information, see this discussion for background[2]). And
I may not want to include it in my code (it defines a lot of free-
named macros that may clash with my names).

So I am wondering if it would make sense to provide a better way
to obtain the libpq version as a macro?

To me, as a user, the simplest way would be to have such a macro
defined by libpq-fe.h. This would also provide a reasonable
fallback for previous versions: if this macro is not defined, I
know I am dealing with a version prior to 14 and if I need to know
exactly which one, I can try to include pg_config.h (perhaps with the
help of __has_include if I am using C++).
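
The pg_config.h fallback could look roughly like the sketch below. It
assumes a compiler that provides __has_include (recent GCC and Clang accept
it in C as well) and that pg_config.h, if present, is on the include path.
MY_LIBPQ_VERSION_NUM and libpq_headers_at_least are made-up,
application-side names, not anything libpq provides.

```c
// Pull in pg_config.h only if the compiler can tell us that it exists.
//
#if defined(__has_include)
#  if __has_include(<pg_config.h>)
#    include <pg_config.h>
#  endif
#endif

// Hypothetical application-side macro; 0 means "version unknown".
//
#ifdef PG_VERSION_NUM
#  define MY_LIBPQ_VERSION_NUM PG_VERSION_NUM
#else
#  define MY_LIBPQ_VERSION_NUM 0
#endif

// Return 1 if the headers are known to be at least the given version
// (same encoding as PG_VERSION_NUM, for example 140000 for 14.0).
//
static int
libpq_headers_at_least (int version_num)
{
  return MY_LIBPQ_VERSION_NUM != 0 && MY_LIBPQ_VERSION_NUM >= version_num;
}
```

Note that this reports the version of the headers found at compile time,
which is what matters for deciding whether an API is declared.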

If simply moving this macro to libpq-fe.h is not desirable (for
example, because it is auto-generated), then perhaps we could
move this (and a few other version-related macros[3]) to a
separate header (for example, libpq-version.h) and either include
it from libpq-fe.h or define a macro in libpq-fe.h that signals
its presence (e.g., PG_HAS_VERSION or some such).

What do you think?


[1] https://www.postgresql.org/docs/9.3/libpq.html

[2] https://bugzilla.redhat.com/show_bug.cgi?id=828467

[3] PG_MAJORVERSION
PG_MAJORVERSION_NUM
PG_MINORVERSION_NUM
PG_VERSION
PG_VERSION_NUM
PG_VERSION_STR (this one includes target so maybe leave it in pg_config.h)




Re: Add version macro to libpq-fe.h

2021-06-18 Thread Boris Kolpackov
Tom Lane writes:

> I think putting a version number as such in there is a truly
> horrid idea.  However, I could get behind adding a boolean flag
> that says specifically whether the pipeline feature exists.
> Then you'd do something like
> 
> #ifdef LIBPQ_HAS_PIPELINING
> 
> rather than embedding knowledge of exactly which release
> added that.

That would be even better, but I agree with what others have
said: we would have to keep adding such feature test macros
going forward.

I think ideally you would want to have both since the version
macro could still be helpful in dealing with "features" that you
did not plan to add (aka bugs).
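
For illustration, client code keyed on such a feature flag might look like
the sketch below. LIBPQ_HAS_PIPELINING is the name suggested above; the
__has_include guard is there only so that the sketch compiles even without
libpq headers (a real client would include libpq-fe.h unconditionally), and
HAVE_PIPELINE and bulk_strategy are made-up names.

```c
#if defined(__has_include)
#  if __has_include(<libpq-fe.h>)
#    include <libpq-fe.h>
#  endif
#endif

// Whether a given libpq-fe.h defines LIBPQ_HAS_PIPELINING is exactly what
// is being tested here.
//
#ifdef LIBPQ_HAS_PIPELINING
#  define HAVE_PIPELINE 1
#else
#  define HAVE_PIPELINE 0
#endif

// Hypothetical: pick the bulk implementation at compile time.
//
static const char*
bulk_strategy (void)
{
#if HAVE_PIPELINE
  return "pipeline";
#else
  return "one statement at a time";
#endif
}
```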


> Comparing v13 and v14 libpq-fe.h, I see that there is a solution
> available now: "#ifdef PQ_QUERY_PARAM_MAX_LIMIT".

Hm, it must have been added recently since I don't see it in 14beta1.
But thanks for the pointer, if nothing better comes up this will
have to do.




Re: Pipeline mode and PQpipelineSync()

2021-06-21 Thread Boris Kolpackov
Alvaro Herrera writes:

> I think I should rephrase this to say that PQpipelineSync() is needed
> where the user needs the server to start executing commands; and
> separately indicate that it is possible (but not promised) that the
> server would start executing commands ahead of time because $reasons.

I think always requiring PQpipelineSync() is fine since it also serves
as an error recovery boundary. But the fact that the server waits until
the sync message to start executing the pipeline is surprising. To me
this seems to go contrary to the idea of a "pipeline".

In fact, I see the following ways the server could behave:

1. The server starts executing queries and sending their results before
   receiving the sync message.

2. The server starts executing queries before receiving the sync message
   but buffers the results until it receives the sync message.

3. The server buffers the queries and only starts executing them and
   sending the results after receiving the sync message.

My observations suggest that the server behaves as (3) but it could
also be (2).

While it can be tempting to say that this is an implementation detail,
this affects the way one writes a client. For example, I currently have
the following comment in my code:

  // Send queries until we get blocked. This feels like a better
  // overall strategy to keep the server busy compared to sending one
  // query at a time and then re-checking if there is anything to read
  // because the results of INSERT/UPDATE/DELETE are presumably small
  // and quite a few of them can get buffered before the server gets
  // blocked.

This would be a good strategy for behavior (1) but not (3) (where it
would make more sense to queue the queries on the client side). So I
think it would be useful to clarify the server behavior and specify
it in the documentation.


> Do I have it right that other than this documentation problem, you've
> been able to use pipeline mode successfully?

So far I've only tried it in a simple prototype (single INSERT statement).
But I am busy plugging it into ODB's bulk operation support (that we
already have for Oracle and MSSQL) and once that's done I should be
able to exercise things in more meaningful ways.




Re: Pipeline mode and PQpipelineSync()

2021-06-23 Thread Boris Kolpackov
Alvaro Herrera writes:

> > I think always requiring PQpipelineSync() is fine since it also serves
> > as an error recovery boundary. But the fact that the server waits until
> > the sync message to start executing the pipeline is surprising. To me
> > this seems to go contrary to the idea of a "pipeline".
> 
> But does that actually happen? There's a very easy test we can do by
> sending queries that sleep.  If my libpq program sends a "SELECT
> pg_sleep(2)", then PQflush(), then sleep in the client program two more
> seconds without sending the sync; and *then* send the sync, I find that
> the program takes 2 seconds, not four.  This shows that both client and
> server slept in parallel, even though I didn't send the Sync until after
> the client was done sleeping.

Thanks for looking into it. My experiments were with INSERT and I have
now been able to try things with larger pipelines. I can see that the
server starts sending results after ~400 statements. So I think you are right,
the server does start executing the pipeline before receiving the sync
message, though there is still something strange going on (but probably
on the client side):

I have a pipeline of say 500 INSERTs. If I "execute" this pipeline by first
sending all the statements and then reading the results, then everything
works as expected. This is the call sequence I am talking about:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #500
PQpipelineSync()
PQflush()
PQconsumeInput()
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...
PQgetResult() # INSERT #500
PQgetResult() # NULL
PQgetResult() # PGRES_PIPELINE_SYNC

If, however, I execute it by checking for results before sending the
next INSERT, I get the following call sequence:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #~400
PQflush()
PQconsumeInput()  # At this point select() indicates we can read.
PQgetResult() # NULL (???)
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...


What's strange here is that the first PQgetResult() call (marked with ???)
returns NULL instead of the result for INSERT #1 as in the first call
sequence. Interestingly, if I skip it, the rest seems to progress as expected.

Any idea what might be going on here? My hunch is that there is an issue
with libpq's state machine. In particular, in the second case, PQgetResult()
is called before the sync message is sent. Did you have a chance to test
such a scenario (i.e., a large pipeline where the first result is processed
before the PQpipelineSync() call)? Of course, this could very well be a bug
on my side or me misunderstanding something.




Re: Pipeline mode and PQpipelineSync()

2021-06-23 Thread Boris Kolpackov
Alvaro Herrera writes:

> On 2021-Jun-22, Alvaro Herrera wrote:
> 
> > > So I think it would be useful to clarify the server behavior and
> > > specify it in the documentation.
> > 
> > I'll see about improving the docs on these points.
> 
> So I started to modify the second paragraph to indicate that the client
> would send data on PQflush/buffer full/PQpipelineSync, only to realize
> that the first paragraph already explains this.  So I'm not sure if any
> changes are needed.
> 
> Maybe your complaint is only based on disagreement about what libpq
> does regarding queueing commands; and as far as I can tell in quick
> experimentation with libpq, it works as the docs state already.

I think one change that is definitely needed is to make it clear that
the PQpipelineSync() call is not optional.

I would also add a note saying that while the server starts processing
the pipeline immediately, it may buffer the results and the only way
to flush them out is to call PQpipelineSync().




Re: Pipeline mode and PQpipelineSync()

2021-06-24 Thread Boris Kolpackov
Boris Kolpackov writes:

> What's strange here is that the first PQgetResult() call (marked with ???)
> returns NULL instead of result for INSERT #1 as in the first call sequence.

I've hit another similar case except now an unexpected NULL result is
returned in the middle of PGRES_PIPELINE_ABORTED result sequence. The
call sequence is as follows:

PQsendQueryPrepared() # INSERT #1
PQflush()
PQsendQueryPrepared() # INSERT #2
PQflush()
...
PQsendQueryPrepared() # INSERT #251 -- insert duplicate PK
PQflush()
...
PQsendQueryPrepared() # INSERT #343
PQflush()
PQconsumeInput()  # At this point select() indicates we can read.
PQgetResult() # NULL -- unexpected but skipped (see prev. email)
PQgetResult() # INSERT #1
PQgetResult() # NULL
PQgetResult() # INSERT #2
PQgetResult() # NULL
...
PQgetResult() # INSERT #251 error result, SQLSTATE 23505
PQgetResult() # NULL
PQgetResult() # INSERT #252 PGRES_PIPELINE_ABORTED
PQgetResult() # NULL
PQgetResult() # INSERT #253 PGRES_PIPELINE_ABORTED
PQgetResult() # NULL
...
PQgetResult() # INSERT #343 NULL (???)

Notice that result #343 corresponds to the last PQsendQueryPrepared()
call made before the socket became readable (it's not always 343 but
around there).

For completeness, the statement in question is:

INSERT INTO pgsql_bulk_object (id, idata, sdata) VALUES ($1, $2, $3)

The table:

CREATE TABLE pgsql_bulk_object (
  id BIGINT NOT NULL PRIMARY KEY,
  idata BIGINT NOT NULL,
  sdata TEXT NOT NULL);

And the data inserted is in the form:

1, 1, "1"
2, 2, "2"
...




Re: Pipeline mode and PQpipelineSync()

2021-06-24 Thread Boris Kolpackov
Alvaro Herrera writes:

> Curious -- I just noticed that the server understands a message 'H' that
> requests a flush of the server buffer.  However, libpq has no way to
> generate that message as far as I can see.  I think you could use that
> to request results from the pipeline, without the sync point.
> 
> I wonder if it's worth adding an entry point to libpq to allow access to
> this.  

Yes, I think this can be useful. For example, an application may wish
to receive the result as soon as possible in case it's used as input
to some further computation.
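
For reference, in the frontend/backend protocol the Flush message is just
the type byte 'H' followed by a four-byte length of 4, and Sync ('S') has
the same shape. The sketch below only illustrates that wire format:
encode_empty_message is a made-up helper, not a libpq function, and an
application cannot inject bytes into a libpq connection this way.

```c
#include <stddef.h>
#include <stdint.h>

// Size of a frontend message that has a type byte and no payload.
//
enum { EMPTY_MSG_SIZE = 5 };

// Write a payload-less frontend message of the given type into out and
// return its size. The length field counts itself (4 bytes) but not the
// type byte and is sent in network byte order.
//
static size_t
encode_empty_message (char type, unsigned char out[EMPTY_MSG_SIZE])
{
  const uint32_t len = 4;

  out[0] = (unsigned char) type;
  out[1] = (unsigned char) (len >> 24);
  out[2] = (unsigned char) (len >> 16);
  out[3] = (unsigned char) (len >> 8);
  out[4] = (unsigned char) len;

  return EMPTY_MSG_SIZE;
}
```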


> PQrequestFlush() or something like that ...

I think I would prefer PQflushResult() or something along these
lines ("request" is easy to misinterpret as "client request").




Re: Pipeline mode and PQpipelineSync()

2021-06-24 Thread Boris Kolpackov
Alvaro Herrera writes:

> Subject: [PATCH] Clarify that pipeline sync is mandatory
> 
> ---
>  doc/src/sgml/libpq.sgml | 6 ++++--
>  1 file changed, 4 insertions(+), 2 deletions(-)
> 
> diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
> index 441cc0da3a..0217f8d8c7 100644
> --- a/doc/src/sgml/libpq.sgml
> +++ b/doc/src/sgml/libpq.sgml
> @@ -5103,10 +5103,12 @@ int PQflush(PGconn *conn);
>   The server executes statements, and returns results, in the order the
>   client sends them.  The server will begin executing the commands in the
>   pipeline immediately, not waiting for the end of the pipeline.
> + Do note that results are buffered on the server side; a synchronization
> + point, establshied with PQpipelineSync, is necessary
> + in order for all results to be flushed to the client.

s/establshied/established/

Otherwise, LGTM, thanks!




Re: Pipeline mode and PQpipelineSync()

2021-06-25 Thread Boris Kolpackov
Alvaro Herrera writes:

> IIUC the problem is that PQgetResult is indeed not prepared to deal with
> a result the first time until after the queue has been "prepared", and
> this happens on calling PQpipelineSync.  But I think the formulation in
> the attached patch works too, and the resulting code is less surprising.
> 
> I wrote a test case that works as you describe, and indeed with the
> original code it gets a NULL initially; that disappears with the
> attached patch.  Can you give it a try?

Yes, I can confirm this appears to have addressed the first issue,
thanks! The second issue [1], however, is still there even with
this patch.

[1] https://www.postgresql.org/message-id/boris.20210624103805%40codesynthesis.com




Re: Pipeline mode and PQpipelineSync()

2021-06-28 Thread Boris Kolpackov
Alvaro Herrera writes:

> I forgot to mention:
> 
> > +   /* XXX useless without a flush ...? */
> > +   pqFlush(conn);
> > +
> > +   return 1;
> > +}
> 
> I'm not sure if it's a good idea for PQrequestFlush to itself flush
> libpq's buffer.  We can just document that PQflush is required ...
> opinions?

Yes, I think not calling PQflush() gives more flexibility. For example,
an application may "insert" flush requests periodically after a certain
number of queries but call PQflush() at different intervals.




Re: Pipeline mode and PQpipelineSync()

2021-06-29 Thread Boris Kolpackov
Alvaro Herrera writes:

> No luck reproducing any problems with this sequence as yet.

Can you try to recreate the call flow as implemented here (it's
pretty much plain old C if you ignore error handling):

https://git.codesynthesis.com/cgit/odb/libodb-pgsql/tree/odb/pgsql/statement.cxx?h=bulk#n789

Except replacing `continue` on line 966 with `break` (that will
make the code read-biased which I find triggers the error more
readily, though I was able to trigger it both ways).

Then in an explicit transaction send 500 prepared insert statements
(see previous email for details) with the 250th having a duplicate
primary key.




Re: Pipeline mode and PQpipelineSync()

2021-07-05 Thread Boris Kolpackov
Alvaro Herrera writes:

> Ah, yes it does.  I can reproduce this now.  I thought PQconsumeInput
> was sufficient, but it's not: you have to have the PQgetResult in there
> too.  Looking ...

Any progress on fixing this?




Re: Pipeline mode and PQpipelineSync()

2021-07-07 Thread Boris Kolpackov
Alvaro Herrera writes:

> Can you please try with this patch?

I don't get any difference in behavior with this patch. That is, I
still get the unexpected NULL result. Does it make a difference for
your reproducer?




Re: Pipeline mode and PQpipelineSync()

2021-07-07 Thread Boris Kolpackov
Alvaro Herrera writes:

> On 2021-Jul-07, Boris Kolpackov wrote:
> 
> > I don't get any difference in behavior with this patch. That is, I
> > still get the unexpected NULL result. Does it make a difference for
> > your reproducer?
> 
> Yes, the behavior changes for my repro.   Is it possible for you to
> share a full program I can compile and run, please?

Here is the test sans the connection setup:

---

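#include <libpq-fe.h>

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include <sys/select.h>

// Note: hack.
//
#include <arpa/inet.h>
#define htonll(x) ((((long long)htonl(x)) << 32) + htonl((x) >> 32))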

static const size_t columns = 3;

struct data
{
  long long id;
  long long idata;
  const char* sdata;
};

static char* values[columns];
static int lengths[columns];
static int formats[columns] = {1, 1, 1};

static const unsigned int types[columns] = {
  20, // int8
  20, // int8
  25  // text
};

static void
init (const struct data* d)
{
  values[0] = (char*)&d->id;
  lengths[0] = sizeof (d->id);

  values[1] = (char*)&d->idata;
  lengths[1] = sizeof (d->idata);

  values[2] = (char*)d->sdata;
  lengths[2] = strlen (d->sdata);
}

static void
execute (PGconn* conn, const struct data* ds, size_t n)
{
  int sock = PQsocket (conn);
  assert (sock != -1);

  if (PQsetnonblocking (conn, 1) == -1 ||
      PQenterPipelineMode (conn) == 0)
    assert (false);

  // True if we've written and read everything, respectively.
  //
  bool wdone = false;
  bool rdone = false;

  size_t wn = 0;
  size_t rn = 0;

  while (!rdone)
  {
    fd_set wds;
    if (!wdone)
    {
      FD_ZERO (&wds);
      FD_SET (sock, &wds);
    }

    fd_set rds;
    FD_ZERO (&rds);
    FD_SET (sock, &rds);

    if (select (sock + 1, &rds, wdone ? NULL : &wds, NULL, NULL) == -1)
    {
      if (errno == EINTR)
        continue;

      assert (false);
    }

    // Try to minimize the chance of blocking the server by first processing
    // the result and then sending more queries.
    //
    if (FD_ISSET (sock, &rds))
    {
      if (PQconsumeInput (conn) == 0)
        assert (false);

      while (PQisBusy (conn) == 0)
      {
        //fprintf (stderr, "PQgetResult %zu\n", rn);

        PGresult* res = PQgetResult (conn);
        assert (res != NULL);
        ExecStatusType stat = PQresultStatus (res);

        if (stat == PGRES_PIPELINE_SYNC)
        {
          assert (wdone && rn == n);
          PQclear (res);
          rdone = true;
          break;
        }

        if (stat == PGRES_FATAL_ERROR)
        {
          const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);

          if (strcmp (s, "23505") == 0)
            fprintf (stderr, "duplicate id at %zu\n", rn);
        }

        PQclear (res);
        assert (rn != n);
        ++rn;

        // We get a NULL result after each query result.
        //
        {
          PGresult* end = PQgetResult (conn);
          assert (end == NULL);
        }
      }
    }

    if (!wdone && FD_ISSET (sock, &wds))
    {
      // Send queries until we get blocked (write-biased). This feels like
      // a better overall strategy to keep the server busy compared to
      // sending one query at a time and then re-checking if there is
      // anything to read because the results of INSERT/UPDATE/DELETE are
      // presumably small and quite a few of them can get buffered before
      // the server gets blocked.
      //
      for (;;)
      {
        if (wn != n)
        {
          //fprintf (stderr, "PQsendQueryPrepared %zu\n", wn);

          init (ds + wn);

          if (PQsendQueryPrepared (conn,
                                   "persist_object",
                                   (int)(columns),
                                   values,
                                   lengths,
                                   formats,
                                   1) == 0)
            assert (false);

          if (++wn == n)
          {
            if (PQpipelineSync (conn) == 0)
              assert (false);
          }
        }

        // PQflush() result:
        //
        //  0  --  success (queue is now empty)
        //  1  --  blocked
        // -1  --  error
        //
        int r = PQflush (conn);
        assert (r != -1);

        if (r == 0)
        {
          if (wn != n)
          {
            // If we continue here, then we are write-biased. And if we
            // break, then we are read-biased.
            //
#if 1
            break;
#else
            continue;
#endif
          }

          wdone = true;
        }

        break; // Blocked or done.
      }
    }
  }

  if (PQexitPipelineMode (conn) == 0 ||
      PQsetnonblocking (conn, 0) == -1)
    assert (false);
}

static void
test (PGconn* conn)
{
  const size_t batch = 500;
  struct 

Re: Pipeline mode and PQpipelineSync()

2021-07-07 Thread Boris Kolpackov
Alvaro Herrera writes:

> Hmm ... aren't you trying to read more results than you sent queries?

Hm, but should I be able to? Or, to put another way, should PQisBusy()
indicate there is a result available without me sending a query for it?
That sounds very counter-intuitive to me.




Re: Pipeline mode and PQpipelineSync()

2021-07-08 Thread Boris Kolpackov
Alvaro Herrera writes:

> On 2021-Jul-08, Boris Kolpackov wrote:
> 
> > Alvaro Herrera writes:
> > 
> > > Hmm ... aren't you trying to read more results than you sent queries?
> > 
> > Hm, but should I be able to? Or, to put another way, should PQisBusy()
> > indicate there is a result available without me sending a query for it?
> > That sounds very counter-intuitive to me.
> 
> That seems a fair complaint, but I think PQisBusy is doing the right
> thing per its charter.  It is documented as "would PQgetResult block?"
> and it is returning correctly that PQgetResult would not block in that
> situation, because no queries are pending.

Well, that's one way to view it. But in this case one can say that
the entire pipeline is still "busy" since we haven't seen the
PQpipelineSync() call. So maybe we could change the charter only
for this special situation (that is, inside the pipeline)?

But I agree, it may not be worth the trouble and a note in the
documentation may be an acceptable "solution".

I am happy to go either way, just let me know what it will be. And
also if the latest patch to libpq that you have shared[1] is still
necessary.

[1] https://www.postgresql.org/message-id/202107061747.tlss7f2somqf%40alvherre.pgsql




Re: Pipeline mode and PQpipelineSync()

2021-07-08 Thread Boris Kolpackov
Alvaro Herrera writes:

> To be honest, I am hesitant to change the charter in that way; I fear
> it may have consequences I don't foresee.  I think the workaround is not
> *that* bad.

Ok, fair enough. I've updated my code to account for this and it seems
to be working fine now.
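
One possible shape for such a workaround, given as a sketch only and not
necessarily what the ODB code does: keep track of how many queries have
been sent and how many results have been consumed, and only call
PQgetResult() while something is still outstanding. The struct and function
names below are made up for illustration.

```c
#include <stdbool.h>
#include <stddef.h>

// Hypothetical bookkeeping kept by the application, not by libpq.
//
struct pipeline_progress
{
  size_t sent;        // Queries dispatched with PQsendQueryPrepared().
  size_t received;    // Query results fully consumed so far.
  bool sync_sent;     // PQpipelineSync() has been called.
  bool sync_received; // PGRES_PIPELINE_SYNC has been seen.
};

// True if some query or the sync point is still unanswered, that is, if
// a PQisBusy() return value of 0 can be taken to mean "result available".
//
static bool
result_expected (const struct pipeline_progress* p)
{
  if (p->received < p->sent)
    return true;

  return p->sync_sent && !p->sync_received;
}

// Intended use in a read loop (libpq calls shown as comments only):
//
//   while (result_expected (&p) && PQisBusy (conn) == 0)
//   {
//     PGresult* res = PQgetResult (conn);
//     ...
//   }
```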


> I'm having a bit of trouble documenting this.  I modified the paragraph in the
> pipeline mode docs to read:
> 
> 
>  PQisBusy, PQconsumeInput, etc
>  operate as normal when processing pipeline results.  Note that if no
>  queries are pending receipt of the corresponding results,
>  PQisBusy returns 0.
> 

How about the following for the second sentence:

"In particular, a call to PQisBusy in the middle
of a pipeline returns 0 if all the results for queries issued so far
have been consumed."