Interested, yes. But there may be reasons not to do that. Last time I looked, the binary format wasn't documented.
Anyway, I tried re-implementing pqGetCopyData3() using the callback. It wasn't hard, but I did have to add a way for the callback to return an error. I kept that pretty dumb for now, hoping a sensible rule for the callback's return value will become obvious later. I saw no obvious performance impact, so that's good.

Jeroen

On Fri, 3 Mar 2023 at 19:53, Tom Lane <t...@sss.pgh.pa.us> wrote:
> Jeroen Vermeulen <jtv...@gmail.com> writes:
> > The printf() is just the simplest example that sprang to mind though.
> > There may be other use-cases out there involving libraries that require
> > zero-terminated strings, and I figured an ability to set a sentinel could
> > help those.
>
> Well, since it won't help for binary COPY, I'm skeptical that this is
> something we should cater to. Anybody who's sufficiently worried about
> performance to be trying to remove malloc/free cycles ought to be
> interested in binary COPY as well.
>
> regards, tom lane
>
diff --git a/bench.c b/bench.c
new file mode 100644
index 0000000000..35f8c7a36f
--- /dev/null
+++ b/bench.c
@@ -0,0 +1,132 @@
+/*
+ * Minimal benchmark for PQgetCopyData alternative.
+ *
+ * Define CALL to 0 (to use the classic PQgetCopyData) or 1 (to use the
+ * proposed new function), then run the binary through "time" to get time and
+ * CPU usage stats.
+ *
+ * DO NOT UPSTREAM THIS FILE. It's just a demonstration for the prototype
+ * patch.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <libpq-fe.h>
+
+/* Define CALL to...
+ * 0: Use classic PQgetCopyData()
+ * 1: Use experimental PQhandleCopyData()
+ */
+
+/* Benchmark results (best result per category, out of 4 runs):
+ *
+ * PQgetCopyData:
+ * real - 0m32.972s
+ * user - 0m11.364s
+ * sys - 0m1.255s
+ *
+ * PQhandleCopyData:
+ * real - 0m32.839s
+ * user - 0m3.407s
+ * sys - 0m0.872s
+ */
+
+#if CALL == 1
+/*
+ * Write one COPY row, which is not zero-terminated, to stdout.
+ */
+static int
+print_row(void *context, const char *buf, size_t len)
+{
+	fwrite(buf, 1, len, stdout);
+	return 0;
+}
+#endif
+
+
+int
+main(void)
+{
+#if !defined(CALL)
+#error "Set CALL: 0 = PQgetCopyData, 1 = PQhandleCopyData."
+#elif CALL == 0
+	fprintf(stderr, "Testing classic PQgetCopyData().\n");
+#elif CALL == 1
+	fprintf(stderr, "Testing experimental PQhandleCopyData.\n");
+#else
+#error "Unknown CALL value."
+#endif
+
+	PGconn *cx = PQconnectdb("");
+
+	if (PQstatus(cx) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Could not connect: %s", PQerrorMessage(cx));
+		exit(1);
+	}
+	PGresult *tx = PQexec(cx, "BEGIN");
+
+	if (!tx)
+	{
+		fprintf(stderr, "No result from BEGIN!\n");
+		exit(1);
+	}
+	int s = PQresultStatus(tx);
+
+	if (s != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "Failed to start transaction: status %d.\n", s);
+		exit(1);
+	}
+
+	PGresult *r = PQexec(
+		cx,
+		"COPY ("
+		"SELECT generate_series, 'row #' || generate_series "
+		"FROM generate_series(1, 100000000)"
+		") TO STDOUT"
+	);
+
+	if (!r)
+	{
+		fprintf(stderr, "No result!\n");
+		exit(1);
+	}
+	int status = PQresultStatus(r);
+
+	if (status != PGRES_COPY_OUT)
+	{
+		fprintf(stderr, "Failed to start COPY: status %d.\n", status);
+		exit(1);
+	}
+
+	int bytes;
+#if CALL == 0
+	char *buffer = NULL;
+
+	for (
+		bytes = PQgetCopyData(cx, &buffer, 0);
+		bytes > 0;
+		bytes = PQgetCopyData(cx, &buffer, 0)
+	)
+	{
+		if (buffer)
+		{
+			printf("%s", buffer);
+			PQfreemem(buffer);
+		}
+	}
+#elif CALL == 1
+	while ((bytes = PQhandleCopyData(cx, print_row, NULL, 0)) > 0);
+#else
+#error "Unknown CALL value."
+#endif
+
+	if (bytes != -1)
+	{
+		fprintf(stderr, "Got unexpected result: %d.\n", bytes);
+		exit(1);
+	}
+
+	/* (Don't bother cleaning up.) */
+}
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index 641851983d..aaf31ea216 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -32,6 +32,16 @@
 #include "sqlda-compat.h"
 #include "sqlda-native.h"
 
