On 2023-Feb-08, Alvaro Herrera wrote:

> I propose instead the following: each command is prepared just before
> it's executed, as previously, and if we see a \startpipeline, then we
> prepare all commands starting with the one just after, and until the
> \endpipeline.

Here's the patch.

-- 
Álvaro Herrera        Breisgau, Deutschland  —  https://www.EnterpriseDB.com/
>From 6d8938009b246463efe4104f5312e37b28b55235 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 8 Feb 2023 13:01:47 +0100
Subject: [PATCH v7] Prepare commands in pipelines in advance

---
 src/bin/pgbench/pgbench.c                    | 147 +++++++++++++------
 src/bin/pgbench/t/001_pgbench_with_server.pl |  17 +++
 2 files changed, 117 insertions(+), 47 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 508ed218e8..a66fd51f92 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -628,7 +628,8 @@ typedef struct
 	pg_time_usec_t txn_begin;	/* used for measuring schedule lag times */
 	pg_time_usec_t stmt_begin;	/* used for measuring statement latencies */
 
-	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
+	/* whether client prepared each command of each script */
+	bool	  **prepared;
 
 	/*
 	 * For processing failures and repeating transactions with serialization
@@ -739,6 +740,7 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
  * aset			do gset on all possible queries of a combined query (\;).
  * expr			Parsed expression, if needed.
  * stats		Time spent in this command.
+ * prepname		The name that this command is prepared under, in prepare mode
  * retries		Number of retries after a serialization or deadlock error in the
  *				current command.
  * failures		Number of errors in the current command that were not retried.
@@ -754,6 +756,7 @@ typedef struct Command
 	char	   *varprefix;
 	PgBenchExpr *expr;
 	SimpleStats stats;
+	char	   *prepname;
 	int64		retries;
 	int64		failures;
 } Command;
@@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
 	return true;
 }
 
-#define MAX_PREPARE_NAME		32
-static void
-preparedStatementName(char *buffer, int file, int state)
-{
-	sprintf(buffer, "P%d_%d", file, state);
-}
-
 /*
  * Report the abortion of the client when processing SQL commands.
  */
@@ -3053,6 +3049,87 @@ chooseScript(TState *thread)
 	return i - 1;
 }
 
+/*
+ * Prepare the SQL command at position command_num of script st->use_file on
+ * this client's connection; non-SQL commands are ignored.  The statement name
+ * is kept in command->prepname; the per-client flag arrays are built lazily.
+ */
+static void
+prepareCommand(CState *st, int command_num)
+{
+	Command    *command = sql_script[st->use_file].commands[command_num];
+	static int	prepnr = 0;	/* static: one counter for every caller */
+
+	/* No prepare for non-SQL commands */
+	if (command->type != SQL_COMMAND)
+		return;
+
+	if (!st->prepared)
+	{
+		st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
+		for (int i = 0; i < num_scripts; i++)
+		{
+			ParsedScript *script = &sql_script[i];
+			int			numcmds;
+
+			for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
+				;				/* count commands up to the NULL terminator */
+			st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
+		}
+	}
+
+	if (!st->prepared[st->use_file][command_num])
+	{
+		PGresult   *res;
+
+		if (!command->prepname)
+			command->prepname = psprintf("P%d_%d", st->use_file, prepnr++);
+		pg_log_debug("client %d preparing %s", st->id, command->prepname);
+		res = PQprepare(st->con, command->prepname,
+						command->argv[0], command->argc - 1, NULL);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("%s", PQerrorMessage(st->con));
+		PQclear(res);
+		st->prepared[st->use_file][command_num] = true;
+	}
+}
+
+/*
+ * Prepare all the commands in the script that come between the \startpipeline
+ * that's at position st->command and the first \endpipeline we find, or the
+ * end of the script if there is none.  This sets the ->prepared flag of each
+ * relevant command, but doesn't move the st->command counter.
+ */
+static void
+prepareCommandsInPipeline(CState *st)
+{
+	Command   **commands = sql_script[st->use_file].commands;
+
+	Assert(commands[st->command]->type == META_COMMAND &&
+		   commands[st->command]->meta == META_STARTPIPELINE);
+
+	/*
+	 * We set the 'prepared' flag on the \startpipeline itself to flag that we
+	 * don't need to do this next time, even though we don't actually prepare
+	 * it.
+	 */
+	if (st->prepared &&
+		st->prepared[st->use_file][st->command])
+		return;
+
+	for (int j = st->command + 1; commands[j] != NULL; j++)
+	{
+		if (commands[j]->type == META_COMMAND &&
+			commands[j]->meta == META_ENDPIPELINE)
+			break;
+
+		prepareCommand(st, j);
+	}
+
+	/* only SQL commands allocate st->prepared, so it may still be NULL */
+	if (st->prepared)
+		st->prepared[st->use_file][st->command] = true;
+}
+
 /* Send a SQL command, using the chosen querymode */
 static bool
 sendCommand(CState *st, Command *command)
