Found one last problem: if you do "-f foobar.sql -M prepared" in that
order, then the prepare fails because the statement names are not
assigned when the file is parsed.  The previous coding only supported
doing "-M prepared -f foobar.sql", which funnily enough is the only order
that PostgreSQL/Cluster.pm->pgbench() supports.  So I moved the prepared
statement name generation to the postprocess step.

-- 
Álvaro Herrera               48°01'N 7°57'E  —  https://www.EnterpriseDB.com/
"La rebeldía es la virtud original del hombre" (Arthur Schopenhauer)
From 14ed26c3f1a7033daaa93d6cd2d33d1100dd033d Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 20 Feb 2023 20:25:26 +0100
Subject: [PATCH v9] pgbench: Prepare commands in pipelines in advance

Failing to do so results in an error when a pgbench script tries to
start a serializable transaction inside a pipeline, because by the time
BEGIN ISOLATION LEVEL SERIALIZABLE is executed, we're already in a
transaction that has acquired a snapshot, so the server rightfully
complains.

We can work around that by preparing all commands in the pipeline before
actually starting the pipeline.  This changes the existing code in two
aspects: first, we now prepare each command individually at the point
where that command is about to be executed; previously, we would prepare
all commands in a script as soon as the first command of that script
would be executed.  It's hard to see that this would make much of a
difference (particularly since it only affects the first execution of
each script in a client), but I didn't actually try to measure it.

Secondly, we no longer use PQsendPrepare() in pipeline mode, but only
PQprepare().  There's no specific reason for this change other than no
longer needing to behave differently in pipeline mode.  (Previously we
had no choice, because in pipeline mode PQprepare() could not be used.)

Backpatch to 14, where pgbench got support for pipeline mode.

Reported-by: Yugo NAGATA <nag...@sraoss.co.jp>
Discussion: https://postgr.es/m/20210716153013.fc53b1c780b06fccc07a7...@sraoss.co.jp
---
 src/bin/pgbench/pgbench.c                    | 153 +++++++++++++------
 src/bin/pgbench/t/001_pgbench_with_server.pl |  20 +++
 2 files changed, 124 insertions(+), 49 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 508ed218e8..a3a2040a3c 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -628,7 +628,8 @@ typedef struct
 	pg_time_usec_t txn_begin;	/* used for measuring schedule lag times */
 	pg_time_usec_t stmt_begin;	/* used for measuring statement latencies */
 
-	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
+	/* whether client prepared each command of each script */
+	bool	  **prepared;
 
 	/*
 	 * For processing failures and repeating transactions with serialization
@@ -733,7 +734,8 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
  * argv			Command arguments, the first of which is the command or SQL
  *				string itself.  For SQL commands, after post-processing
  *				argv[0] is the same as 'lines' with variables substituted.
- * varprefix 	SQL commands terminated with \gset or \aset have this set
+ * prepname		The name that this command is prepared under, in prepared mode
+ * varprefix	SQL commands terminated with \gset or \aset have this set
  *				to a non NULL value.  If nonempty, it's used to prefix the
  *				variable name that receives the value.
  * aset			do gset on all possible queries of a combined query (\;).
@@ -751,6 +753,7 @@ typedef struct Command
 	MetaCommand meta;
 	int			argc;
 	char	   *argv[MAX_ARGS];
+	char	   *prepname;
 	char	   *varprefix;
 	PgBenchExpr *expr;
 	SimpleStats stats;
@@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
 	return true;
 }
 
-#define MAX_PREPARE_NAME		32
-static void
-preparedStatementName(char *buffer, int file, int state)
-{
-	sprintf(buffer, "P%d_%d", file, state);
-}
-
 /*
  * Report the abortion of the client when processing SQL commands.
  */
@@ -3053,6 +3049,87 @@ chooseScript(TState *thread)
 	return i - 1;
 }
 
+/*
+ * Prepare the SQL command from st->use_file at command_num.
+ */
+static void
+prepareCommand(CState *st, int command_num)
+{
+	Command    *command = sql_script[st->use_file].commands[command_num];
+
+	/* No prepare for non-SQL commands */
+	if (command->type != SQL_COMMAND)
+		return;
+
+	/*
+	 * If not already done, allocate space for 'prepared' flags: one boolean
+	 * for each command of each script.
+	 */
+	if (!st->prepared)
+	{
+		st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
+		for (int i = 0; i < num_scripts; i++)
+		{
+			ParsedScript *script = &sql_script[i];
+			int			numcmds;
+
+			for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
+				;
+			st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
+		}
+	}
+
+	if (!st->prepared[st->use_file][command_num])
+	{
+		PGresult   *res;
+
+		pg_log_debug("client %d preparing %s", st->id, command->prepname);
+		res = PQprepare(st->con, command->prepname,
+						command->argv[0], command->argc - 1, NULL);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pg_log_error("%s", PQerrorMessage(st->con));
+		PQclear(res);
+		st->prepared[st->use_file][command_num] = true;
+	}
+}
+
+/*
+ * Prepare all the commands in the script that come between the \startpipeline
+ * that's at position st->command and the first \endpipeline we find.
+ *
+ * This sets the ->prepared flag for each relevant command as well as the
+ * \startpipeline itself, but doesn't move the st->command counter.
+ */
+static void
+prepareCommandsInPipeline(CState *st)
+{
+	int			j;
+	Command   **commands = sql_script[st->use_file].commands;
+
+	Assert(commands[st->command]->type == META_COMMAND &&
+		   commands[st->command]->meta == META_STARTPIPELINE);
+
+	/*
+	 * We set the 'prepared' flag on the \startpipeline itself to record that
+	 * this work need not be repeated next time; we set it directly rather
+	 * than via prepareCommand(), since this command is never itself prepared.
+	 */
+	if (st->prepared &&
+		st->prepared[st->use_file][st->command])
+		return;
+
+	for (j = st->command + 1; commands[j] != NULL; j++)
+	{
+		if (commands[j]->type == META_COMMAND &&
+			commands[j]->meta == META_ENDPIPELINE)
+			break;
+
+		prepareCommand(st, j);
+	}
+
+	st->prepared[st->use_file][st->command] = true;
+}
+
 /* Send a SQL command, using the chosen querymode */
 static bool
 sendCommand(CState *st, Command *command)
