My apologies.  The wiki said to discuss early, even before writing the code
if possible, but I added a link to the PR for those who really wanted to
see the details.

I'm attaching a diff now.  This is not a patch, it's just a discussion
piece.

The problem was that PQgetCopyData loops use a lot of CPU time, and this
alternative reduces that by a lot.


Jeroen

On Thu, 2 Mar 2023 at 13:38, Daniel Gustafsson <dan...@yesql.se> wrote:

> > On 1 Mar 2023, at 15:23, Jeroen Vermeulen <jtv...@gmail.com> wrote:
>
> > PR for easy discussion: https://github.com/jtv/postgres/pull/1
>
> The process for discussing work on pgsql-hackers is to attach the patch to
> the email and discuss it inline in the thread.  That way all versions of the
> patch as well as the discussion are archived and searchable.
>
> --
> Daniel Gustafsson
>
>
diff --git a/bench.c b/bench.c
new file mode 100644
index 0000000000..c3206d2927
--- /dev/null
+++ b/bench.c
@@ -0,0 +1,134 @@
+/*
+ * Minimal benchmark for PQgetCopyData alternative.
+ *
+ * Define CALL to 0 (to use the classic PQgetCopyData) or 1 (to use the
+ * proposed new function), then run the binary through "time" to get time and
+ * CPU usage stats.
+ *
+ * DO NOT UPSTREAM THIS FILE.  It's just a demonstration for the prototype
+ * patch.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <libpq-fe.h>
+
+/* Define CALL to...
+ * 0: Use classic PQgetCopyData()
+ * 1: Use experimental PQhandleCopyData()
+ */
+
+/* Benchmark results (best result per category, out of 4 runs):
+ *
+ * PQgetCopyData:
+ * real - 0m32.972s
+ * user - 0m11.364s
+ * sys - 0m1.255s
+ *
+ * PQhandleCopyData:
+ * real - 0m32.839s
+ * user - 0m3.407s
+ * sys - 0m0.872s
+ */
+
+#if CALL == 1
+/*
+ * COPY row handler: print one row.  context is unused; always returns 0.
+ */
+static int
+print_row_and_newline(void *context, char *buf, size_t len)
+{
+	/* The row ends in a newline; overwrite it so buf is a C string. */
+	buf[len - 1] = '\0';
+	printf("%s\n", buf);
+	return 0;
+}
+#endif
+
+/* Benchmark driver: COPY 100M generated rows to stdout via the API chosen by CALL. */
+int
+main(void)
+{
+#if !defined(CALL)
+#error "Set CALL: 0 = PQgetCopyData, 1 = PQhandleCopyData."
+#elif CALL == 0
+	fprintf(stderr, "Testing classic PQgetCopyData().\n");
+#elif CALL == 1
+	fprintf(stderr, "Testing experimental PQhandleCopyData.\n");
+#else
+#error "Unknown CALL value."
+#endif
+
+	PGconn	   *cx = PQconnectdb("");
+	/* PQconnectdb() returns NULL only on OOM; other failures show in PQstatus(). */
+	if (!cx || PQstatus(cx) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Could not connect.\n");
+		exit(1);
+	}
+	PGresult   *tx = PQexec(cx, "BEGIN");
+
+	if (!tx)
+	{
+		fprintf(stderr, "No result from BEGIN!\n");
+		exit(1);
+	}
+	int			s = PQresultStatus(tx);
+
+	if (s != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "Failed to start transaction: status %d.\n", s);
+		exit(1);
+	}
+
+	PGresult   *r = PQexec(
+						   cx,
+						   "COPY ("
+						   "SELECT generate_series, 'row #' || generate_series "
+						   "FROM generate_series(1, 100000000)"
+						   ") TO STDOUT"
+	);
+
+	if (!r)
+	{
+		fprintf(stderr, "No result!\n");
+		exit(1);
+	}
+	int			status = PQresultStatus(r);
+
+	if (status != PGRES_COPY_OUT)
+	{
+		fprintf(stderr, "Failed to start COPY: status %d.\n", status);
+		exit(1);
+	}
+
+	int			bytes;
+#if CALL == 0
+	char	   *buffer = NULL;
+
+	for (
+		 bytes = PQgetCopyData(cx, &buffer, 0);
+		 bytes > 0;
+		 bytes = PQgetCopyData(cx, &buffer, 0)
+		)
+	{
+		if (buffer)
+		{
+			printf("%s", buffer);
+			PQfreemem(buffer);
+		}
+	}
+#elif CALL == 1
+	while ((bytes = PQhandleCopyData(cx, print_row_and_newline, NULL, 0)) > 0);
+#else
+#error "Unknown CALL value."
+#endif
+	/* -1 means the COPY ended; any other value here is a failure. */
+	if (bytes != -1)
+	{
+		fprintf(stderr, "Got unexpected result: %d.\n", bytes);
+		exit(1);
+	}
+
+	/* (Don't bother cleaning up.) */
+}
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index 641851983d..1e4eba58a2 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -32,6 +32,17 @@
 #include "sqlda-compat.h"
 #include "sqlda-native.h"
 
