Attached is a v3, where I have updated inaccurate comments.
Attached v4 is a rebase after 409231919443984635b7ae9b7e2e261ab984eb1e
--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 73d3de0677..ed6ff75426 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -295,23 +295,31 @@ typedef enum
/*
* The client must first choose a script to execute. Once chosen, it can
* either be throttled (state CSTATE_START_THROTTLE under --rate) or start
- * right away (state CSTATE_START_TX).
+ * right away (state CSTATE_START_TX) or not start at all if the timer was
+ * exceeded (state CSTATE_FINISHED).
*/
CSTATE_CHOOSE_SCRIPT,
/*
* In CSTATE_START_THROTTLE state, we calculate when to begin the next
* transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
- * sleeps until that moment. (If throttling is not enabled, doCustom()
- * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
+ * sleeps until that moment.
+ *
+ * It may also detect that the next transaction would start beyond the end
+ * of run, and switch to CSTATE_FINISHED.
*/
CSTATE_START_THROTTLE,
CSTATE_THROTTLE,
/*
* CSTATE_START_TX performs start-of-transaction processing. Establishes
- * a new connection for the transaction, in --connect mode, and records
- * the transaction start time.
+ * a new connection for the transaction in --connect mode, records
+ * the transaction start time, and proceeds to the first command.
+ *
+ * Note: once a script is started, it will either error or run till
+ * its end, where it may be interrupted. It is not interrupted while
+ * running, so pgbench --time means that transactions may start within
+ * that time, and each one runs until its work is completed.
*/
CSTATE_START_TX,
@@ -324,9 +332,6 @@ typedef enum
* and we enter the CSTATE_SLEEP state to wait for it to expire. Other
* meta-commands are executed immediately.
*
- * CSTATE_SKIP_COMMAND for conditional branches which are not executed,
- * quickly skip commands that do not need any evaluation.
- *
* CSTATE_WAIT_RESULT waits until we get a result set back from the server
* for the current command.
*
@@ -334,19 +339,25 @@ typedef enum
*
* CSTATE_END_COMMAND records the end-of-command timestamp, increments the
* command counter, and loops back to CSTATE_START_COMMAND state.
+ *
+ * CSTATE_SKIP_COMMAND is used by conditional branches which are not
+ * executed. It quickly skips commands that do not need any evaluation.
+ * This state can move forward several commands, until there is something
+ * to do or the end of the script is reached.
*/
CSTATE_START_COMMAND,
- CSTATE_SKIP_COMMAND,
CSTATE_WAIT_RESULT,
CSTATE_SLEEP,
CSTATE_END_COMMAND,
+ CSTATE_SKIP_COMMAND,
/*
- * CSTATE_END_TX performs end-of-transaction processing. Calculates
- * latency, and logs the transaction. In --connect mode, closes the
- * current connection. Chooses the next script to execute and starts over
- * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
- * more work to do.
+ * CSTATE_END_TX performs end-of-transaction processing. It calculates
+ * latency, and logs the transaction. In --connect mode, it closes the
+ * current connection.
+ *
+ * It then either starts over in CSTATE_CHOOSE_SCRIPT, or enters
+ * CSTATE_FINISHED if we have no more work to do.
*/
CSTATE_END_TX,
@@ -2821,16 +2832,13 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
/*
* Advance the state machine of a connection, if possible.
+ *
+ * All state changes are performed within this function, which is called
+ * by threadRun.
*/
static void
doCustom(TState *thread, CState *st, StatsData *agg)
{
- PGresult *res;
- Command *command;
- instr_time now;
- bool end_tx_processed = false;
- int64 wait;
-
/*
* gettimeofday() isn't free, so we get the current timestamp lazily the
* first time it's needed, and reuse the same value throughout this
@@ -2839,37 +2847,44 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* means "not set yet". Reset "now" when we execute shell commands or
* expressions, which might take a non-negligible amount of time, though.
*/
+ instr_time now;
INSTR_TIME_SET_ZERO(now);
/*
* Loop in the state machine, until we have to wait for a result from the
- * server (or have to sleep, for throttling or for \sleep).
+ * server or have to sleep for throttling or \sleep.
*
* Note: In the switch-statement below, 'break' will loop back here,
* meaning "continue in the state machine". Return is used to return to
- * the caller.
+ * the caller, giving the thread an opportunity to advance another client.
*/
for (;;)
{
+ PGresult *res;
+ Command *command;
+
switch (st->state)
{
/*
* Select transaction to run.
*/
case CSTATE_CHOOSE_SCRIPT:
-
st->use_file = chooseScript(thread);
if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].desc);
- if (throttle_delay > 0)
+ /* check stack consistency */
+ Assert(conditional_stack_empty(st->cstack));
+
+ if (timer_exceeded)
+ st->state = CSTATE_FINISHED;
+ else if (throttle_delay > 0)
st->state = CSTATE_START_THROTTLE;
else
st->state = CSTATE_START_TX;
- /* check consistency */
- Assert(conditional_stack_empty(st->cstack));
+
break;
/*
@@ -2887,21 +2902,10 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* away.
*/
Assert(throttle_delay > 0);
- wait = getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
- thread->throttle_trigger += wait;
+ thread->throttle_trigger += getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
st->txn_scheduled = thread->throttle_trigger;
- /*
- * stop client if next transaction is beyond pgbench end of
- * execution
- */
- if (duration > 0 && st->txn_scheduled > end_time)
- {
- st->state = CSTATE_FINISHED;
- break;
- }
-
/*
* If --latency-limit is used, and this slot is already late
* so that the transaction will miss the latency limit even if
@@ -2913,20 +2917,20 @@ doCustom(TState *thread, CState *st, StatsData *agg)
{
int64 now_us;
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
now_us = INSTR_TIME_GET_MICROSEC(now);
+
while (thread->throttle_trigger < now_us - latency_limit &&
(nxacts <= 0 || st->cnt < nxacts))
{
processXactStats(thread, st, &now, true, agg);
/* next rendez-vous */
- wait = getPoissonRand(&thread->ts_throttle_rs,
- throttle_delay);
- thread->throttle_trigger += wait;
+ thread->throttle_trigger +=
+ getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
st->txn_scheduled = thread->throttle_trigger;
}
- /* stop client if -t exceeded */
+
+ /* stop client if -t was exceeded in the previous skip loop */
if (nxacts > 0 && st->cnt >= nxacts)
{
st->state = CSTATE_FINISHED;
@@ -2934,38 +2938,45 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
}
+ /*
+ * stop client if next transaction is beyond pgbench end of
+ * execution.
+ */
+ if (duration > 0 && st->txn_scheduled > end_time)
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+
st->state = CSTATE_THROTTLE;
- if (debug)
- fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
- st->id, wait);
break;
/*
* Wait until it's time to start next transaction.
*/
case CSTATE_THROTTLE:
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+
if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
- return; /* Still sleeping, nothing to do here */
+ return; /* still sleeping, nothing to do here */
- /* Else done sleeping, start the transaction */
- st->state = CSTATE_START_TX;
+ /* done sleeping, but do not start the transaction if we are done */
+ if (timer_exceeded)
+ st->state = CSTATE_FINISHED;
+ else
+ st->state = CSTATE_START_TX;
break;
/* Start new transaction */
case CSTATE_START_TX:
- /*
- * Establish connection on first call, or if is_connect is
- * true.
- */
+ /* establish connection if needed, i.e. under --connect */
if (st->con == NULL)
{
instr_time start;
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
start = now;
if ((st->con = doConnect()) == NULL)
{
@@ -2981,28 +2992,20 @@ doCustom(TState *thread, CState *st, StatsData *agg)
memset(st->prepared, 0, sizeof(st->prepared));
}
+ /* record transaction start time. */
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+ st->txn_begin = now;
+
/*
- * Record transaction start time under logging, progress or
- * throttling.
+ * When not throttling, this is also the transaction's
+ * scheduled start time.
*/
- if (use_log || progress || throttle_delay || latency_limit ||
- per_script_stats)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- st->txn_begin = now;
-
- /*
- * When not throttling, this is also the transaction's
- * scheduled start time.
- */
- if (!throttle_delay)
- st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
- }
+ if (!throttle_delay)
+ st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
/* Begin with the first command */
- st->command = 0;
st->state = CSTATE_START_COMMAND;
+ st->command = 0;
break;
/*
@@ -3021,17 +3024,11 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break;
}
- /*
- * Record statement start time if per-command latencies are
- * requested
- */
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- st->stmt_begin = now;
- }
+ /* record statement start time. */
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+ st->stmt_begin = now;
+ /* execute the command */
if (command->type == SQL_COMMAND)
{
if (!sendCommand(st, command))
@@ -3074,8 +3071,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break;
}
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+
st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
st->state = CSTATE_SLEEP;
break;
@@ -3126,10 +3123,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
else /* elif */
{
- /*
- * we should get here only if the "elif"
- * needed evaluation
- */
+ /* we should get here only if the "elif" needed evaluation */
Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
}
@@ -3161,43 +3155,23 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
else if (command->meta == META_SETSHELL)
{
- bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
-
- if (timer_exceeded) /* timeout */
- {
- st->state = CSTATE_FINISHED;
- break;
- }
- else if (!ret) /* on error */
+ if (!runShellCommand(st, argv[1], argv + 2, argc - 2))
{
commandFailed(st, "setshell", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
break;
}
- else
- {
- /* succeeded */
- }
+ /* else success */
}
else if (command->meta == META_SHELL)
{
- bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
-
- if (timer_exceeded) /* timeout */
- {
- st->state = CSTATE_FINISHED;
- break;
- }
- else if (!ret) /* on error */
+ if (!runShellCommand(st, NULL, argv + 1, argc - 1))
{
commandFailed(st, "shell", "execution of meta-command failed");
st->state = CSTATE_ABORTED;
break;
}
- else
- {
- /* succeeded */
- }
+ /* else success */
}
move_to_end_command:
@@ -3299,6 +3273,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
if (st->state != CSTATE_SKIP_COMMAND)
+ /* out of quick skip command loop */
break;
}
break;
@@ -3348,10 +3323,9 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* instead of CSTATE_START_TX.
*/
case CSTATE_SLEEP:
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
- return; /* Still sleeping, nothing to do here */
+ return; /* still sleeping, nothing to do here */
/* Else done sleeping. */
st->state = CSTATE_END_COMMAND;
break;
@@ -3366,17 +3340,13 @@ doCustom(TState *thread, CState *st, StatsData *agg)
* in thread-local data structure, if per-command latencies
* are requested.
*/
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_SET_CURRENT_LAZY(now);
- /* XXX could use a mutex here, but we choose not to */
- command = sql_script[st->use_file].commands[st->command];
- addToSimpleStats(&command->stats,
- INSTR_TIME_GET_DOUBLE(now) -
- INSTR_TIME_GET_DOUBLE(st->stmt_begin));
- }
+ /* XXX could use a mutex here, but we choose not to */
+ command = sql_script[st->use_file].commands[st->command];
+ addToSimpleStats(&command->stats,
+ INSTR_TIME_GET_DOUBLE(now) -
+ INSTR_TIME_GET_DOUBLE(st->stmt_begin));
/* Go ahead with next command, to be executed or skipped */
st->command++;
@@ -3385,19 +3355,15 @@ doCustom(TState *thread, CState *st, StatsData *agg)
break;
/*
- * End of transaction.
+ * End of transaction (end of script, really).
*/
case CSTATE_END_TX:
/* transaction finished: calculate latency and do log */
processXactStats(thread, st, &now, false, agg);
- /* conditional stack must be empty */
- if (!conditional_stack_empty(st->cstack))
- {
- fprintf(stderr, "end of script reached within a conditional, missing \\endif\n");
- exit(1);
- }
+ /* missing \endif... cannot happen if CheckConditional was okay */
+ Assert(conditional_stack_empty(st->cstack));
if (is_connect)
{
@@ -3411,26 +3377,17 @@ doCustom(TState *thread, CState *st, StatsData *agg)
st->state = CSTATE_FINISHED;
break;
}
+ else
+ {
+ /* next transaction */
+ st->state = CSTATE_CHOOSE_SCRIPT;
- /*
- * No transaction is underway anymore.
- */
- st->state = CSTATE_CHOOSE_SCRIPT;
-
- /*
- * If we paced through all commands in the script in this
- * loop, without returning to the caller even once, do it now.
- * This gives the thread a chance to process other
- * connections, and to do progress reporting. This can
- * currently only happen if the script consists entirely of
- * meta-commands.
- */
- if (end_tx_processed)
+ /*
+ * Ensure that we always return at this point, so as
+ * to avoid an infinite loop if the script only contains
+ * meta commands.
+ */
return;
- else
- {
- end_tx_processed = true;
- break;
}
/*
@@ -3544,8 +3501,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
if (detailed && !skipped)
{
- if (INSTR_TIME_IS_ZERO(*now))
- INSTR_TIME_SET_CURRENT(*now);
+ INSTR_TIME_SET_CURRENT_LAZY(*now);
/* compute latency & lag */
latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
@@ -5809,7 +5765,7 @@ threadRun(void *arg)
if (!is_connect)
{
- /* make connections to the database */
+ /* make connections to the database before starting */
for (i = 0; i < nstate; i++)
{
if ((state[i].con = doConnect()) == NULL)
@@ -5845,14 +5801,7 @@ threadRun(void *arg)
{
CState *st = &state[i];
- if (st->state == CSTATE_THROTTLE && timer_exceeded)
- {
- /* interrupt client that has not started a transaction */
- st->state = CSTATE_FINISHED;
- finishCon(st);
- remains--;
- }
- else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
+ if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
/* a nap from the script, or under throttling */
int64 this_usec;
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index f968444671..c46f6825bb 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -20,6 +20,8 @@
*
* INSTR_TIME_SET_CURRENT(t) set t to current time
*
+ * INSTR_TIME_SET_CURRENT_LAZY(t) set t to current time if t is zero
+ *
* INSTR_TIME_ADD(x, y) x += y
*
* INSTR_TIME_SUBTRACT(x, y) x -= y
@@ -245,4 +247,14 @@ GetTimerFrequency(void)
#endif /* WIN32 */
+/*
+ * Same macro on all platforms.  Wrapped in do/while (0) so that it behaves
+ * as a single statement, e.g. as the body of an unbraced if/else.
+ */
+#define INSTR_TIME_SET_CURRENT_LAZY(t) \
+	do { \
+		if (INSTR_TIME_IS_ZERO(t)) \
+			INSTR_TIME_SET_CURRENT(t); \
+	} while (0)
+
#endif /* INSTR_TIME_H */