+/*
+ * PQhandleCopyData callback: print a non-zero-terminated COPY row.
+ */
+static int
+print_row(void *context, const char *buf, size_t len)
+{
+	fwrite(buf, 1, len, stdout);
+	return 0;
+}
+
 /*
  * This function returns a newly malloced string that has ' and \
  * escaped.
@@ -1876,16 +1886,12 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
 			break;
 		case PGRES_COPY_OUT:
 			{
-				char	   *buffer;
 				int			res;
 
 				ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno);
-				while ((res = PQgetCopyData(stmt->connection->connection,
-											&buffer, 0)) > 0)
-				{
-					printf("%s", buffer);
-					PQfreemem(buffer);
-				}
+				while ((res = PQhandleCopyData(stmt->connection->connection,
+											   print_row, NULL, 0)) > 0)
+					;
 				if (res == -1)
 				{
 					/* COPY done */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..add1ff1591 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQhandleCopyData          187
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ec62550e38..1c9f0ee175 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2684,6 +2684,38 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 	return 1;
 }
 
+/*
+ * State shared between PQgetCopyData and its alloc_copy_buffer callback.
+ */
+struct GetCopyData_context
+{
+	PGconn	   *conn;			/* connection, for error reporting */
+	char	   *buffer;			/* malloc'd copy of the row, or NULL */
+};
+
+/*
+ * alloc_copy_buffer - PQhandleCopyData callback used by PQgetCopyData
+ *
+ * Copies the row into a newly malloc'd, zero-terminated buffer and stores it
+ * in the GetCopyData_context.  Returns 0 on success, or -2 (after setting
+ * the connection's error message) if out of memory.
+ */
+static int
+alloc_copy_buffer(void *context, const char *inbuf, size_t len)
+{
+	struct GetCopyData_context *params = (struct GetCopyData_context *) context;
+
+	params->buffer = (char *) malloc(len + 1);
+	if (params->buffer == NULL)
+	{
+		libpq_append_conn_error(params->conn, "out of memory");
+		return -2;
+	}
+	memcpy(params->buffer, inbuf, len);
+	params->buffer[len] = '\0';	/* Add terminating null */
+	return 0;
+}
+
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
  * or COPY BOTH
@@ -2697,7 +2729,47 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 int
 PQgetCopyData(PGconn *conn, char **buffer, int async)
 {
-	*buffer = NULL;				/* for all failure cases */
+	struct GetCopyData_context context;
+	int			result;
+
+	context.conn = conn;
+	context.buffer = NULL;		/* for all failure cases */
+
+	/*
+	 * Go through PQhandleCopyData rather than pqGetCopyData3, so that the
+	 * NULL-connection and "no COPY in progress" checks still apply.
+	 */
+	result = PQhandleCopyData(conn, alloc_copy_buffer, &context, async);
+	*buffer = context.buffer;
+	return result;
+}
+
+/*
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied callback
+ *
+ * "handler" is called as handler(context, buf, len) once a full row has been
+ * received.  buf is NOT zero-terminated, so do not read beyond len bytes, and
+ * it is only valid for the duration of the call.  handler should return 0 on
+ * success, or a negative value to report failure.
+ *
+ * The context pointer can be anything; this function passes it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, returns -2 (consult PQerrorMessage for errors detected by
+ * libpq).  If handler reported the failure, the row is left unconsumed, so
+ * a later call will deliver it again.
+ */
+int
+PQhandleCopyData(PGconn *conn,
+				 int (*handler) (void *, const char *, size_t),
+				 void *context,
+				 int async)
+{
 	if (!conn)
 		return -2;
 	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
@@ -2706,7 +2778,7 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
 		libpq_append_conn_error(conn, "no COPY in progress");
 		return -2;
 	}