+/*
+ * COPY OUT handler: print one row (newline-terminated, not zero-terminated).
+ */
+static int
+print_row(void *context, char *buf, size_t len)
+{
+	buf[len - 1] = '\0';		/* overwrite the trailing newline */
+	printf("%s\n", buf);		/* ...and put it back on output */
+	return 0;
+}
+
 /*
  *	This function returns a newly malloced string that has ' and \
  *	escaped.
@@ -1876,16 +1887,10 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
 			break;
 		case PGRES_COPY_OUT:
 			{
-				char	   *buffer;
 				int			res;
 
 				ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno);
-				while ((res = PQgetCopyData(stmt->connection->connection,
-											&buffer, 0)) > 0)
-				{
-					printf("%s", buffer);
-					PQfreemem(buffer);
-				}
+				while ((res = PQhandleCopyData(stmt->connection->connection, print_row, NULL, 0)) > 0);
 				if (res == -1)
 				{
 					/* COPY done */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..add1ff1591 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQhandleCopyData         187
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ec62550e38..d0f0501e80 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2709,6 +2709,42 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
 	return pqGetCopyData3(conn, buffer, async);
 }
 
+/*
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and invoke a callback.
+ *
+ * Pass a "handler" callback which takes the context pointer, a buffer, and
+ * the buffer's size.  (Its return value is currently still meaningless, but
+ * could become a flag like "this ride is making me sick and I'd like to get off".)
+ *
+ * Calls handler only after receiving a full row.  The buffer does NOT have a
+ * terminating zero, so do not go beyond the given size.  However, you may
+ * modify the buffer's contents, and the line ends in a newline.  If you need
+ * a terminating zero, you are free to overwrite the newline.
+ *
+ * The context pointer can be anything; this function will pass it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, does not call handler, and returns -2 (consult PQerrorMessage).
+ */
+int
+PQhandleCopyData(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async)
+{
+	if (!conn)
+		return -2;
+	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
+		conn->asyncStatus != PGASYNC_COPY_BOTH)
+	{
+		libpq_append_conn_error(conn, "no COPY in progress");
+		return -2;
+	}
+	return pqHandleCopyData3(conn, handler, context, async);
+}
+
 /*
  * PQgetline - gets a newline-terminated string from the backend.
  *
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8ab6a88416..1f4ecec9bd 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1783,6 +1783,63 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 	}
 }
 
+/*
+ * pqHandleCopyData3 - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied handler.
+ *
+ * Pass a "handler" callback which takes the context pointer, a buffer, and
+ * the buffer's size.  (Its return value is currently still meaningless, but
+ * could become a flag like "this ride is making me sick and I'd like to get off".)
+ *
+ * Calls handler only after receiving a full row.  The buffer does NOT have a
+ * terminating zero, so do not go beyond the given size.  However, you may
+ * modify the buffer's contents, and the line ends in a newline.  If you need
+ * a terminating zero, you are free to overwrite the newline.
+ *
+ * The context pointer can be anything; this function will pass it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, does not call handler, and returns -2 (consult PQerrorMessage).
+ */
+int
+pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async)
+{
+	int			msgLength;
+
+	for (;;)
+	{
+		msgLength = getCopyDataMessage(conn);
+		if (msgLength < 0)
+			return msgLength;	/* end-of-copy or error */
+		if (msgLength == 0)
+		{
+			/* Don't block if async read requested */
+			if (async)
+				return 0;
+			/* Need to load more data */
+			if (pqWait(true, false, conn) ||
+				pqReadData(conn) < 0)
+				return -2;
+			continue;
+		}
+
+		msgLength -= 4;			/* NOTE(review): presumably excludes the length word itself -- confirm */
+		if (msgLength > 0)
+		{
+			/* We have a row.  Call the handler (its return value is ignored). */
+			handler(context, &conn->inBuffer[conn->inCursor], msgLength);
+			conn->inStart = conn->inCursor + msgLength;
+			return msgLength;
+		}
+
+		conn->inStart = conn->inCursor;	/* nothing to hand over: move inStart up to inCursor and loop */
+	}
+}
+
 /*
  * PQgetline - gets a newline-terminated string from the backend.
  *
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f3d9220496..c07544b255 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -482,6 +482,9 @@ extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
 extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
 extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
 
+/*  TODO: "House style" would be int, rather than size_t. */
+extern int	PQhandleCopyData(PGconn *conn, int handler(void *, char *, size_t), void *context, int async);
+
 /* Deprecated routines for copy in/out */
 extern int	PQgetline(PGconn *conn, char *buffer, int length);
 extern int	PQputline(PGconn *conn, const char *string);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d94b648ea5..8936ea0388 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -687,6 +687,7 @@ extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res,
 								 PGVerbosity verbosity, PGContextVisibility show_context);
 extern int	pqGetNegotiateProtocolVersion3(PGconn *conn);
 extern int	pqGetCopyData3(PGconn *conn, char **buffer, int async);
+extern int	pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async);
 extern int	pqGetline3(PGconn *conn, char *s, int maxlen);
 extern int	pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize);
 extern int	pqEndcopy3(PGconn *conn);

Reply via email to