Alvaro Herrera <alvaro.herr...@2ndquadrant.com> writes:

> On 2021-Jul-07, Boris Kolpackov wrote:
>
> > I don't get any difference in behavior with this patch. That is, I
> > still get the unexpected NULL result. Does it make a difference for
> > your reproducer?
>
> Yes, the behavior changes for my repro. Is it possible for you to
> share a full program I can compile and run, please?
Here is the test sans the connection setup: ----------------------------------------------------------------------- #include <libpq-fe.h> #include <errno.h> #include <stdio.h> #include <string.h> #include <stddef.h> #include <assert.h> #include <sys/select.h> // Note: hack. // #include <arpa/inet.h> #define htonll(x) ((((long long)htonl(x)) << 32) + htonl((x) >> 32)) static const size_t columns = 3; struct data { long long id; long long idata; const char* sdata; }; static char* values[columns]; static int lengths[columns]; static int formats[columns] = {1, 1, 1}; static const unsigned int types[columns] = { 20, // int8 20, // int8 25 // text }; static void init (const struct data* d) { values[0] = (char*)&d->id; lengths[0] = sizeof (d->id); values[1] = (char*)&d->idata; lengths[1] = sizeof (d->idata); values[2] = (char*)d->sdata; lengths[2] = strlen (d->sdata); } static void execute (PGconn* conn, const struct data* ds, size_t n) { int sock = PQsocket (conn); assert (sock != -1); if (PQsetnonblocking (conn, 1) == -1 || PQenterPipelineMode (conn) == 0) assert (false); // True if we've written and read everything, respectively. // bool wdone = false; bool rdone = false; size_t wn = 0; size_t rn = 0; while (!rdone) { fd_set wds; if (!wdone) { FD_ZERO (&wds); FD_SET (sock, &wds); } fd_set rds; FD_ZERO (&rds); FD_SET (sock, &rds); if (select (sock + 1, &rds, wdone ? NULL : &wds, NULL, NULL) == -1) { if (errno == EINTR) continue; assert (false); } // Try to minimize the chance of blocking the server by first processing // the result and then sending more queries. 
// if (FD_ISSET (sock, &rds)) { if (PQconsumeInput (conn) == 0) assert (false); while (PQisBusy (conn) == 0) { //fprintf (stderr, "PQgetResult %zu\n", rn); PGresult* res = PQgetResult (conn); assert (res != NULL); ExecStatusType stat = PQresultStatus (res); if (stat == PGRES_PIPELINE_SYNC) { assert (wdone && rn == n); PQclear (res); rdone = true; break; } if (stat == PGRES_FATAL_ERROR) { const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE); if (strcmp (s, "23505") == 0) fprintf (stderr, "duplicate id at %zu\n", rn); } PQclear (res); assert (rn != n); ++rn; // We get a NULL result after each query result. // { PGresult* end = PQgetResult (conn); assert (end == NULL); } } } if (!wdone && FD_ISSET (sock, &wds)) { // Send queries until we get blocked (write-biased). This feels like // a better overall strategy to keep the server busy compared to // sending one query at a time and then re-checking if there is // anything to read because the results of INSERT/UPDATE/DELETE are // presumably small and quite a few of them can get buffered before // the server gets blocked. // for (;;) { if (wn != n) { //fprintf (stderr, "PQsendQueryPrepared %zu\n", wn); init (ds + wn); if (PQsendQueryPrepared (conn, "persist_object", (int)(columns), values, lengths, formats, 1) == 0) assert (false); if (++wn == n) { if (PQpipelineSync (conn) == 0) assert (false); } } // PQflush() result: // // 0 -- success (queue is now empty) // 1 -- blocked // -1 -- error // int r = PQflush (conn); assert (r != -1); if (r == 0) { if (wn != n) { // If we continue here, then we are write-biased. And if we // break, then we are read-biased. // #if 1 break; #else continue; #endif } wdone = true; } break; // Blocked or done. } } } if (PQexitPipelineMode (conn) == 0 || PQsetnonblocking (conn, 0) == -1) assert (false); } static void test (PGconn* conn) { const size_t batch = 500; struct data ds[batch]; for (size_t i = 0; i != batch; ++i) { ds[i].id = htonll (i == batch / 2 ? 
i - 1 : i); // Cause duplicate PK. ds[i].idata = htonll (i); ds[i].sdata = "abc"; } // Prepare the statement. // { PGresult* res = PQprepare ( conn, "persist_object", "INSERT INTO \"pgsql_bulk_object\" " "(\"id\", " "\"idata\", " "\"sdata\") " "VALUES " "($1, $2, $3)", (int)(columns), types); assert (PQresultStatus (res) == PGRES_COMMAND_OK); PQclear (res); } // Begin transaction. // { PGresult* res = PQexec (conn, "begin"); assert (PQresultStatus (res) == PGRES_COMMAND_OK); PQclear (res); } execute (conn, ds, batch); // Commit transaction. // { PGresult* res = PQexec (conn, "commit"); assert (PQresultStatus (res) == PGRES_COMMAND_OK); PQclear (res); } } ----------------------------------------------------------------------- Use the following statements to (re)create the table: DROP TABLE IF EXISTS "pgsql_bulk_object" CASCADE; CREATE TABLE "pgsql_bulk_object" ( "id" BIGINT NOT NULL PRIMARY KEY, "idata" BIGINT NOT NULL, "sdata" TEXT NOT NULL); It fails consistently for me when running against the local PostgreSQL 9.5 server (connecting via the UNIX socket): duplicate id at 250 driver: driver.cxx:105: void execute(PGconn*, const data*, size_t): Assertion `res != NULL' failed.