Interested, yes.  But there may be reasons not to do that.  Last time I
looked the binary format wasn't documented.

Anyway, I tried re-implementing pqGetCopyData3() using the callback.
Wasn't hard, but I did have to add a way for the callback to return an
error.  Kept it pretty dumb for now, hoping a sensible rule will become
obvious later.

Saw no obvious performance impact, so that's good.


Jeroen

On Fri, 3 Mar 2023 at 19:53, Tom Lane <t...@sss.pgh.pa.us> wrote:

> Jeroen Vermeulen <jtv...@gmail.com> writes:
> > The printf() is just the simplest example that sprang to mind though.
> > There may be other use-cases out there involving  libraries that require
> > zero-terminated strings, and I figured an ability to set a sentinel could
> > help those.
>
> Well, since it won't help for binary COPY, I'm skeptical that this is
> something we should cater to.  Anybody who's sufficiently worried about
> performance to be trying to remove malloc/free cycles ought to be
> interested in binary COPY as well.
>
>                         regards, tom lane
>
diff --git a/bench.c b/bench.c
new file mode 100644
index 0000000000..35f8c7a36f
--- /dev/null
+++ b/bench.c
@@ -0,0 +1,132 @@
+/*
+ * Minimal benchmark for PQgetCopyData alternative.
+ *
+ * Define CALL to 0 (to use the classic PQgetCopyData) or 1 (to use the
+ * proposed new function), then run the binary through "time" to get time and
+ * CPU usage stats.
+ *
+ * DO NOT UPSTREAM THIS FILE.  It's just a demonstration for the prototype
+ * patch.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <libpq-fe.h>
+
+/* Define CALL to...
+ * 0: Use classic PQgetCopyData()
+ * 1: Use experimental PQhandleCopyData()
+ */
+
+/* Benchmark results (best result per category, out of 4 runs):
+ *
+ * PQgetCopyData:
+ * real - 0m32.972s
+ * user - 0m11.364s
+ * sys - 0m1.255s
+ *
+ * PQhandleCopyData:
+ * real - 0m32.839s
+ * user - 0m3.407s
+ * sys - 0m0.872s
+ */
+
+#if CALL == 1
+/*
+ * Row handler: write len bytes from buf to stdout; no newline is appended.
+ */
+static int
+print_row_and_newline(void *context, const char *buf, size_t len)
+{
+	fwrite(buf, 1, len, stdout);
+	return 0;		/* always reports success to the caller */
+}
+#endif
+
+/* Run the COPY benchmark selected by CALL; exits with status 1 on failure. */
+int
+main(void)
+{
+#if !defined(CALL)
+#error "Set CALL: 0 = PQgetCopyData, 1 = PQhandleCopyData."
+#elif CALL == 0
+	fprintf(stderr, "Testing classic PQgetCopyData().\n");
+#elif CALL == 1
+	fprintf(stderr, "Testing experimental PQhandleCopyData.\n");
+#else
+#error "Unknown CALL value."
+#endif
+	/* Empty conninfo: libpq falls back to its defaults and environment. */
+	PGconn	   *cx = PQconnectdb("");
+
+	if (!cx || PQstatus(cx) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Could not connect.\n");
+		exit(1);
+	}
+	PGresult   *tx = PQexec(cx, "BEGIN");
+
+	if (!tx)
+	{
+		fprintf(stderr, "No result from BEGIN!\n");
+		exit(1);
+	}
+	int			s = PQresultStatus(tx);
+
+	if (s != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, "Failed to start transaction: status %d.\n", s);
+		exit(1);
+	}
+	/* Have the server stream 100 million generated rows to us. */
+	PGresult   *r = PQexec(
+						   cx,
+						   "COPY ("
+						   "SELECT generate_series, 'row #' || generate_series "
+						   "FROM generate_series(1, 100000000)"
+						   ") TO STDOUT"
+	);
+
+	if (!r)
+	{
+		fprintf(stderr, "No result!\n");
+		exit(1);
+	}
+	int			status = PQresultStatus(r);
+
+	if (status != PGRES_COPY_OUT)
+	{
+		fprintf(stderr, "Failed to start COPY: status %d.\n", status);
+		exit(1);
+	}
+	/* Drain the COPY stream, echoing every row to stdout. */
+	int			bytes;
+#if CALL == 0
+	char	   *buffer = NULL;
+
+	for (
+		 bytes = PQgetCopyData(cx, &buffer, 0);
+		 bytes > 0;
+		 bytes = PQgetCopyData(cx, &buffer, 0)
+		)
+	{
+		if (buffer)
+		{
+			printf("%s", buffer);
+			PQfreemem(buffer);
+		}
+	}
+#elif CALL == 1
+	while ((bytes = PQhandleCopyData(cx, print_row_and_newline, NULL, 0)) > 0);
+#else
+#error "Unknown CALL value."
+#endif
+
+	if (bytes != -1)
+	{
+		fprintf(stderr, "Got unexpected result: %d.\n", bytes);
+		exit(1);
+	}
+
+	/* (Don't bother cleaning up.) */
+}
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index 641851983d..aaf31ea216 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -32,6 +32,16 @@
 #include "sqlda-compat.h"
 #include "sqlda-native.h"
 
+/*
+ * COPY row handler: write len bytes from buf (no terminating zero) to stdout.
+ */
+static int
+print_row(void *context, const char *buf, size_t len)
+{
+	fwrite(buf, 1, len, stdout);
+	return 0;		/* always reports success to the caller */
+}
+
 /*
  *	This function returns a newly malloced string that has ' and \
  *	escaped.
@@ -1876,16 +1886,10 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
 			break;
 		case PGRES_COPY_OUT:
 			{
-				char	   *buffer;
 				int			res;
 
 				ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno);
-				while ((res = PQgetCopyData(stmt->connection->connection,
-											&buffer, 0)) > 0)
-				{
-					printf("%s", buffer);
-					PQfreemem(buffer);
-				}
+				while ((res = PQhandleCopyData(stmt->connection->connection, print_row, NULL, 0)) > 0);
 				if (res == -1)
 				{
 					/* COPY done */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..add1ff1591 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,4 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQhandleCopyData         187
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index ec62550e38..1c9f0ee175 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2684,6 +2684,27 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 	return 1;
 }
 
+struct GetCopyData_context
+{
+	PGconn	*conn;		/* connection the row is being read from */
+	char	*buffer;	/* malloc'd copy of the row; NULL if none was made */
+};
+
+/* PQgetCopyData's row handler: malloc a zero-terminated copy of the row. */
+static int alloc_copy_buffer(void *context, const char *inbuf, size_t len)
+{
+	struct GetCopyData_context *params = (struct GetCopyData_context *) context;
+	params->buffer = (char *) malloc(len + 1);
+	if (params->buffer == NULL)
+	{
+		libpq_append_conn_error(params->conn, "out of memory");
+		return -2;	/* propagated to the caller as the error result */
+	}
+	memcpy(params->buffer, inbuf, len);	/* copy from the buffer we were given */
+	params->buffer[len] = '\0';	/* Add terminating null */
+	return 0;
+}
+
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
  * or COPY BOTH
@@ -2697,7 +2718,41 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 int
 PQgetCopyData(PGconn *conn, char **buffer, int async)
 {
-	*buffer = NULL;				/* for all failure cases */
+	/* PQhandleCopyData validates conn and the COPY state before reading. */
+	struct GetCopyData_context context = {.conn = conn, .buffer = NULL};
+	int result;
+
+	result = PQhandleCopyData(conn, alloc_copy_buffer, &context, async);
+	*buffer = context.buffer;	/* still NULL in all failure cases */
+	return result;
+}
+
+/*
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and invoke a callback.
+ *
+ * Pass a "handler" callback which takes a context pointer, a buffer, and
+ * the buffer's size.  If the handler returns a negative value, that value
+ * is returned to the caller as-is; use -2 to report failure.
+ *
+ * Calls handler only after receiving a full row.  The buffer does NOT have a
+ * terminating zero, so do not go beyond the given size.
+ *
+ * The context pointer can be anything; this function will pass it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, does not call handler, and returns -2 (consult PQerrorMessage).
+ */
+int
+PQhandleCopyData(PGconn *conn,
+				 int (*handler) (void *, const char *, size_t),
+				 void *context,
+				 int async)
+{
 	if (!conn)
 		return -2;
 	if (conn->asyncStatus != PGASYNC_COPY_OUT &&
@@ -2706,7 +2761,7 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
 		libpq_append_conn_error(conn, "no COPY in progress");
 		return -2;
 	}
-	return pqGetCopyData3(conn, buffer, async);
+	return pqGetCopyData3(conn, handler, context, async);
 }
 
 /*
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 8ab6a88416..8c89462704 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1720,27 +1720,35 @@ getCopyDataMessage(PGconn *conn)
 }
 
 /*
- * PQgetCopyData - read a row of data from the backend during COPY OUT
- * or COPY BOTH
+ * PQhandleCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH, and pass it to a caller-supplied handler.
  *
- * If successful, sets *buffer to point to a malloc'd row of data, and
- * returns row length (always > 0) as result.
- * Returns 0 if no row available yet (only possible if async is true),
- * -1 if end of copy (consult PQgetResult), or -2 if error (consult
- * PQerrorMessage).
+ * Pass a "handler" callback which takes a context pointer, a buffer, and
+ * the buffer's size.  If the handler returns a negative value, that value
+ * is returned to the caller as-is; use -2 to report failure.
+ *
+ * Calls handler only after receiving a full row.  The buffer does NOT have a
+ * terminating zero, so do not go beyond the given size.
+ *
+ * The context pointer can be anything; this function will pass it to handler.
+ *
+ * If successful, calls handler and returns row length (always > 0) as result.
+ * If no row is available yet (only possible if async is true), does not call
+ * handler, and returns 0 as result.
+ * If the copy has ended (consult PQgetResult), does not call handler, and
+ * returns -1.
+ * On failure, does not call handler, and returns -2 (consult PQerrorMessage).
  */
 int
