On 20 May 2016 at 15:35, Craig Ringer <cr...@2ndquadrant.com> wrote:
>
> You can, however, omit Sync from between messages and send a series of
> protocol messages, like
>
> Parse/Bind/Execute/Bind/Execute/Bind/Execute/Sync
>
> to avoid round-trip overheads.
>

I implemented what I think is a pretty solid proof of concept of this for
kicks this evening. Patch attached, including a basic test program. The
performance difference over higher latency links is huge, see below.

Demo/test program in src/test/examples/testlibpqbatch.c.

github: https://github.com/2ndQuadrant/postgres/tree/dev/libpq-async-batch

I still need to add the logic for handling an error during a batch by
discarding all input until the next Sync, but otherwise I think it's pretty
reasonable.

The time difference for 10k inserts on the local host over a unix socket
shows a solid improvement:

batch insert elapsed:      0.244293s
sequential insert elapsed: 0.375402s

... but over, say, a connection to a random AWS RDS instance fired up for
the purpose that lives about 320ms away the difference is huge:

batch insert elapsed:      9.029995s
sequential insert elapsed: (I got bored after 10 minutes; it should take a
bit less than an hour based on the latency numbers)

With 500 rows on the remote AWS RDS instance, once the I/O quota is already
saturated:

batch insert elapsed:      1.229024s
sequential insert elapsed: 156.962180s

which is an improvement by a factor of over 120.

I didn't compare vs COPY. I'm sure COPY will be faster, but COPY doesn't
let you do INSERT ... ON CONFLICT, do UPDATE, do DELETE, etc. Not without
temp tables and a bunch of hoop jumping anyway. If COPY solved everything
there'd be no point having pipelining.

No docs yet, but if folks think the interface is reasonable I can add them
easily since the comments on each of the new functions should be easy to
adapt into the SGML docs.

With a bit of polishing I think this can probably go in the next CF, though
I only wrote it as an experiment. Can I get opinions on the API?
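
As a sanity check on the timing figures above, here is a back-of-envelope cost model in C. It is only a sketch and not part of the patch: the function name estimate_seconds is made up for illustration, it treats the "about 320ms away" figure as a full round-trip time (an assumption), and it charges one round trip per Sync while ignoring server execution time and bandwidth, so it gives a floor rather than a prediction.

```c
#include <assert.h>

/*
 * Hypothetical helper, not part of libpq or the patch.
 *
 * Estimate the time spent waiting on the network when n_statements are
 * sent with one Sync (and therefore one round trip) per group of
 * statements_per_sync statements. Server-side execution time and
 * bandwidth limits are deliberately ignored.
 */
static double
estimate_seconds(int n_statements, int statements_per_sync, double rtt_seconds)
{
	int			round_trips;

	assert(n_statements >= 0);
	assert(statements_per_sync > 0);

	/* round up: a partial final group still needs its own Sync */
	round_trips = (n_statements + statements_per_sync - 1) / statements_per_sync;

	return round_trips * rtt_seconds;
}
```

Under these assumptions, estimate_seconds(10000, 1, 0.320) comes to roughly 3200 seconds, about 53 minutes, which matches the "bit less than an hour" guess for the sequential run. The same model gives about 160 seconds for 500 sequential rows, close to the measured 156.96 seconds. The batched runs take longer than a single round trip because the model leaves out the server's own work.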

The TL;DR API, using the usual async libpq routines, is:

    PQbeginBatchMode(conn);

    PQsendQueryParams(conn, "BEGIN", 0, NULL, NULL, NULL, NULL, 0);

    PQsendPrepare(conn, "my_update", "UPDATE ...");

    PQsetnonblocking(conn, 1);

    while (!all_responses_received)
    {
        select(...)

        if (can-write)
        {
            if (app-has-more-data-to-send)
            {
                PQsendQueryPrepared(conn, "my_update", params-go-here);
            }
            else if (havent-sent-commit-yet)
            {
                PQsendQueryParams(conn, "COMMIT", ...);
            }
            else if (havent-sent-endbatch-yet)
            {
                PQendBatch(conn);
            }
            PQflush(conn);
        }

        if (can-read)
        {
            PQconsumeInput(conn);
            if (PQisBusy(conn))
                continue;
            res = PQgetResult(conn);
            if (res == NULL)
            {
                PQgetNextQuery(conn);
                continue;
            }
            /* process results in the same order we sent the commands */
            /* client keeps track of that, libpq just supplies the results */
            ...
        }
    }

    /* all results processed, so leave batch mode */
    PQendBatchMode(conn);

Note that:

* PQsendQuery cannot be used as it uses simple query protocol, use
  PQsendQueryParams instead;

* Batch supports PQsendQueryParams, PQsendPrepare, PQsendQueryPrepared,
  PQsendDescribePrepared, PQsendDescribePortal;

* You don't call PQgetResult after dispatching each query

* Multiple batches may be pipelined, you don't have to wait for one to end
  to start another (an advantage over JDBC's API)

* non-blocking mode isn't required, but is strongly advised

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services

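
Since the client has to remember the order in which it dispatched commands, one way to do that bookkeeping is a small FIFO on the application side. The following is a minimal sketch under stated assumptions, not code from the patch: PendingCmd, PendingQueue, pending_push and pending_pop are hypothetical names, and the integer tag stands in for whatever the application needs to recognise a result.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical client-side bookkeeping; none of these names exist in libpq. */
typedef struct PendingCmd
{
	int			tag;			/* application-defined id for the command */
	struct PendingCmd *next;
} PendingCmd;

typedef struct
{
	PendingCmd *head;			/* oldest command still awaiting its result */
	PendingCmd *tail;			/* most recently dispatched command */
} PendingQueue;

/* Record a command right after dispatching it. Returns 0 if out of memory. */
static int
pending_push(PendingQueue *q, int tag)
{
	PendingCmd *cmd;

	assert(q != NULL);

	cmd = malloc(sizeof(PendingCmd));
	if (cmd == NULL)
		return 0;
	cmd->tag = tag;
	cmd->next = NULL;

	if (q->head == NULL)
		q->head = cmd;
	else
		q->tail->next = cmd;
	q->tail = cmd;
	return 1;
}

/*
 * Take the oldest pending command, i.e. the one the next result belongs to.
 * Returns 1 and stores its tag in *tag, or returns 0 if nothing is pending.
 */
static int
pending_pop(PendingQueue *q, int *tag)
{
	PendingCmd *cmd;

	assert(q != NULL && tag != NULL);

	cmd = q->head;
	if (cmd == NULL)
		return 0;

	q->head = cmd->next;
	if (q->head == NULL)
		q->tail = NULL;

	*tag = cmd->tag;
	free(cmd);
	return 1;
}
```

In the loop sketched above, the application would call pending_push after each successful send call (including the one for the batch-ending Sync) and pending_pop each time it moves on to the next query's results, so that every result can be matched to the command that produced it.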
From f0ca25bdc2bacf65530e4f180fdfc7c219866541 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Fri, 20 May 2016 12:45:18 +0800
Subject: [PATCH] Draft of libpq async pipelining support

Now with test in src/test/examples/testlibpqbatch.c
---
 src/interfaces/libpq/exports.txt    |   6 +
 src/interfaces/libpq/fe-connect.c   |  16 +
 src/interfaces/libpq/fe-exec.c      | 540 ++++++++++++++++++++++++++--
 src/interfaces/libpq/fe-protocol2.c |   6 +
 src/interfaces/libpq/fe-protocol3.c |  13 +-
 src/interfaces/libpq/libpq-fe.h     |  12 +-
 src/interfaces/libpq/libpq-int.h    |  36 +-
 src/test/examples/Makefile          |   2 +-
 src/test/examples/testlibpqbatch.c  | 690 ++++++++++++++++++++++++++++++++++++
 9 files changed, 1278 insertions(+), 43 deletions(-)
 create mode 100644 src/test/examples/testlibpqbatch.c

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 21dd772..e297c4b 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -171,3 +171,9 @@ PQsslAttributeNames       168
 PQsslAttribute            169
 PQsetErrorContextVisibility 170
 PQresultVerboseErrorMessage 171
+PQisInBatchMode           172
+PQqueriesInBatch          173
+PQbeginBatchMode          174
+PQendBatchMode            175
+PQendBatch                176
+PQgetNextQuery            177
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 9b2839b..78154e2 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2949,6 +2949,7 @@ static void
 closePGconn(PGconn *conn)
 {
 	PGnotify *notify;
+	PGcommandQueueEntry *queue;
 	pgParameterStatus *pstatus;

 	/*
@@ -2995,6 +2996,21 @@ closePGconn(PGconn *conn)
 		free(prev);
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
+	queue = conn->cmd_queue_head;
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *prev = queue;
+		queue = queue->next;
+		free(prev);
+	}
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+	queue = conn->cmd_queue_recycle;
+	{
+		PGcommandQueueEntry *prev = queue;
+		queue = queue->next;
+		free(prev);
+	}
+	conn->cmd_queue_recycle = NULL;
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
 	{
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 2621767..c12cb2c 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_BATCH_OK",
+	"PGRES_BATCH_ABORTED"
 };

 /*
@@ -69,6 +71,9 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry* PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);


 /* ----------------
@@ -1090,7 +1095,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}

 	return 1;
@@ -1113,6 +1118,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+	if (conn->in_batch)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+			libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+		return false;
+	}
+
 	if (!PQsendQueryStart(conn))
 		return 0;

@@ -1211,9 +1223,29 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char **last_query;
+	PGQueryClass *queryclass;
+
 	if (!PQsendQueryStart(conn))
 		return 0;

+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;	/* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
 	/* check the arguments */
 	if (!stmtName)
 	{
@@ -1269,18 +1301,21 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;

 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}

 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	*queryclass = PGQUERY_PREPARE;

 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	if (*last_query)
+		free(*last_query);
+	*last_query = strdup(query);

 	/*
 	 * Give the data a push. In nonblock mode, don't complain if we're unable
@@ -1290,10 +1325,14 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;

 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;

 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1340,6 +1379,81 @@ PQsendQueryPrepared(PGconn *conn,
 						   resultFormat);
 }

+/* Get a new command queue entry, allocating it if required. Doesn't add it to
+ * the tail of the queue yet, use PQappendPipelinedCommand once the command has
+ * been written for that. If a command fails once it's called this, it should
+ * use PQrecyclePipelinedCommand to put it on the freelist or release it.
+ *
+ * If allocation fails sets the error message and returns null.
+ */
+static PGcommandQueueEntry*
+PQmakePipelinedCommand(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry*) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/* Append a precreated command queue entry to the queue after it's been
+ * sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+	conn->cmd_queue_tail = entry;
+}
+
+/* Push a command queue entry onto the freelist. It must be a dangling entry
+ * with null next pointer and not referenced by any other entry's next pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (entry == NULL)
+		return;
+	if (entry->next != NULL)
+	{
+		fprintf(stderr, "tried to recycle non-dangling command queue entry");
+		abort();
+	}
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
+/* Set up for processing a new query's results */
+static void
+PQstartProcessingNewQuery(PGconn *conn)
+{
+	/* initialize async result-accumulation state */
+	conn->result = NULL;
+	conn->next_result = NULL;
+
+	/* reset single-row processing mode */
+	conn->singleRowMode = false;
+}
+
 /*
  * Common startup code for PQsendQuery and sibling routines
  */
@@ -1359,20 +1473,52 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE && !conn->in_batch)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
 		return false;
 	}

-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
-
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+	if (conn->in_batch)
+	{
+		/* When enqueuing a message we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when PQgetNextQuery(...) advances
+		 * to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_QUEUED:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				printfPQExpBuffer(&conn->errorMessage,
+					libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+			case PGASYNC_IDLE:
+				fprintf(stderr, "internal error, idle state in batch mode");
+				abort();
+				break;
+		}
+	}
+	else
+	{
+		/* This command's results will come in immediately */
+		PQstartProcessingNewQuery(conn);
+	}

 	/* ready to send command message */
 	return true;
@@ -1397,6 +1543,10 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int i;
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char **last_query;
+	PGQueryClass *queryclass;
+

 	/* This isn't gonna work on a 2.0 server */
 	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1406,6 +1556,23 @@ PQsendQueryGuts(PGconn *conn,
 		return 0;
 	}

+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;	/* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
+
 	/*
 	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
 	 * using specified statement name and the unnamed portal.
@@ -1518,22 +1685,25 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;

-	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}

 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	*queryclass = PGQUERY_EXTENDED;

 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	if (*last_query)
+		free(*last_query);
 	if (command)
-		conn->last_query = strdup(command);
+		*last_query = strdup(command);
 	else
-		conn->last_query = NULL;
+		*last_query = NULL;

 	/*
 	 * Give the data a push. In nonblock mode, don't complain if we're unable
@@ -1543,10 +1713,15 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;

 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;

 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1578,6 +1753,9 @@ pqHandleSendFailure(PGconn *conn)
 	 * but it prevents buffer bloat if there's a lot of data available.)
 	 */
 	parseInput(conn);
+
+	/* TODO: handle pipelined queries by sending a sync and flushing until
+	 * the next sync?? */
 }

 /*
@@ -1673,6 +1851,245 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY;
 }

+/* PQisInBatchMode
+ *   Return true if currently in batch mode
+ */
+int
+PQisInBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return FALSE;
+
+	return conn->in_batch;
+}
+
+/* PQqueriesInBatch
+ *   Return true if there are queries currently pending in batch mode
+ */
+int
+PQqueriesInBatch(PGconn *conn)
+{
+	if (!PQisInBatchMode(conn))
+		return false;
+
+	return conn->cmd_queue_head != NULL;
+}
+
+/* Put an idle connection in batch mode. Commands submitted after this
+ * can be pipelined on the connection, there's no requirement to wait for
+ * one to finish before the next is dispatched.
+ *
+ * COPY is not permitted in batch mode.
+ *
+ * A set of commands is terminated by a PQendBatch. Multiple sets of batched
+ * commands may be sent while in batch mode. Batch mode can be exited by
+ * calling PQendBatchMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQbeginBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	if (conn->in_batch)
+		return true;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+		return false;
+
+	conn->in_batch = true;
+	conn->asyncStatus = PGASYNC_QUEUED;
+
+	return true;
+}
+
+/* End batch mode and return to normal command mode.
+ *
+ * Has no effect unless the client has processed all results
+ * from all outstanding batches and the connection is idle,
+ * i.e. PQisInBatch() &&
+ *
+ * Returns true if batch mode ended.
+ */
+int
+PQendBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	if (!conn->in_batch)
+		return true;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, IDLE in batch mode");
+			abort();
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* can't end batch while busy */
+			return false;
+		case PGASYNC_QUEUED:
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+		return false;
+
+	conn->in_batch = false;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	return true;
+}
+
+/* End a batch submission by sending a protocol sync. The connection will
+ * remain in batch mode and unavailable for new non-batch commands until all
+ * results from the batch are processed by the client.
+ *
+ * It's legal to start submitting another batch immediately, without waiting
+ * for the results of the current batch. There's no need to end batch mode
+ * and start it again.
+ *
+ * If a command in a batch fails, every subsequent command up to and including
+ * the PQendBatch command result gets set to PGRES_BATCH_ABORTED state. If the
+ * whole batch is processed without error, a PGresult with PGRES_BATCH_OK is
+ * produced.
+ */
+int
+PQendBatch(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return false;
+
+	if (!conn->in_batch)
+		return false;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, IDLE in batch mode");
+			abort();
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_QUEUED:
+			/* can send sync to end this batch of cmds */
+			break;
+	}
+
+	entry = PQmakePipelinedCommand(conn);
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	PQappendPipelinedCommand(conn, entry);
+
+	return true;
+
+sendFailed:
+	PQrecyclePipelinedCommand(conn, entry);
+	pqHandleSendFailure(conn);
+	return false;
+}
+
+/* PQgetNextQuery
+ *   In batch mode, start processing the next query in the queue.
+ *
+ * Returns true if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns false if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+int
+PQgetNextQuery(PGconn *conn)
+{
+	PGcommandQueueEntry *next_query;
+
+	if (!conn)
+		return FALSE;
+
+	if (!conn->in_batch)
+		return false;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			fprintf(stderr, "internal error, COPY in batch mode");
+			abort();
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return false;
+			break;
+		case PGASYNC_IDLE:
+			fprintf(stderr, "internal error, idle in batch mode");
+			abort();
+			break;
+		case PGASYNC_QUEUED:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/* In batch mode but nothing left on the queue; caller can submit
+		 * more work or PQendBatchMode() now. */
+		return false;
+	}
+
+	/* Pop the next query from the queue and set up the connection state
+	 * as if it'd just been dispatched from a non-batched call */
+	next_query = conn->cmd_queue_head;
+	conn->cmd_queue_head = next_query->next;
+	next_query->next = NULL;
+
+	PQstartProcessingNewQuery(conn);
+
+	conn->last_query = next_query->query;
+	next_query->query = NULL;
+	conn->queryclass = next_query->queryclass;
+
+	PQrecyclePipelinedCommand(conn, next_query);
+
+	/* Allow parsing of the query */
+	conn->asyncStatus = PGASYNC_BUSY;
+
+	/* Parse any available data */
+	parseInput(conn);
+
+	return true;
+}
+

 /*
  * PQgetResult
@@ -1718,6 +2135,8 @@ PQgetResult(PGconn *conn)
 			/*
 			 * conn->errorMessage has been set by pqWait or pqReadData. We
 			 * want to append it to any already-received error message.
+			 *
+			 * TODO: handle purging the queue here
 			 */
 			pqSaveErrorResult(conn);
 			conn->asyncStatus = PGASYNC_IDLE;
@@ -1732,13 +2151,33 @@ PQgetResult(PGconn *conn)
 	switch (conn->asyncStatus)
 	{
 		case PGASYNC_IDLE:
+		case PGASYNC_QUEUED:
 			res = NULL;	/* query is complete */
 			break;
 		case PGASYNC_READY:
 			res = pqPrepareAsyncResult(conn);
+			if (conn->in_batch)
+			{
+				/* batched queries aren't followed by a Sync to put us back in
+				 * PGASYNC_IDLE state, and when we do get a sync we could still
+				 * have another batch coming after this one.
+				 *
+				 * The connection isn't idle since we can't submit new
+				 * nonbatched commands. It isn't also busy since the current
+				 * command is done and we need to process a new one.
+				 */
+				conn->asyncStatus = PGASYNC_QUEUED;
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
+			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
-			break;
 		case PGASYNC_COPY_IN:
 			res = getCopyResult(conn, PGRES_COPY_IN);
 			break;
@@ -1915,6 +2354,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;

+	if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+			libpq_gettext("cannot PQexec in batch mode\n"));
+		return false;
+	}
+
 	/*
 	 * Silently discard any prior query result that application didn't eat.
 	 * This is probably poor design, but it's here for backward compatibility.
@@ -2109,6 +2555,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	PGQueryClass *queryclass;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2124,6 +2573,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		return 0;
 	}

+	if (conn->in_batch)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;	/* error msg already set */
+
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		queryclass = &conn->queryclass;
+	}
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', false, conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2132,15 +2595,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;

 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (!conn->in_batch)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}

 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
+	*queryclass = PGQUERY_DESCRIBE;

 	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
+	if (conn->last_query && !conn->in_batch)
 	{
 		free(conn->last_query);
 		conn->last_query = NULL;
@@ -2154,10 +2620,14 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;

 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->in_batch)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;

 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index f1b90f3..e824f36 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -412,6 +412,12 @@ pqParseInput2(PGconn *conn)
 {
 	char id;

+	if (conn->asyncStatus == PGASYNC_QUEUED || conn->in_batch)
+	{
+		fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode");
+		abort();
+	}
+
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 0b8c62f..30c8dfd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -220,10 +220,17 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':	/* backend is ready for new query */
+				case 'Z':	/* sync response, backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->in_batch)
+					{
+						conn->result = PQmakeEmptyPGresult(conn,
+								PGRES_BATCH_OK);
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':	/* empty query */
 					if (conn->result == NULL)
@@ -305,7 +312,7 @@ pqParseInput3(PGconn *conn)
 					 * parsing until the application accepts the current
 					 * result.
 					 */
-					conn->asyncStatus = PGASYNC_READY;
+					conn->asyncStatus = PGASYNC_READY_MORE;
 					return;
 				}
 				break;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 9ca0756..a210e3e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -91,7 +91,9 @@ typedef enum
 	PGRES_NONFATAL_ERROR,	/* notice or warning message */
 	PGRES_FATAL_ERROR,	/* query failed */
 	PGRES_COPY_BOTH,	/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE	/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,	/* single tuple from larger resultset */
+	PGRES_BATCH_OK,	/* successful end of a batch of commands */
+	PGRES_BATCH_ABORTED,	/* Command didn't run because of an abort earlier in a batch */
 } ExecStatusType;

 typedef enum
@@ -421,6 +423,14 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int PQisBusy(PGconn *conn);
 extern int PQconsumeInput(PGconn *conn);

+/* Routines for batch mode management */
+extern int PQisInBatchMode(PGconn *conn);
+extern int PQqueriesInBatch(PGconn *conn);
+extern int PQbeginBatchMode(PGconn *conn);
+extern int PQendBatchMode(PGconn *conn);
+extern int PQendBatch(PGconn *conn);
+extern int PQgetNextQuery(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);

diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1183323..6caed9f 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,13 @@ typedef enum
 {
 	PGASYNC_IDLE,	/* nothing's happening, dude */
 	PGASYNC_BUSY,	/* query in progress */
-	PGASYNC_READY,	/* result ready for PQgetResult */
+	PGASYNC_READY,	/* query done, waiting for client to fetch result */
+	PGASYNC_READY_MORE,	/* query done, waiting for client to fetch result,
+				   More results expected from this query */
 	PGASYNC_COPY_IN,	/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,	/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH	/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,	/* Copy In/Out data transfer in progress */
+	PGASYNC_QUEUED	/* Current query done, more in queue */
 } PGAsyncStatusType;

 /* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +232,8 @@ typedef enum
 	PGQUERY_SIMPLE,	/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,	/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,	/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE	/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,	/* Describe Statement or Portal */
+	PGQUERY_SYNC	/* A protocol sync to end a batch */
 } PGQueryClass;

 /* PGSetenvStatusType defines the state of the PQSetenv state machine */
@@ -292,6 +296,22 @@ typedef struct pgDataValue
 	const char *value;	/* data value, without zero-termination */
 } PGdataValue;

+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+	PGQueryClass queryclass;	/* Query type; PGQUERY_SYNC for sync msg */
+	char *query;	/* SQL command, or NULL if unknown */
+	struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -356,6 +376,7 @@ struct pg_conn
 	bool options_valid;	/* true if OK to attempt connection */
 	bool nonblocking;	/* whether this connection is using nonblock
 				 * sending semantics */
+	bool in_batch;	/* connection is in batch (pipelined) mode */
 	bool singleRowMode;	/* return current query result row-by-row? */
 	char copy_is_binary;	/* 1 = copy binary, 0 = copy text */
 	int copy_already_done;	/* # bytes already returned in COPY
@@ -363,6 +384,15 @@ struct pg_conn
 	PGnotify *notifyHead;	/* oldest unreported Notify msg */
 	PGnotify *notifyTail;	/* newest unreported Notify msg */

+	/* The command queue
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+	PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	/* See PQconnectPoll() for how we use 'int' and not 'pgsocket'. */
 	pgsocket sock;	/* FD for socket, PGINVALID_SOCKET if
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index 31da210..92a6faf 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)


-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqbatch

 all: $(PROGS)

diff --git a/src/test/examples/testlibpqbatch.c b/src/test/examples/testlibpqbatch.c
new file mode 100644
index 0000000..f82bce2
--- /dev/null
+++ b/src/test/examples/testlibpqbatch.c
@@ -0,0 +1,690 @@
+/*
+ * src/test/examples/testlibpqbatch.c
+ *
+ *
+ * testlibpqbatch.c
+ *		Test of batch execution funtionality
+ */
+
+#ifdef WIN32
+#include <windows.h>
+#endif
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include "libpq-fe.h"
+
+static void exit_nicely(PGconn *conn);
+static void simple_batch(PGconn *conn);
+static void test_disallowed_in_batch(PGconn *conn);
+static void batch_insert_pipelined(PGconn *conn, int n_rows);
+static void batch_insert_sequential(PGconn *conn, int n_rows);
+
+#ifndef VERBOSE
+#define VERBOSE 0
+#endif
+
+static const Oid INT4OID = 23;
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+static void
+simple_batch(PGconn *conn)
+{
+	PGresult *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid dummy_param_oids[1] = {INT4OID};
+
+	/*
+	 * Enter batch mode and dispatch a set of operations, which we'll then process
+	 * the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim processing
+	 * of results since our out buffer will give us enough slush to work with
+	 * and we won't block on sending. So blocking mode is fine.
+	 */
+	if (PQisnonblocking(conn))
+	{
+		fprintf(stderr, "Expected blocking connection mode\n");
+		goto fail;
+	}
+
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (!PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+			dummy_params, NULL, NULL, 0))
+	{
+		fprintf(stderr, "dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQendBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode with work in progress should fail, but succeeded\n");
+		goto fail;
+	}
+
+	if (!PQendBatch(conn))
+	{
+		fprintf(stderr, "Ending a batch failed: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	/* in batch mode we have to ask for the first result to be processed; until we
+	 * do PQgetResult will return null: */
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something in a batch before first PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s from first batch item\n",
+				PQresStatus(PQresultStatus(res)));
+		goto fail;
+	}
+
+	PQclear(res);
+
+	if (PQgetResult(conn) != NULL)
+	{
+		fprintf(stderr, "PQgetResult returned something extra after first result before PQgetNextQuery() call\n");
+		goto fail;
+	}
+
+	/* Even though we've processed the result there's still a sync to come and we
+	 * can't exit batch mode yet */
+	if (PQendBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode after query but before sync succeeded incorrectly\n");
+		goto fail;
+	}
+
+	/* should now get an explicit sync result */
+	if (!PQgetNextQuery(conn))
+	{
+		fprintf(stderr, "PQgetNextQuery() failed at sync after first batch entry: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+	{
+		fprintf(stderr, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQresultStatus(res) != PGRES_BATCH_OK)
+	{
+		fprintf(stderr, "Unexpected result code %s instead of sync result, error: %s\n",
+				PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+		goto fail;
+	}
+
+	PQclear(res);
+
+	/* We're still in a batch... */
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Fell out of batch mode somehow\n");
+		goto fail;
+	}
+
+	/* until we end it, which we can safely do now */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "exiting batch mode didn't seem to work\n");
+		goto fail;
+	}
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+	PGresult *res = NULL;
+
+	if (PQisnonblocking(conn))
+	{
+		fprintf(stderr, "Expected blocking connection mode: %u\n", __LINE__);
+		goto fail;
+	}
+
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "Unable to enter batch mode\n");
+		goto fail;
+	}
+
+	if (!PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Batch mode not activated properly\n");
+		goto fail;
+	}
+
+	/* PQexec should fail in batch mode */
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+	{
+		fprintf(stderr, "PQexec should fail in batch mode but succeeded\n");
+		goto fail;
+	}
+
+	/* So should PQsendQuery */
+	if (PQsendQuery(conn, "SELECT 1") != 0)
+	{
+		fprintf(stderr, "PQsendQuery should fail in batch mode but succeeded\n");
+		goto fail;
+	}
+
+	/* Entering batch mode when already in batch mode is OK */
+	if (!PQbeginBatchMode(conn))
+	{
+		fprintf(stderr, "re-entering batch mode should be a no-op but failed\n");
+		goto fail;
+	}
+
+	if (PQisBusy(conn))
+	{
+		fprintf(stderr, "PQisBusy should return false when idle in batch, returned true\n");
+		goto fail;
+	}
+
+	/* ok, back to normal command mode */
+	if (!PQendBatchMode(conn))
+	{
+		fprintf(stderr, "couldn't exit idle empty batch mode\n");
+		goto fail;
+	}
+
+	if (PQisInBatchMode(conn))
+	{
+		fprintf(stderr, "Batch mode not terminated properly\n");
+		goto fail;
+	}
+
+	/* exiting batch mode when not in batch mode should be a no-op */
+	if (!PQendBatchMode(conn))
+	{
+ fprintf(stderr, "batch mode exit when not in batch mode should succeed but failed\n"); + goto fail; + } + + /* can now PQexec again */ + res = PQexec(conn, "SELECT 1"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, "PQexec should succeed after exiting batch mode but failed with: %s\n", + PQerrorMessage(conn)); + goto fail; + } + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + +/* max char length of an int32, plus sign and null terminator */ +#define MAXINTLEN 12 + +/* State machine enums for batch insert */ +typedef enum BatchInsertStep +{ + BI_BEGIN_TX, + BI_DROP_TABLE, + BI_CREATE_TABLE, + BI_PREPARE, + BI_INSERT_ROWS, + BI_COMMIT_TX, + BI_SYNC, + BI_DONE +} BatchInsertStep; + +static const char * const drop_table_sql + = "DROP TABLE IF EXISTS batch_demo"; +static const char * const create_table_sql + = "CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);"; +static const char * const insert_sql + = "INSERT INTO batch_demo(itemno) VALUES ($1);"; + +static void +batch_insert_pipelined(PGconn *conn, int n_rows) +{ + PGresult *res = NULL; + const char *insert_params[1]; + Oid insert_param_oids[1] = {INT4OID}; + char insert_param_0[MAXINTLEN]; + BatchInsertStep send_step = BI_BEGIN_TX, recv_step = BI_BEGIN_TX; + int rows_to_send, rows_to_receive; + + insert_params[0] = &insert_param_0[0]; + + rows_to_send = rows_to_receive = n_rows; + + /* + * Do a batched insert into a table created at the start of the batch + */ + if (!PQbeginBatchMode(conn)) + { + fprintf(stderr, "failed to enter batch mode: %s\n", PQerrorMessage(conn)); + goto fail; + } + + if (!PQsendQueryParams(conn, "BEGIN", + 0, NULL, NULL, NULL, NULL, 0)) + { + fprintf(stderr, "xact start failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + #if VERBOSE + fprintf(stdout, "sent BEGIN\n"); + #endif + send_step = BI_DROP_TABLE; + + if (!PQsendQueryParams(conn, drop_table_sql, + 0, NULL, NULL, NULL, NULL, 0)) + { + fprintf(stderr, "dispatching DROP 
TABLE failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + #if VERBOSE + fprintf(stdout, "sent DROP\n"); + #endif + send_step = BI_CREATE_TABLE; + + if (!PQsendQueryParams(conn, create_table_sql, + 0, NULL, NULL, NULL, NULL, 0)) + { + fprintf(stderr, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + #if VERBOSE + fprintf(stdout, "sent CREATE\n"); + #endif + send_step = BI_PREPARE; + + if (!PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids)) + { + fprintf(stderr, "dispatching PREPARE failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + #if VERBOSE + fprintf(stdout, "sent PREPARE\n"); + #endif + send_step = BI_INSERT_ROWS; + + /* Now we start inserting. We'll be sending enough data that we could fill + * our out buffer, so to avoid deadlocking we need to enter nonblocking + * mode and consume input while we send more output. As results of each query are + * processed we should pop them to allow processing of the next query. There's + * no need to finish the batch before processing results. 
+ */ + if (PQsetnonblocking(conn, 1) != 0) + { + fprintf(stderr, "failed to set nonblocking mode: %s\n", PQerrorMessage(conn)); + goto fail; + } + + while (recv_step != BI_DONE) + { + int sock; + fd_set input_mask; + fd_set output_mask; + + sock = PQsocket(conn); + + if (sock < 0) + break; /* shouldn't happen */ + + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + FD_ZERO(&output_mask); + FD_SET(sock, &output_mask); + + if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0) + { + fprintf(stderr, "select() failed: %s\n", strerror(errno)); + exit_nicely(conn); + } + + /* Process any results, so we keep the server's out buffer free flowing + * and it can continue to process input */ + if (FD_ISSET(sock, &input_mask)) + { + PQconsumeInput(conn); + + /* Read until we'd block if we tried to read */ + while (!PQisBusy(conn) && recv_step < BI_DONE) + { + const char * cmdtag; + const char * description = NULL; + int status; + BatchInsertStep next_step; + + + res = PQgetResult(conn); + + if (res == NULL) + { + /* No more results from this query, advance to the next result */ + if (!PQgetNextQuery(conn)) + { + fprintf(stderr, "Expected next query result but unable to dequeue: %s\n", + PQerrorMessage(conn)); + goto fail; + } + #if VERBOSE + fprintf(stdout, "next query!\n"); + #endif + continue; + } + + status = PGRES_COMMAND_OK; + next_step = recv_step + 1; + switch (recv_step) + { + case BI_BEGIN_TX: + cmdtag = "BEGIN"; + break; + case BI_DROP_TABLE: + cmdtag = "DROP TABLE"; + break; + case BI_CREATE_TABLE: + cmdtag = "CREATE TABLE"; + break; + case BI_PREPARE: + cmdtag = ""; + description = "PREPARE"; + break; + case BI_INSERT_ROWS: + cmdtag = "INSERT"; + rows_to_receive --; + if (rows_to_receive > 0) + next_step = BI_INSERT_ROWS; + break; + case BI_COMMIT_TX: + cmdtag = "COMMIT"; + break; + case BI_SYNC: + cmdtag = ""; + description = "SYNC"; + status = PGRES_BATCH_OK; + break; + case BI_DONE: + /* unreachable */ + abort(); + } + if (description == NULL) + 
description = cmdtag; + + #if VERBOSE + fprintf(stderr, "At state %d (%s) expect tag '%s', result code %s, expect %d more rows, transition to %d\n", + recv_step, description, cmdtag, PQresStatus(status), rows_to_receive, next_step); + #endif + + if (PQresultStatus(res) != status) + { + fprintf(stderr, "%s reported status %s, expected %s. Error msg is [%s]\n", + description, PQresStatus(PQresultStatus(res)), PQresStatus(status), PQerrorMessage(conn)); + goto fail; + } + if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0) + { + fprintf(stderr, "%s expected command tag '%s', got '%s'\n", + description, cmdtag, PQcmdStatus(res)); + goto fail; + } + #if VERBOSE + fprintf(stdout, "Got %s OK\n", cmdtag); + #endif + recv_step = next_step; + + PQclear(res); + } + } + + /* Write more rows and/or the end batch message, if needed */ + if (FD_ISSET(sock, &output_mask)) + { + PQflush(conn); + + if (send_step == BI_INSERT_ROWS) + { + snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send); + insert_param_0[MAXINTLEN-1] = '\0'; + + if (PQsendQueryPrepared(conn, "my_insert", + 1, insert_params, NULL, NULL, 0)) + { + #if VERBOSE + fprintf(stdout, "sent row %d\n", rows_to_send); + #endif + rows_to_send --; + if (rows_to_send == 0) + send_step = BI_COMMIT_TX; + } + else + { + /* in nonblocking mode, so it's OK for an insert to fail to send */ + fprintf(stderr, "WARNING: failed to send insert #%d: %s\n", + rows_to_send, PQerrorMessage(conn)); + } + } + else if (send_step == BI_COMMIT_TX) + { + if (PQsendQueryParams(conn, "COMMIT", + 0, NULL, NULL, NULL, NULL, 0)) + { + #if VERBOSE + fprintf(stdout, "sent COMMIT\n"); + #endif + send_step = BI_SYNC; + } + else + { + fprintf(stderr, "WARNING: failed to send commit: %s\n", + PQerrorMessage(conn)); + } + } else if (send_step == BI_SYNC) + { + if (PQendBatch(conn)) + { + #if VERBOSE + fprintf(stdout, "Dispatched end batch message\n"); + #endif + send_step = BI_DONE; + } + else + { + fprintf(stderr, "WARNING: Ending a batch 
failed: %s\n", + PQerrorMessage(conn)); + } + } + } + + } + + /* We've got the sync message and the batch should be done */ + if (!PQendBatchMode(conn)) + { + fprintf(stderr, "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn)); + goto fail; + } + + if (PQsetnonblocking(conn, 0) != 0) + { + fprintf(stderr, "failed to clear nonblocking mode: %s\n", PQerrorMessage(conn)); + goto fail; + } + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + + +static void +batch_insert_sequential(PGconn *conn, int nrows) +{ + PGresult *res = NULL; + const char *insert_params[1]; + Oid insert_param_oids[1] = {INT4OID}; + char insert_param_0[MAXINTLEN]; + + insert_params[0] = &insert_param_0[0]; + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "BEGIN failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + PQclear(res); + + res = PQexec(conn, drop_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "DROP TABLE failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + PQclear(res); + + res =PQexec(conn, create_table_sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + PQclear(res); + + res = PQprepare(conn, "my_insert2", insert_sql, 1, insert_param_oids); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "prepare failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + PQclear(res); + + while (nrows > 0) + { + snprintf(&insert_param_0[0], MAXINTLEN, "%d", nrows); + insert_param_0[MAXINTLEN-1] = '\0'; + + res = PQexecPrepared(conn, "my_insert2", + 1, insert_params, NULL, NULL, 0); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "INSERT failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + PQclear(res); + nrows --; + } + + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, 
"COMMIT failed: %s\n", PQerrorMessage(conn)); + goto fail; + } + PQclear(res); + + return; + +fail: + PQclear(res); +} + + +int +main(int argc, char **argv) +{ + const char *conninfo; + PGconn *conn; + struct timeval start_time, end_time, elapsed_time; + + /* + * If the user supplies a parameter on the command line, use it as the + * conninfo string; otherwise default to setting dbname=postgres and using + * environment variables or defaults for all other connection parameters. + */ + if (argc > 1) + conninfo = argv[1]; + else + conninfo = "dbname = postgres"; + + /* Make a connection to the database */ + conn = PQconnectdb(conninfo); + + /* Check to see that the backend connection was successfully made */ + if (PQstatus(conn) != CONNECTION_OK) + { + fprintf(stderr, "Connection to database failed: %s\n", + PQerrorMessage(conn)); + exit_nicely(conn); + } + + test_disallowed_in_batch(conn); + simple_batch(conn); + + gettimeofday(&start_time, NULL); + batch_insert_pipelined(conn, 10000); + gettimeofday(&end_time, NULL); + timersub(&end_time, &start_time, &elapsed_time); + printf("batch insert elapsed: %ld.%06lds\n", elapsed_time.tv_sec, elapsed_time.tv_usec); + + gettimeofday(&start_time, NULL); + batch_insert_sequential(conn, 10000); + gettimeofday(&end_time, NULL); + timersub(&end_time, &start_time, &elapsed_time); + printf("sequential insert elapsed: %ld.%06lds\n", elapsed_time.tv_sec, elapsed_time.tv_usec); + + fprintf(stderr, "Done.\n"); + + + /* close the connection to the database and cleanup */ + PQfinish(conn); + + return 0; +} -- 2.5.5
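For readers who want a feel for the command queue that the pg_conn hunk adds (cmd_queue_head, cmd_queue_tail, cmd_queue_recycle), here is a minimal standalone sketch of a head/tail queue with a recycle list. It is only an illustration of the comment in the patch, not the patch's code: the CmdEntry and CmdQueue types, the kind field, and the queue_push/queue_pop/queue_free helpers are all hypothetical names, and the real PGcommandQueueEntry layout is not shown in this part of the patch.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical entry layout; the patch's PGcommandQueueEntry is not shown here. */
typedef struct CmdEntry
{
	int			kind;			/* stand-in for the queued command's type */
	struct CmdEntry *next;
} CmdEntry;

typedef struct CmdQueue
{
	CmdEntry   *head;			/* next pending command */
	CmdEntry   *tail;			/* where new commands are appended */
	CmdEntry   *recycle;		/* freed entries kept for reuse */
} CmdQueue;

/* Append a command, reusing a recycled entry if one exists. Returns 1 on success. */
static int
queue_push(CmdQueue *q, int kind)
{
	CmdEntry   *e = q->recycle;

	/* head and tail are either both set or both NULL */
	assert((q->head == NULL) == (q->tail == NULL));

	if (e != NULL)
		q->recycle = e->next;
	else if ((e = malloc(sizeof(CmdEntry))) == NULL)
		return 0;

	e->kind = kind;
	e->next = NULL;
	if (q->tail != NULL)
		q->tail->next = e;
	else
		q->head = e;
	q->tail = e;
	return 1;
}

/* Remove the oldest pending command and park its entry on the recycle list. */
static int
queue_pop(CmdQueue *q, int *kind)
{
	CmdEntry   *e = q->head;

	if (e == NULL)
		return 0;				/* nothing pending */
	*kind = e->kind;
	q->head = e->next;
	if (q->head == NULL)
		q->tail = NULL;
	e->next = q->recycle;
	q->recycle = e;
	return 1;
}

/* Release every entry, pending or recycled. */
static void
queue_free(CmdQueue *q)
{
	CmdEntry   *e;

	while ((e = q->head) != NULL)
	{
		q->head = e->next;
		free(e);
	}
	while ((e = q->recycle) != NULL)
	{
		q->recycle = e->next;
		free(e);
	}
	q->tail = NULL;
}
```

The recycle list means that a long batch which is drained as it is sent (as the pipelined insert test does) should settle into reusing a small number of entries rather than calling malloc for every queued command.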