@@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command)
 	}
 	else if (querymode == QUERY_PREPARED)
 	{
-		char		name[MAX_PREPARE_NAME];
 		const char *params[MAX_ARGS];
 
-		if (!st->prepared[st->use_file])
-		{
-			int			j;
-			Command   **commands = sql_script[st->use_file].commands;
-
-			for (j = 0; commands[j] != NULL; j++)
-			{
-				PGresult   *res;
-
-				if (commands[j]->type != SQL_COMMAND)
-					continue;
-				preparedStatementName(name, st->use_file, j);
-				if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
-				{
-					res = PQprepare(st->con, name,
-									commands[j]->argv[0], commands[j]->argc - 1, NULL);
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						pg_log_error("%s", PQerrorMessage(st->con));
-					PQclear(res);
-				}
-				else
-				{
-					/*
-					 * In pipeline mode, we use asynchronous functions. If a
-					 * server-side error occurs, it will be processed later
-					 * among the other results.
-					 */
-					if (!PQsendPrepare(st->con, name,
-									   commands[j]->argv[0], commands[j]->argc - 1, NULL))
-						pg_log_error("%s", PQerrorMessage(st->con));
-				}
-			}
-			st->prepared[st->use_file] = true;
-		}
-
+		prepareCommand(st, st->command);
 		getQueryParams(&st->variables, command, params);
-		preparedStatementName(name, st->use_file, st->command);
 
-		pg_log_debug("client %d sending %s", st->id, name);
-		r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+		pg_log_debug("client %d sending %s", st->id, command->prepname);
+		r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
 								params, NULL, NULL, 0);
 	}
 	else						/* unknown sql mode */
@@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 					thread->conn_duration += now - start;
 
 					/* Reset session-local state */
-					memset(st->prepared, 0, sizeof(st->prepared));
+					pg_free(st->prepared);
+					st->prepared = NULL;
 				}
 
 				/*
@@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 			return CSTATE_ABORTED;
 		}
 
+		/*
+		 * If we're in prepared-query mode, we need to prepare all the
+		 * commands that are inside the pipeline before we actually start the
+		 * pipeline itself.  This solves the problem that running BEGIN
+		 * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
+		 * snapshot having been acquired by the prepare within the pipeline.
+		 */
+		if (querymode == QUERY_PREPARED)
+			prepareCommandsInPipeline(st);
+
 		if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
 		{
 			commandFailed(st, "startpipeline", "already in pipeline mode");
@@ -5439,6 +5491,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
 	my_command->varprefix = NULL;	/* allocated later, if needed */
 	my_command->expr = NULL;
 	initSimpleStats(&my_command->stats);
+	my_command->prepname = NULL;	/* set later, if needed */
 
 	return my_command;
 }
@@ -5468,6 +5521,7 @@ static void
 postprocess_sql_command(Command *my_command)
 {
 	char		buffer[128];
+	static int	prepnum = 0;
 
 	Assert(my_command->type == SQL_COMMAND);
 
@@ -5487,6 +5541,7 @@ postprocess_sql_command(Command *my_command)
 		case QUERY_PREPARED:
 			if (!parseQuery(my_command))
 				exit(1);
+			my_command->prepname = psprintf("P_%d", prepnum++);
 			break;
 		default:
 			exit(1);
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 4bf508ea96..99273203f0 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -839,6 +839,26 @@ select 1 \gset f
 }
 	});
 
+# Working \startpipeline in prepared query mode with serializable
+$node->pgbench(
+	'-c4 -j2 -t 10 -n -M prepared',
+	0,
+	[
+		qr{type: .*/001_pgbench_pipeline_serializable},
+		qr{actually processed: (\d+)/\1}
+	],
+	[],
+	'working \startpipeline with serializable',
+	{
+		'001_pgbench_pipeline_serializable' => q{
+-- test startpipeline with serializable
+\startpipeline
+BEGIN ISOLATION LEVEL SERIALIZABLE;
+} . "select 1;\n" x 10 . q{
+END;
+\endpipeline
+}
+	});
 
 # trigger many expression errors
 my @errors = (
-- 
2.30.2

Reply via email to