-pqGetCopyData3(PGconn *conn, char **buffer, int async)
+pqGetCopyData3(PGconn *conn,
+			   int (*handler) (void *, const char *, size_t),
+			   void *context,
+			   int async)
 {
 	int			msgLength;
 
 	for (;;)
 	{
-		/*
-		 * Collect the next input message.  To make life simpler for async
-		 * callers, we keep returning 0 until the next message is fully
-		 * available, even if it is not Copy Data.
-		 */
 		msgLength = getCopyDataMessage(conn);
 		if (msgLength < 0)
 			return msgLength;	/* end-of-copy or error */
@@ -1756,29 +1764,19 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 			continue;
 		}
 
-		/*
-		 * Drop zero-length messages (shouldn't happen anyway).  Otherwise
-		 * pass the data back to the caller.
-		 */
 		msgLength -= 4;
 		if (msgLength > 0)
 		{
-			*buffer = (char *) malloc(msgLength + 1);
-			if (*buffer == NULL)
-			{
-				libpq_append_conn_error(conn, "out of memory");
-				return -2;
-			}
-			memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
-			(*buffer)[msgLength] = '\0';	/* Add terminating null */
-
-			/* Mark message consumed */
+			/* We have a row.  Call the handler. */
+			int result = handler(context,
+								 &conn->inBuffer[conn->inCursor],
+								 msgLength);
 			conn->inStart = conn->inCursor + msgLength;
-
+			if (result < 0)
+				return result;
 			return msgLength;
 		}
 
-		/* Empty, so drop it and loop around for another */
 		conn->inStart = conn->inCursor;
 	}
 }
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index f3d9220496..4963aa1e51 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -482,6 +482,12 @@ extern int	PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
 extern int	PQputCopyEnd(PGconn *conn, const char *errormsg);
 extern int	PQgetCopyData(PGconn *conn, char **buffer, int async);
 
+/* TODO: "House style" would be int, rather than size_t, for the row length. */
+extern int	PQhandleCopyData(PGconn *conn,
+							 int (*handler) (void *context, const char *buf, size_t len),
+							 void *context,
+							 int async);
+
 /* Deprecated routines for copy in/out */
 extern int	PQgetline(PGconn *conn, char *buffer, int length);
 extern int	PQputline(PGconn *conn, const char *string);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index d94b648ea5..b5f8b609bc 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -686,7 +686,10 @@ extern int	pqGetErrorNotice3(PGconn *conn, bool isError);
 extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res,
 								 PGVerbosity verbosity, PGContextVisibility show_context);
 extern int	pqGetNegotiateProtocolVersion3(PGconn *conn);
-extern int	pqGetCopyData3(PGconn *conn, char **buffer, int async);
+extern int	pqGetCopyData3(PGconn *conn,
+						   int (*handler) (void *, const char *, size_t),
+						   void *context,
+						   int async);
 extern int	pqGetline3(PGconn *conn, char *s, int maxlen);
 extern int	pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize);
 extern int	pqEndcopy3(PGconn *conn);

Reply via email to