-	return pqGetCopyData3(conn, buffer, async);
+	return pqGetCopyData3(conn, handler, context, async);
 }
 
 /*
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8ab6a88416..8c89462704 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1720,17 +1720,30 @@ getCopyDataMessage(PGconn *conn)
 }
 
 /*
- * PQgetCopyData - read a row of data from the backend during COPY OUT
- * or COPY BOTH
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied callback
  *
- * If successful, sets *buffer to point to a malloc'd row of data, and
- * returns row length (always > 0) as result.
- * Returns 0 if no row available yet (only possible if async is true),
- * -1 if end of copy (consult PQgetResult), or -2 if error (consult
- * PQerrorMessage).
+ * "handler" is called as handler(context, buf, len) once a full row has been
+ * received.  buf points into the connection's input buffer and is NOT
+ * zero-terminated, so do not read beyond len bytes or retain it after the
+ * call.  handler should return 0 on success, or a negative value to report
+ * failure.  The context pointer can be anything; it is passed to handler
+ * unchanged.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, returns -2 (consult PQerrorMessage for errors detected by
+ * libpq).  If handler reported the failure, the row is left unconsumed, so
+ * a later call will deliver it again.
  */
 int
-pqGetCopyData3(PGconn *conn, char **buffer, int async)
+pqGetCopyData3(PGconn *conn,
+			   int (*handler) (void *, const char *, size_t),
+			   void *context,
+			   int async)
 {
 	int			msgLength;
 
@@ -1763,14 +1776,14 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 		msgLength -= 4;
 		if (msgLength > 0)
 		{
-			*buffer = (char *) malloc(msgLength + 1);
-			if (*buffer == NULL)
-			{
-				libpq_append_conn_error(conn, "out of memory");
-				return -2;
-			}
-			memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
-			(*buffer)[msgLength] = '\0';	/* Add terminating null */
+			/*
+			 * Pass the row to the handler.  Any failure is reported as -2 so
+			 * that it cannot be mistaken for end-of-copy (-1); the message is
+			 * then left unconsumed, so a later call will deliver it again.
+			 */
+			if (handler(context, &conn->inBuffer[conn->inCursor],
+						(size_t) msgLength) < 0)
+				return -2;
 
 			/* Mark message consumed */
 			conn->inStart = conn->inCursor + msgLength;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f3d9220496..4963aa1e51 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -482,6 +482,15 @@ extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
 extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
 extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
 
+/*
+ * Callback-based variant of PQgetCopyData that avoids a malloc'd copy of
+ * each row.  TODO: "House style" would be int, rather than size_t.
+ */
+extern int	PQhandleCopyData(PGconn *conn,
+							 int (*handler) (void *, const char *, size_t),
+							 void *context,
+							 int async);
+
 /* Deprecated routines for copy in/out */
 extern int	PQgetline(PGconn *conn, char *buffer, int length);
 extern int	PQputline(PGconn *conn, const char *string);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d94b648ea5..b5f8b609bc 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -686,7 +686,10 @@ extern int	pqGetErrorNotice3(PGconn *conn, bool isError);
 extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res,
 								 PGVerbosity verbosity, PGContextVisibility show_context);
 extern int	pqGetNegotiateProtocolVersion3(PGconn *conn);
-extern int	pqGetCopyData3(PGconn *conn, char **buffer, int async);
+extern int	pqGetCopyData3(PGconn *conn,
+						   int (*handler) (void *, const char *, size_t),
+						   void *context,
+						   int async);
 extern int	pqGetline3(PGconn *conn, char *s, int maxlen);
 extern int	pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize);
 extern int	pqEndcopy3(PGconn *conn);