Hi there, attached is a patch implementing merging of pgbench logs. These logs are written by each thread, so with N threads you get N files with names
pgbench_log.PID, pgbench_log.PID.1, ..., pgbench_log.PID.(N-1). Before analyzing these logs, these files need to be combined. I usually ended up writing ad-hoc scripts to do that, lost them, wrote them again, and so on, over and over again. Another disadvantage of external scripts is that you have to pass them all the information about the logs (whether the logs are aggregated, whether there's throttling, etc.). So integrating this into pgbench directly seems like a better approach, and the attached patch implements that. With '-m' or '--merge-logs' on the command line, the logs are merged at the end, using a simple 2-way merge to keep the log sorted by the time field. It should work with all the other options that influence the log format (--rate, --aggregate-interval and --latency-limit). I'll add this to CF 2016-06. -- Tomas Vondra http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 706fdf5..d6ec87e 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -157,6 +157,11 @@ char	   *tablespace = NULL;
 char	   *index_tablespace = NULL;
 
 /*
+ * merge logs (transaction logs, aggregated logs) at the end
+ */
+bool		merge_logs = false;
+
+/*
  * end of configurable parameters
  *********************************************************************/
 
@@ -367,6 +372,10 @@ static void *threadRun(void *arg);
 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
 	  AggVals *agg, bool skipped);
 
+static void merge_log_files(int agg_interval, int nthreads);
+static void merge_aggregated_logs(FILE *infile_a, FILE *infile_b, FILE *outfile);
+static void merge_simple_logs(FILE *infile_a, FILE *infile_b, FILE *outfile);
+
 static void
 usage(void)
 {
@@ -408,6 +417,7 @@ usage(void)
 		   "  -v, --vacuum-all         vacuum all four standard tables before tests\n"
 		   "  --aggregate-interval=NUM aggregate data over NUM seconds\n"
 		   "  --sampling-rate=NUM      fraction of transactions to log (e.g. 0.01 for 1%%)\n"
+		   "  -m, --merge-logs         merge logs produced by multiple threads\n"
 		   "\nCommon options:\n"
 		   "  -d, --debug              print debugging output\n"
 		   "  -h, --host=HOSTNAME      database server host or socket directory\n"
@@ -2733,6 +2743,7 @@ main(int argc, char **argv)
 		{"aggregate-interval", required_argument, NULL, 5},
 		{"rate", required_argument, NULL, 'R'},
 		{"latency-limit", required_argument, NULL, 'L'},
+		{"merge-logs", no_argument, NULL, 'm'},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2808,7 +2819,7 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:m", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -3017,6 +3028,9 @@ main(int argc, char **argv)
 					latency_limit = (int64) (limit_ms * 1000);
 				}
 				break;
+			case 'm':
+				merge_logs = true;
+				break;
 			case 0:
 				/* This covers long options which take no argument. */
 				if (foreign_keys || unlogged_tables)
@@ -3137,6 +3151,12 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (merge_logs && !use_log)
+	{
+		fprintf(stderr, "log merging is allowed only when actually logging transactions\n");
+		exit(1);
+	}
+
 	/*
 	 * is_latencies only works with multiple threads in thread-based
 	 * implementations, not fork-based ones, because it supposes that the
@@ -3418,6 +3438,10 @@ main(int argc, char **argv)
 				 throttle_lag, throttle_lag_max, throttle_latency_skipped,
 				 latency_late);
 
+	/* Merge logs, if needed */
+	if (merge_logs)
+		merge_log_files(agg_interval, nthreads);
+
 	return 0;
 }
 
@@ -3783,6 +3807,319 @@ done:
 	return result;
 }
 
