On 2023-Feb-08, Alvaro Herrera wrote:
> I propose instead the following: each command is prepared just before
> it's executed, as previously, and if we see a \startpipeline, then we
> prepare all commands starting with the one just after, and until the
> \endpipeline.
Here's the patch.

-- 
Álvaro Herrera Breisgau, Deutschland — https://www.EnterpriseDB.com/
>From 6d8938009b246463efe4104f5312e37b28b55235 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Wed, 8 Feb 2023 13:01:47 +0100 Subject: [PATCH v7] Prepare commands in pipelines in advance --- src/bin/pgbench/pgbench.c | 147 +++++++++++++------ src/bin/pgbench/t/001_pgbench_with_server.pl | 17 +++ 2 files changed, 117 insertions(+), 47 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 508ed218e8..a66fd51f92 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -628,7 +628,8 @@ typedef struct pg_time_usec_t txn_begin; /* used for measuring schedule lag times */ pg_time_usec_t stmt_begin; /* used for measuring statement latencies */ - bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ + /* whether client prepared each command of each script */ + bool **prepared; /* * For processing failures and repeating transactions with serialization @@ -739,6 +740,7 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"}; * aset do gset on all possible queries of a combined query (\;). * expr Parsed expression, if needed. * stats Time spent in this command. + * prepname The name that this command is prepared under, in prepare mode * retries Number of retries after a serialization or deadlock error in the * current command. * failures Number of errors in the current command that were not retried. @@ -754,6 +756,7 @@ typedef struct Command char *varprefix; PgBenchExpr *expr; SimpleStats stats; + char *prepname; int64 retries; int64 failures; } Command; @@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc) return true; } -#define MAX_PREPARE_NAME 32 -static void -preparedStatementName(char *buffer, int file, int state) -{ - sprintf(buffer, "P%d_%d", file, state); -} - /* * Report the abortion of the client when processing SQL commands. 
*/ @@ -3053,6 +3049,87 @@ chooseScript(TState *thread) return i - 1; } +/* + * Prepare the SQL command from st->use_file at command_num. The name of the + * new prepared statement is stored in command->prepname. + */ +static void +prepareCommand(CState *st, int command_num) +{ + Command *command = sql_script[st->use_file].commands[command_num]; + static int prepnr = 0; + + /* No prepare for non-SQL commands */ + if (command->type != SQL_COMMAND) + return; + + if (!st->prepared) + { + st->prepared = pg_malloc(sizeof(bool *) * num_scripts); + for (int i = 0; i < num_scripts; i++) + { + ParsedScript *script = &sql_script[i]; + int numcmds; + + for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++) + ; + st->prepared[i] = pg_malloc0(numcmds); + } + } + + if (!st->prepared[st->use_file][command_num]) + { + PGresult *res; + + if (!command->prepname) + command->prepname = psprintf("P%d_%d", st->use_file, prepnr++); + pg_log_debug("client %d preparing %s", st->id, command->prepname); + res = PQprepare(st->con, command->prepname, + command->argv[0], command->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_log_error("%s", PQerrorMessage(st->con)); + PQclear(res); + st->prepared[st->use_file][command_num] = true; + } +} + +/* + * Prepare all the commands in the script that come after the \startpipeline + * that's at position st->command, and the first \endpipeline we find. + * + * (This sets the ->prepared flag for each relevant command, but doesn't move + * the st->command counter) + */ +static void +prepareCommandsInPipeline(CState *st) +{ + int j; + Command **commands = sql_script[st->use_file].commands; + + Assert(commands[st->command]->type == META_COMMAND && + commands[st->command]->meta == META_STARTPIPELINE); + + /* + * We set the 'prepared' flag on the \startpipeline itself to flag that we + * don't need to do this next time, even though we don't actually prepare + * it. 
+ */ + if (st->prepared && + st->prepared[st->use_file][st->command]) + return; + + for (j = st->command + 1; commands[j] != NULL; j++) + { + if (commands[j]->type == META_COMMAND && + commands[j]->meta == META_ENDPIPELINE) + break; + + prepareCommand(st, j); + } + + st->prepared[st->use_file][st->command] = true; +} + /* Send a SQL command, using the chosen querymode */ static bool sendCommand(CState *st, Command *command) @@ -3083,49 +3160,14 @@ sendCommand(CState *st, Command *command) } else if (querymode == QUERY_PREPARED) { - char name[MAX_PREPARE_NAME]; const char *params[MAX_ARGS]; - if (!st->prepared[st->use_file]) - { - int j; - Command **commands = sql_script[st->use_file].commands; - - for (j = 0; commands[j] != NULL; j++) - { - PGresult *res; - - if (commands[j]->type != SQL_COMMAND) - continue; - preparedStatementName(name, st->use_file, j); - if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) - { - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("%s", PQerrorMessage(st->con)); - PQclear(res); - } - else - { - /* - * In pipeline mode, we use asynchronous functions. If a - * server-side error occurs, it will be processed later - * among the other results. 
- */ - if (!PQsendPrepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL)) - pg_log_error("%s", PQerrorMessage(st->con)); - } - } - st->prepared[st->use_file] = true; - } + prepareCommand(st, st->command); getQueryParams(&st->variables, command, params); - preparedStatementName(name, st->use_file, st->command); - pg_log_debug("client %d sending %s", st->id, name); - r = PQsendQueryPrepared(st->con, name, command->argc - 1, + pg_log_debug("client %d sending %s", st->id, command->prepname); + r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1, params, NULL, NULL, 0); } else /* unknown sql mode */ @@ -3597,7 +3639,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) thread->conn_duration += now - start; /* Reset session-local state */ - memset(st->prepared, 0, sizeof(st->prepared)); + pg_free(st->prepared); + st->prepared = NULL; } /* @@ -4360,6 +4403,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } + /* + * If we're in prepared-query mode, we need to prepare all the + * commands that are inside the pipeline. This solves the problem + * that running BEGIN ISOLATION LEVEL SERIALIZABLE would fail, because + * by the time the command is executed, a snapshot would have already + * been acquired, so an error would be thrown. 
+ */ + if (querymode == QUERY_PREPARED) + prepareCommandsInPipeline(st); + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) { commandFailed(st, "startpipeline", "already in pipeline mode"); diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index 4bf508ea96..dfdeb1f70f 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -839,6 +839,23 @@ select 1 \gset f } }); +# Working \startpipeline in prepared query mode with serializable +$node->pgbench( + '-t 1 -n -M prepared', + 0, + [ qr{type: .*/001_pgbench_pipeline_serializable}, qr{actually processed: 1/1} ], + [], + 'working \startpipeline with serializable', + { + '001_pgbench_pipeline_serializable' => q{ +-- test startpipeline with serializable +\startpipeline +BEGIN ISOLATION LEVEL SERIALIZABLE; +} . "select 1;\n" x 10 . q{ +END; +\endpipeline +} + }); # trigger many expression errors my @errors = ( -- 2.30.2