Please find attached v12. When timer_exceeded is set, it now interrupts
clients that are sleeping for throttling instead of waiting for the end of
their transaction, since that transaction has not started yet.
Oops, I forgot the attachment. Here it is!
--
Fabien.
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 1303217..4c9e55d 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -137,6 +137,12 @@ int unlogged_tables = 0;
double sample_rate = 0.0;
/*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec. 0 is the default and means no throttling.
+ */
+int64 throttle_delay = 0;
+
+/*
* tablespace selection
*/
char *tablespace = NULL;
@@ -200,11 +206,13 @@ typedef struct
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
+ bool throttling; /* whether nap is for throttling */
int64 until; /* napping until (usec) */
Variable *variables; /* array of variable definitions */
int nvariables;
instr_time txn_begin; /* used for measuring transaction latencies */
instr_time stmt_begin; /* used for measuring statement latencies */
+ bool throttled; /* whether current transaction was throttled */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
} CState;
@@ -222,6 +230,10 @@ typedef struct
instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
+ int64 throttle_trigger; /* previous/next throttling (us) */
+ int64 throttle_lag; /* total transaction lag behind throttling */
+ int64 throttle_lag_max; /* max transaction lag */
+
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -230,6 +242,8 @@ typedef struct
{
instr_time conn_time;
int xacts;
+ int64 throttle_lag;
+ int64 throttle_lag_max;
} TResult;
/*
@@ -355,6 +369,8 @@ usage(void)
" -n do not run VACUUM before tests\n"
" -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
" -r report average latency per command\n"
+ " -R SPEC, --rate SPEC\n"
+ " target rate in transactions per second\n"
" -s NUM report this scale factor in output\n"
" -S perform SELECT-only transactions\n"
" -t NUM number of transactions each client runs (default: 10)\n"
@@ -898,17 +914,58 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
{
PGresult *res;
Command **commands;
+ bool do_throttle = false;
top:
commands = sql_files[st->use_file];
+ /* handle throttling once per transaction by inserting a sleep.
+ * this is simpler than doing it at the end.
+ */
+ if (throttle_delay && ! st->throttled)
+ {
+ /* compute delay to approximate a Poisson distribution
+ * 1000000 => 13.8 .. 0 multiplier
+ * 100000 => 11.5 .. 0
+ * 10000 => 9.2 .. 0
+ * 1000 => 6.9 .. 0
+ * if transactions are too slow or a given wait shorter than
+ * a transaction, the next transaction will start right away.
+ */
+ int64 wait = (int64)
+ throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+ thread->throttle_trigger += wait;
+
+ st->until = thread->throttle_trigger;
+ st->sleeping = 1;
+ st->throttling = true;
+ st->throttled = true;
+ if (debug)
+ fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+ st->id, wait);
+ }
+
if (st->sleeping)
{ /* are we sleeping? */
instr_time now;
+ int64 now_us;
INSTR_TIME_SET_CURRENT(now);
- if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+ now_us = INSTR_TIME_GET_MICROSEC(now);
+ if (st->until <= now_us)
+ {
st->sleeping = 0; /* Done sleeping, go ahead with next command */
+ if (st->throttling)
+ {
+ /* measure lag of throttled transaction */
+ int64 lag = now_us - st->until;
+ thread->throttle_lag += lag;
+ if (lag > thread->throttle_lag_max)
+ thread->throttle_lag_max = lag;
+ st->throttling = false;
+ }
+ }
else
return true; /* Still sleeping, nothing to do here */
}
@@ -1037,7 +1094,7 @@ top:
* This is more than we really ought to know about
* instr_time
*/
- fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
+ fprintf(logfile, "%d %d %.0f %d %ld.%06ld\n",
st->id, st->cnt, usec, st->use_file,
(long) now.tv_sec, (long) now.tv_usec);
#else
@@ -1046,7 +1103,7 @@ top:
* On Windows, instr_time doesn't provide a timestamp
* anyway
*/
- fprintf(logfile, "%d %d %.0f %d 0 0\n",
+ fprintf(logfile, "%d %d %.0f %d 0.0\n",
st->id, st->cnt, usec, st->use_file);
#endif
}
@@ -1095,6 +1152,13 @@ top:
st->state = 0;
st->use_file = (int) getrand(thread, 0, num_files - 1);
commands = sql_files[st->use_file];
+ st->throttled = false;
+ /* no transaction is underway, there is nothing to listen to any more.
+ * under throttling, a sleep is going to be inserted, and then
+ * some SQL command will set listen back to 1.
+ */
+ st->listen = 0;
+ do_throttle = (throttle_delay>0);
}
}
@@ -1113,6 +1177,12 @@ top:
INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
}
+ if (do_throttle) {
+ /* delay throttling after reopening the connection */
+ do_throttle = false;
+ goto top;
+ }
+
/* Record transaction start time if logging is enabled */
if (logfile && st->state == 0)
INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -2017,7 +2087,8 @@ process_builtin(char *tb)
static void
printResults(int ttype, int normal_xacts, int nclients,
TState *threads, int nthreads,
- instr_time total_time, instr_time conn_total_time)
+ instr_time total_time, instr_time conn_total_time,
+ int64 throttle_lag, int64 throttle_lag_max)
{
double time_include,
tps_include,
@@ -2055,6 +2126,18 @@ printResults(int ttype, int normal_xacts, int nclients,
printf("number of transactions actually processed: %d\n",
normal_xacts);
}
+
+ if (throttle_delay)
+ {
+ /* Report average transaction lag under throttling, i.e. the delay
+ between scheduled and actual start times for the transaction.
+ The measured lag may be linked to the thread/client load,
+ the database load, or the Poisson throttling process.
+ */
+ printf("average transaction lag: %.3f ms (max %.3f ms)\n",
+ 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+ }
+
printf("tps = %f (including connections establishing)\n", tps_include);
printf("tps = %f (excluding connections establishing)\n", tps_exclude);
@@ -2115,6 +2198,7 @@ main(int argc, char **argv)
{"unlogged-tables", no_argument, &unlogged_tables, 1},
{"sampling-rate", required_argument, NULL, 4},
{"aggregate-interval", required_argument, NULL, 5},
+ {"rate", required_argument, NULL, 'R'},
{NULL, 0, NULL, 0}
};
@@ -2137,6 +2221,8 @@ main(int argc, char **argv)
instr_time total_time;
instr_time conn_total_time;
int total_xacts;
+ int64 throttle_lag = 0;
+ int64 throttle_lag_max = 0;
int i;
@@ -2181,7 +2267,7 @@ main(int argc, char **argv)
state = (CState *) pg_malloc(sizeof(CState));
memset(state, 0, sizeof(CState));
- while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1)
{
switch (c)
{
@@ -2336,6 +2422,19 @@ main(int argc, char **argv)
exit(1);
}
break;
+ case 'R':
+ {
+ /* get a double from the beginning of option value */
+ double throttle_value = atof(optarg);
+ if (throttle_value <= 0.0)
+ {
+ fprintf(stderr, "invalid rate limit: %s\n", optarg);
+ exit(1);
+ }
+ /* Invert rate limit into a time offset */
+ throttle_delay = (int64) (1000000.0 / throttle_value);
+ }
+ break;
case 0:
/* This covers long options which take no argument. */
break;
@@ -2373,6 +2472,9 @@ main(int argc, char **argv)
}
}
+ /* compute a per thread delay */
+ throttle_delay *= nthreads;
+
if (argc > optind)
dbName = argv[optind];
else
@@ -2685,6 +2787,9 @@ main(int argc, char **argv)
TResult *r = (TResult *) ret;
total_xacts += r->xacts;
+ throttle_lag += r->throttle_lag;
+ if (r->throttle_lag_max > throttle_lag_max)
+ throttle_lag_max = r->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, r->conn_time);
free(ret);
}
@@ -2695,7 +2800,7 @@ main(int argc, char **argv)
INSTR_TIME_SET_CURRENT(total_time);
INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, threads, nthreads,
- total_time, conn_total_time);
+ total_time, conn_total_time, throttle_lag, throttle_lag_max);
return 0;
}
@@ -2715,6 +2820,15 @@ threadRun(void *arg)
AggVals aggs;
+ /* SHOULD take actual thread start time when the thread is running? */
+ /* INSTR_TIME_SET_CURRENT(thread->start_time); */
+
+ /* throttling for all thread's clients */
+ INSTR_TIME_SET_CURRENT(start);
+ thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+ thread->throttle_lag = 0;
+ thread->throttle_lag_max = 0;
+
result = pg_malloc(sizeof(TResult));
INSTR_TIME_SET_ZERO(result->conn_time);
@@ -2790,25 +2904,38 @@ threadRun(void *arg)
Command **commands = sql_files[st->use_file];
int sock;
- if (st->sleeping)
+ if (st->con == NULL)
{
- int this_usec;
-
- if (min_usec == INT64_MAX)
+ continue;
+ }
+ else if (st->sleeping)
+ {
+ if (st->throttling && timer_exceeded)
{
- instr_time now;
-
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
+ /* interrupt client which has not started a transaction */
+ remains--;
+ st->sleeping = 0;
+ st->throttling = false;
+ PQfinish(st->con);
+ st->con = NULL;
+ continue;
}
+ else /* just a nap from the script */
+ {
+ int this_usec;
- this_usec = st->until - now_usec;
- if (min_usec > this_usec)
- min_usec = this_usec;
- }
- else if (st->con == NULL)
- {
- continue;
+ if (min_usec == INT64_MAX)
+ {
+ instr_time now;
+
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
+ }
+
+ this_usec = st->until - now_usec;
+ if (min_usec > this_usec)
+ min_usec = this_usec;
+ }
}
else if (commands[st->state]->type == META_COMMAND)
{
@@ -2883,6 +3010,8 @@ done:
result->xacts = 0;
for (i = 0; i < nstate; i++)
result->xacts += state[i].cnt;
+ result->throttle_lag = thread->throttle_lag;
+ result->throttle_lag_max = thread->throttle_lag_max;
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
if (logfile)
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
index 8775606..3196c25 100644
--- a/doc/src/sgml/pgbench.sgml
+++ b/doc/src/sgml/pgbench.sgml
@@ -392,6 +392,27 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
</varlistentry>
<varlistentry>
+ <term><option>-R</option> <replaceable>rate</></term>
+ <term><option>--rate</option> <replaceable>rate</></term>
+ <listitem>
+ <para>
+ Execute transactions targeting the specified rate instead of
+ running as fast as possible (the default). The rate is given in
+ transactions per second. If the targeted rate is
+ above the maximum possible rate these transactions can execute at,
+ the rate limit won't have any impact on results.
+
+ The rate is targeted by starting transactions along a
+ Poisson-distributed event time line. When a rate limit is
+ active, the average and maximum transaction lag time
+ (the delay between the scheduled and actual transaction start times)
+ are reported in ms. High values indicate that the database
+ could not handle the scheduled load at some time.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>-s</option> <replaceable>scale_factor</></term>
<listitem>
<para>
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers