My apologies. The wiki says to start the discussion early, even before writing any code if possible; I added a link to the PR only for those who really wanted to see the details.
I'm attaching a diff now. This is not meant as a patch for committing; it's just a discussion piece. The problem is that PQgetCopyData loops use a lot of CPU time, and this alternative reduces that considerably: in the included benchmark, user CPU time dropped from about 11.4 s to 3.4 s.

Jeroen

On Thu, 2 Mar 2023 at 13:38, Daniel Gustafsson <dan...@yesql.se> wrote:
> > On 1 Mar 2023, at 15:23, Jeroen Vermeulen <jtv...@gmail.com> wrote:
>
> > PR for easy discussion: https://github.com/jtv/postgres/pull/1
>
> The process for discussing work on pgsql-hackers is to attach the patch to the
> email and discuss it inline in the thread.  That way all versions of the patch
> as well as the discussion is archived and searchable.
>
> --
> Daniel Gustafsson
diff --git a/bench.c b/bench.c
new file mode 100644
index 0000000000..c3206d2927
--- /dev/null
+++ b/bench.c
@@ -0,0 +1,135 @@
+/*
+ * Minimal benchmark for PQgetCopyData alternative.
+ *
+ * Define CALL to 0 (to use the classic PQgetCopyData) or 1 (to use the
+ * proposed new function), then run the binary through "time" to get time and
+ * CPU usage stats.
+ *
+ * DO NOT UPSTREAM THIS FILE.  It's just a demonstration for the prototype
+ * patch.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <libpq-fe.h>
+
+/* Define CALL to...
+ * 0: Use classic PQgetCopyData()
+ * 1: Use experimental PQhandleCopyData()
+ */
+
+/* Benchmark results (best result per category, out of 4 runs):
+ *
+ * PQgetCopyData:
+ * real - 0m32.972s
+ * user - 0m11.364s
+ * sys - 0m1.255s
+ *
+ * PQhandleCopyData:
+ * real - 0m32.839s
+ * user - 0m3.407s
+ * sys - 0m0.872s
+ */
+
+#if CALL == 1
+/*
+ * Write one COPY row to stdout.  The row is not zero-terminated, so write
+ * exactly len bytes; in text format it already ends in a newline.
+ */
+static int
+print_row(void *context, char *buf, size_t len)
+{
+	(void) context;
+	fwrite(buf, 1, len, stdout);
+	return 0;
+}
+#endif
+
+
+int
+main(void)
+{
+#if !defined(CALL)
+#error "Set CALL: 0 = PQgetCopyData, 1 = PQhandleCopyData."
+#elif CALL == 0
+	fprintf(stderr, "Testing classic PQgetCopyData().\n");
+#elif CALL == 1
+	fprintf(stderr, "Testing experimental PQhandleCopyData.\n");
+#else
+#error "Unknown CALL value."
+#endif
+
+	PGconn	   *cx = PQconnectdb("");
+
+	if (PQstatus(cx) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Could not connect: %s", PQerrorMessage(cx));
+		exit(1);
+	}
+	PGresult   *tx = PQexec(cx, "BEGIN");
+
+	if (!tx)
+	{
+		fprintf(stderr, "No result from BEGIN!\n");
+		exit(1);
+	}
+	int			s = PQresultStatus(tx);
+
+	if (s != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "Failed to start transaction: status %d.\n", s);
+		exit(1);
+	}
+
+	PGresult   *r = PQexec(
+							cx,
+							"COPY ("
+							"SELECT generate_series, 'row #' || generate_series "
+							"FROM generate_series(1, 100000000)"
+							") TO STDOUT"
+	);
+
+	if (!r)
+	{
+		fprintf(stderr, "No result!\n");
+		exit(1);
+	}
+	int			status = PQresultStatus(r);
+
+	if (status != PGRES_COPY_OUT)
+	{
+		fprintf(stderr, "Failed to start COPY: status %d.\n", status);
+		exit(1);
+	}
+
+	int			bytes;
+#if CALL == 0
+	char	   *buffer = NULL;
+
+	for (
+		 bytes = PQgetCopyData(cx, &buffer, 0);
+		 bytes > 0;
+		 bytes = PQgetCopyData(cx, &buffer, 0)
+		)
+	{
+		if (buffer)
+		{
+			printf("%s", buffer);
+			PQfreemem(buffer);
+		}
+	}
+#elif CALL == 1
+	while ((bytes = PQhandleCopyData(cx, print_row, NULL, 0)) > 0)
+		;
+#else
+#error "Unknown CALL value."
+#endif
+
+	if (bytes != -1)
+	{
+		fprintf(stderr, "Got unexpected result: %d.\n", bytes);
+		exit(1);
+	}
+
+	/* (Don't bother cleaning up.) */
+}
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index 641851983d..1e4eba58a2 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -32,6 +32,17 @@
 #include "sqlda-compat.h"
 #include "sqlda-native.h"
 
+/*
+ * Print one row received from COPY OUT.  The row is not zero-terminated, so
+ * write exactly len bytes; this matches the old printf("%s") for text rows.
+ */
+static int
+print_row(void *context, char *buf, size_t len)
+{
+	fwrite(buf, 1, len, stdout);
+	return 0;
+}
+
 /*
  * This function returns a newly malloced string that has ' and \
  * escaped.
@@ -1876,16 +1887,12 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
 			break;
 		case PGRES_COPY_OUT:
 			{
-				char	   *buffer;
 				int			res;
 
 				ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno);
-				while ((res = PQgetCopyData(stmt->connection->connection,
-											&buffer, 0)) > 0)
-				{
-					printf("%s", buffer);
-					PQfreemem(buffer);
-				}
+				while ((res = PQhandleCopyData(stmt->connection->connection,
+											   print_row, NULL, 0)) > 0)
+					;
 				if (res == -1)
 				{
 					/* COPY done */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..add1ff1591 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQhandleCopyData          187
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ec62550e38..d0f0501e80 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2709,6 +2709,47 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
 	return pqGetCopyData3(conn, buffer, async);
 }
 
+/*
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a callback.
+ *
+ * "handler" is called with the caller's context pointer, a pointer to the
+ * row's bytes, and the row's length.  Its return value is currently ignored;
+ * it is reserved so that a future version could let the handler stop early.
+ *
+ * Calls handler only after receiving a full row.  The buffer does NOT have a
+ * terminating zero, so do not go beyond the given size.  The handler may
+ * modify the buffer's contents.  In text-format COPY OUT each row ends in a
+ * newline, which the handler may overwrite with a terminating zero; that does
+ * not hold for binary-format COPY or for COPY BOTH data.
+ *
+ * The buffer points into the connection's input buffer, so it is valid only
+ * until the handler returns, and the handler should not call libpq functions
+ * on the same connection.
+ *
+ * The context pointer can be anything; this function will pass it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, does not call handler, and returns -2 (consult PQerrorMessage).
+ */
+int
+PQhandleCopyData(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async)
+{
+	if (!conn)
+		return -2;
+	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
+		conn->asyncStatus != PGASYNC_COPY_BOTH)
+	{
+		libpq_append_conn_error(conn, "no COPY in progress");
+		return -2;
+	}
+	return pqHandleCopyData3(conn, handler, context, async);
+}
+
 /*
  * PQgetline - gets a newline-terminated string from the backend.
  *
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8ab6a88416..1f4ecec9bd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1783,6 +1783,56 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 	}
 }
 
+/*
+ * pqHandleCopyData3 - protocol-3 backend for PQhandleCopyData.
+ *
+ * Reads CopyData messages during COPY OUT or COPY BOTH and passes the first
+ * non-empty payload (not zero-terminated) to handler, together with context.
+ * The payload points into conn->inBuffer and is consumed once handler
+ * returns.  Empty CopyData messages are skipped.  The handler's return value
+ * is currently ignored.
+ *
+ * Returns the row length (> 0) after calling handler; 0 if async is true and
+ * no complete row is available yet; -1 if the copy has ended; or -2 on
+ * failure (see PQerrorMessage).  handler is not called in the latter three
+ * cases.
+ */
+int
+pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async)
+{
+	int			msgLength;
+
+	for (;;)
+	{
+		msgLength = getCopyDataMessage(conn);
+		if (msgLength < 0)
+			return msgLength;	/* end-of-copy or error */
+		if (msgLength == 0)
+		{
+			/* Don't block if async read requested */
+			if (async)
+				return 0;
+			/* Need to load more data */
+			if (pqWait(true, false, conn) ||
+				pqReadData(conn) < 0)
+				return -2;
+			continue;
+		}
+
+		msgLength -= 4;
+		if (msgLength > 0)
+		{
+			/* We have a row.  Call the handler; its result is ignored for now. */
+			handler(context, &conn->inBuffer[conn->inCursor], msgLength);
+			conn->inStart = conn->inCursor + msgLength;
+			return msgLength;
+		}
+
+		/* Empty, so drop it and loop around for another */
+		conn->inStart = conn->inCursor;
+	}
+}
+
 /*
  * PQgetline - gets a newline-terminated string from the backend.
  *
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f3d9220496..c07544b255 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -482,6 +482,9 @@ extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
 extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
 extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
 
+/* TODO: "House style" would be int, rather than size_t. */
+extern int	PQhandleCopyData(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async);
+
 /* Deprecated routines for copy in/out */
 extern int	PQgetline(PGconn *conn, char *buffer, int length);
 extern int	PQputline(PGconn *conn, const char *string);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d94b648ea5..8936ea0388 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -687,6 +687,7 @@ extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res,
 								 PGVerbosity verbosity, PGContextVisibility show_context);
 extern int	pqGetNegotiateProtocolVersion3(PGconn *conn);
 extern int	pqGetCopyData3(PGconn *conn, char **buffer, int async);
+extern int	pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async);
 extern int	pqGetline3(PGconn *conn, char *s, int maxlen);
 extern int	pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize);
 extern int	pqEndcopy3(PGconn *conn);