@@ -3083,49 +3160,14 @@ sendCommand(CState *st, Command *command)
 	}
 	else if (querymode == QUERY_PREPARED)
 	{
-		char		name[MAX_PREPARE_NAME];
 		const char *params[MAX_ARGS];
 
-		if (!st->prepared[st->use_file])
-		{
-			int			j;
-			Command   **commands = sql_script[st->use_file].commands;
-
-			for (j = 0; commands[j] != NULL; j++)
-			{
-				PGresult   *res;
-
-				if (commands[j]->type != SQL_COMMAND)
-					continue;
-				preparedStatementName(name, st->use_file, j);
-				if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
-				{
-					res = PQprepare(st->con, name,
-									commands[j]->argv[0], commands[j]->argc - 1, NULL);
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						pg_log_error("%s", PQerrorMessage(st->con));
-					PQclear(res);
-				}
-				else
-				{
-					/*
-					 * In pipeline mode, we use asynchronous functions. If a
-					 * server-side error occurs, it will be processed later
-					 * among the other results.
-					 */
-					if (!PQsendPrepare(st->con, name,
-									   commands[j]->argv[0], commands[j]->argc - 1, NULL))
-						pg_log_error("%s", PQerrorMessage(st->con));
-				}
-			}
-			st->prepared[st->use_file] = true;
-		}
+		prepareCommand(st, st->command);
 
 		getQueryParams(&st->variables, command, params);
-		preparedStatementName(name, st->use_file, st->command);
 
-		pg_log_debug("client %d sending %s", st->id, name);
-		r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+		pg_log_debug("client %d sending %s", st->id, command->prepname);
+		r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
 								params, NULL, NULL, 0);
 	}
 	else						/* unknown sql mode */
@@ -3597,7 +3639,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 					thread->conn_duration += now - start;
 
 					/* Reset session-local state */
-					memset(st->prepared, 0, sizeof(st->prepared));
+					pg_free(st->prepared);
+					st->prepared = NULL;
 				}
 
 				/*
@@ -4360,6 +4403,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 			return CSTATE_ABORTED;
 		}
 
+		/*
+		 * If we're in prepared-query mode, we need to prepare all the
+		 * commands that are inside the pipeline.  This solves the problem
+		 * that running BEGIN ISOLATION LEVEL SERIALIZABLE would fail, because
+		 * by the time the command is executed, a snapshot would have already
+		 * been acquired, so an error would be thrown.
+		 */
+		if (querymode == QUERY_PREPARED)
+			prepareCommandsInPipeline(st);
+
 		if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
 		{
 			commandFailed(st, "startpipeline", "already in pipeline mode");
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 4bf508ea96..dfdeb1f70f 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -839,6 +839,23 @@ select 1 \gset f
 }
 	});
 
+# Working \startpipeline in prepared query mode with serializable
+$node->pgbench(
+	'-t 1 -n -M prepared',
+	0,
+	[ qr{type: .*/001_pgbench_pipeline_serializable}, qr{actually processed: 1/1} ],
+	[],
+	'working \startpipeline with serializable',
+	{
+		'001_pgbench_pipeline_serializable' => q{
+-- test startpipeline with serializable
+\startpipeline
+BEGIN ISOLATION LEVEL SERIALIZABLE;
+} . "select 1;\n" x 10 . q{
+END;
+\endpipeline
+}
+	});
 
 # trigger many expression errors
 my @errors = (
-- 
2.30.2

Reply via email to