+/*
+ * One line of a per-transaction log.  The latency is kept as text because
+ * transactions skipped under --latency-limit are logged as "skipped".
+ */
+typedef struct
+{
+	int			id;
+	long		cnt;
+	char		latency[64];	/* a number, or "skipped" */
+	int			use_file;
+	long		tv_sec;			/* timestamp the logs are sorted by */
+	long		tv_usec;
+	double		lag;			/* only present with --rate */
+} SimpleLogRecord;
+
+/*
+ * One line of an aggregated log (--aggregate-interval).
+ */
+typedef struct
+{
+	long		start_time;		/* start of the interval */
+	long		cnt;
+	double		sum_latency;
+	double		sum2_latency;
+	double		min_latency;
+	double		max_latency;
+	double		sum_lag;		/* lag fields are only present with --rate */
+	double		sum2_lag;
+	double		min_lag;
+	double		max_lag;
+	long		skipped;		/* only with --rate and --latency-limit */
+} AggLogRecord;
+
+/*
+ * Read the next record of a per-transaction log.  Returns false at end of
+ * file, or if the input does not match the expected format.
+ */
+static bool
+read_simple_record(FILE *infile, SimpleLogRecord *rec)
+{
+	/* %63s leaves room for the terminating NUL of rec->latency */
+	if (fscanf(infile, "%d %ld %63s %d %ld %ld",
+			   &rec->id, &rec->cnt, rec->latency, &rec->use_file,
+			   &rec->tv_sec, &rec->tv_usec) != 6)
+		return false;
+
+	if (throttle_delay && fscanf(infile, "%lf", &rec->lag) != 1)
+		return false;
+
+	return true;
+}
+
+/*
+ * Write a per-transaction log record in the format read_simple_record() reads.
+ */
+static void
+write_simple_record(FILE *outfile, const SimpleLogRecord *rec)
+{
+	fprintf(outfile, "%d %ld %s %d %ld %ld",
+			rec->id, rec->cnt, rec->latency, rec->use_file,
+			rec->tv_sec, rec->tv_usec);
+	if (throttle_delay)
+		fprintf(outfile, " %.0f", rec->lag);
+	fputc('\n', outfile);
+}
+
+/*
+ * Read the next record of an aggregated log.  Returns false at end of file,
+ * or if the input does not match the expected format.
+ */
+static bool
+read_agg_record(FILE *infile, AggLogRecord *rec)
+{
+	if (fscanf(infile, "%ld %ld %lf %lf %lf %lf",
+			   &rec->start_time, &rec->cnt,
+			   &rec->sum_latency, &rec->sum2_latency,
+			   &rec->min_latency, &rec->max_latency) != 6)
+		return false;
+
+	rec->sum_lag = rec->sum2_lag = rec->min_lag = rec->max_lag = 0;
+	rec->skipped = 0;
+
+	if (throttle_delay)
+	{
+		if (fscanf(infile, "%lf %lf %lf %lf",
+				   &rec->sum_lag, &rec->sum2_lag,
+				   &rec->min_lag, &rec->max_lag) != 4)
+			return false;
+
+		/* doLog() appends the skipped-transaction count with --latency-limit */
+		if (latency_limit && fscanf(infile, "%ld", &rec->skipped) != 1)
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * Write an aggregated log record in the format read_agg_record() reads.
+ */
+static void
+write_agg_record(FILE *outfile, const AggLogRecord *rec)
+{
+	fprintf(outfile, "%ld %ld %.0f %.0f %.0f %.0f",
+			rec->start_time, rec->cnt,
+			rec->sum_latency, rec->sum2_latency,
+			rec->min_latency, rec->max_latency);
+	if (throttle_delay)
+	{
+		fprintf(outfile, " %.0f %.0f %.0f %.0f",
+				rec->sum_lag, rec->sum2_lag,
+				rec->min_lag, rec->max_lag);
+		if (latency_limit)
+			fprintf(outfile, " %ld", rec->skipped);
+	}
+	fputc('\n', outfile);
+}
+
+/*
+ * Fold record 'b' into 'a'; both must describe the same interval.  An empty
+ * interval has no meaningful min/max, so those are only compared when both
+ * records saw transactions.
+ */
+static void
+combine_agg_records(AggLogRecord *a, const AggLogRecord *b)
+{
+	if (a->cnt == 0)
+	{
+		a->min_latency = b->min_latency;
+		a->max_latency = b->max_latency;
+		a->min_lag = b->min_lag;
+		a->max_lag = b->max_lag;
+	}
+	else if (b->cnt > 0)
+	{
+		a->min_latency = Min(a->min_latency, b->min_latency);
+		a->max_latency = Max(a->max_latency, b->max_latency);
+		a->min_lag = Min(a->min_lag, b->min_lag);
+		a->max_lag = Max(a->max_lag, b->max_lag);
+	}
+
+	a->cnt += b->cnt;
+	a->sum_latency += b->sum_latency;
+	a->sum2_latency += b->sum2_latency;
+	a->sum_lag += b->sum_lag;
+	a->sum2_lag += b->sum2_lag;
+	a->skipped += b->skipped;
+}
+
+/*
+ * Merge two per-transaction logs, each sorted by timestamp, into outfile.
+ * When timestamps are equal, the record from infile_a is written first.
+ */
+static void
+merge_simple_logs(FILE *infile_a, FILE *infile_b, FILE *outfile)
+{
+	SimpleLogRecord rec_a,
+				rec_b;
+	bool		have_a = read_simple_record(infile_a, &rec_a);
+	bool		have_b = read_simple_record(infile_b, &rec_b);
+
+	while (have_a || have_b)
+	{
+		bool		take_a;
+
+		if (have_a && have_b)
+			take_a = rec_a.tv_sec < rec_b.tv_sec ||
+				(rec_a.tv_sec == rec_b.tv_sec && rec_a.tv_usec <= rec_b.tv_usec);
+		else
+			take_a = have_a;
+
+		if (take_a)
+		{
+			write_simple_record(outfile, &rec_a);
+			have_a = read_simple_record(infile_a, &rec_a);
+		}
+		else
+		{
+			write_simple_record(outfile, &rec_b);
+			have_b = read_simple_record(infile_b, &rec_b);
+		}
+	}
+}
+
+/*
+ * Merge two aggregated logs, each sorted by interval start, into outfile.
+ * An interval present in both inputs is combined into a single line; an
+ * interval present in only one input is copied as is.
+ */
+static void
+merge_aggregated_logs(FILE *infile_a, FILE *infile_b, FILE *outfile)
+{
+	AggLogRecord rec_a,
+				rec_b;
+	bool		have_a = read_agg_record(infile_a, &rec_a);
+	bool		have_b = read_agg_record(infile_b, &rec_b);
+
+	while (have_a || have_b)
+	{
+		if (have_a && have_b && rec_a.start_time == rec_b.start_time)
+		{
+			combine_agg_records(&rec_a, &rec_b);
+			write_agg_record(outfile, &rec_a);
+			have_a = read_agg_record(infile_a, &rec_a);
+			have_b = read_agg_record(infile_b, &rec_b);
+		}
+		else if (have_a && (!have_b || rec_a.start_time < rec_b.start_time))
+		{
+			write_agg_record(outfile, &rec_a);
+			have_a = read_agg_record(infile_a, &rec_a);
+		}
+		else
+		{
+			write_agg_record(outfile, &rec_b);
+			have_b = read_agg_record(infile_b, &rec_b);
+		}
+	}
+}
+
+/*
+ * Merge the per-thread logs pgbench_log.PID.1 .. pgbench_log.PID.(N-1) into
+ * the main log pgbench_log.PID, keeping it sorted by time.  Each thread log
+ * is removed once it has been merged; on error a message is printed and the
+ * files not yet merged are left in place.
+ */
+static void
+merge_log_files(int agg_interval, int nthreads)
+{
+	char		mainpath[MAXPGPATH];
+	char		threadpath[MAXPGPATH];
+	char		tmppath[MAXPGPATH];
+	int			i;
+
+	snprintf(mainpath, sizeof(mainpath), "pgbench_log.%d", main_pid);
+	snprintf(tmppath, sizeof(tmppath), "pgbench_log.%d.tmp", main_pid);
+
+	/* all the logs are sorted by time, so a series of 2-way merges works */
+	for (i = 1; i < nthreads; i++)
+	{
+		FILE	   *infile_a;
+		FILE	   *infile_b;
+		FILE	   *outfile;
+		bool		parsed;
+		bool		written;
+
+		snprintf(threadpath, sizeof(threadpath), "pgbench_log.%d.%d",
+				 main_pid, i);
+
+		if ((infile_a = fopen(mainpath, "r")) == NULL)
+		{
+			fprintf(stderr, "could not open logfile \"%s\": %s\n",
+					mainpath, strerror(errno));
+			return;
+		}
+		if ((infile_b = fopen(threadpath, "r")) == NULL)
+		{
+			fprintf(stderr, "could not open logfile \"%s\": %s\n",
+					threadpath, strerror(errno));
+			fclose(infile_a);
+			return;
+		}
+		if ((outfile = fopen(tmppath, "w")) == NULL)
+		{
+			fprintf(stderr, "could not open logfile \"%s\": %s\n",
+					tmppath, strerror(errno));
+			fclose(infile_a);
+			fclose(infile_b);
+			return;
+		}
+
+		if (agg_interval > 0)
+			merge_aggregated_logs(infile_a, infile_b, outfile);
+		else
+			merge_simple_logs(infile_a, infile_b, outfile);
+
+		/*
+		 * The readers stop at the first line they cannot parse, so only trust
+		 * the result if both inputs were consumed up to end of file.
+		 */
+		parsed = feof(infile_a) && feof(infile_b);
+		written = !ferror(outfile);
+		fclose(infile_a);
+		fclose(infile_b);
+		if (fclose(outfile) != 0)
+			written = false;
+
+		if (!parsed || !written)
+		{
+			if (!parsed)
+				fprintf(stderr, "could not parse logfile \"%s\" or \"%s\"\n",
+						mainpath, threadpath);
+			else
+				fprintf(stderr, "could not write logfile \"%s\"\n", tmppath);
+			unlink(tmppath);
+			return;
+		}
+
+		/*
+		 * rename() atomically replaces the old main log on POSIX systems, so
+		 * the main log is never left missing, even if this step fails.
+		 */
+		if (rename(tmppath, mainpath) != 0)
+		{
+			fprintf(stderr, "could not rename \"%s\" to \"%s\": %s\n",
+					tmppath, mainpath, strerror(errno));
+			return;
+		}
+
+		/* the thread log is now part of the main log */
+		unlink(threadpath);
+	}
+}
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers