Hello Thomas,

After looking at it again, here is an update which ensures 64-bit
arithmetic in the epoch_shift computation.
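
For the record, the point of the fix is only to keep the seconds-to-microseconds multiplication in 64-bit arithmetic, which is what INT64CONST does in the patch below. A minimal sketch of the idea, with a hypothetical helper name that is not taken from the patch:

    #include <stdint.h>
    #include <sys/time.h>

    /* Hypothetical helper, for illustration only: convert a struct timeval
     * to microseconds.  Forcing the 1000000 factor to 64 bits avoids a
     * possible wrap of tv_sec * 1000000 on platforms with 32-bit time_t. */
    static int64_t
    timeval_to_usec(const struct timeval *tv)
    {
        return (int64_t) tv->tv_sec * INT64_C(1000000) + tv->tv_usec;
    }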

> The code in pgbench 13 aggregates into buckets that begin on the
> boundaries of wall clock seconds, because it is triggered by changes
> in time_t.  In the current patch, we aggregate data into buckets that
> begin on the boundaries of whole seconds since start_time.  Those
> boundaries are not aligned with wall clock seconds, and yet we print
> out the times rounded to wall clock seconds.

Yes, I noticed this small change, and did not feel it was an issue at the time.

I thought of doing something like the format change you are suggesting. However, whether people would like it is unclear and it would need to be discussed, hence it stayed that way… Basically, people have scripts that process the log files and do not like format changes.

> Perhaps we should round the start time of the first aggregate down to
> the nearest wall clock second?

Yep, but that requires a common start point for all threads. Why not.

> That would mean that the first aggregate misses a part of a second (as it does in pgbench 13), but all later aggregates begin at the time we write in the log (as it does in pgbench 13). That is, if we log 1625115080 we mean "all results >= 1625115080.000000". It's a small detail, but it could be important for someone trying to correlate the log with other data. What do you think?

I think that you are right. The simplest way is to align on whole seconds, which is easier than changing the format and getting complaints about that, or not aligning and getting complaints about the timestamp being rounded.
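
To spell out the alignment (an illustrative sketch only, the helper name is made up; the patch does the equivalent computation inline when initializing the aggregates):

    #include <stdint.h>

    /* Illustrative only: truncate a microsecond timestamp down to the start
     * of its wall-clock second, so that all threads begin aggregation on
     * the same whole-second boundary. */
    static int64_t
    truncate_to_second(int64_t usec)
    {
        return usec / 1000000 * 1000000;    /* integer division drops the fraction */
    }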

Attached is a v14 in that spirit.

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 4aeccd93af..003ba3382e 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -343,6 +343,17 @@ typedef struct StatsData
 	SimpleStats lag;
 } StatsData;
 
+/*
+ * For displaying Unix epoch timestamps, as some time functions may
+ * have another reference.
+ */
+pg_time_usec_t epoch_shift;
+
+/*
+ * When the bench run started, just before creating threads
+ */
+pg_time_usec_t run_start_time;
+
 /*
  * Struct to keep random state.
  */
@@ -649,7 +660,7 @@ static void setDoubleValue(PgBenchValue *pv, double dval);
 static bool evaluateExpr(CState *st, PgBenchExpr *expr,
 						 PgBenchValue *retval);
 static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
-static void doLog(TState *thread, CState *st,
+static void doLog(TState *thread, CState *st, bool tx,
 				  StatsData *agg, bool skipped, double latency, double lag);
 static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 							 bool skipped, StatsData *agg);
@@ -3768,16 +3779,47 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 	return CSTATE_END_COMMAND;
 }
 
+/* print aggregated report to logfile */
+static void
+logAgg(FILE *logfile, StatsData *agg)
+{
+	fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+			(agg->start_time + epoch_shift) / 1000000,
+			agg->cnt,
+			agg->latency.sum,
+			agg->latency.sum2,
+			agg->latency.min,
+			agg->latency.max);
+	if (throttle_delay)
+	{
+		fprintf(logfile, " %.0f %.0f %.0f %.0f",
+				agg->lag.sum,
+				agg->lag.sum2,
+				agg->lag.min,
+				agg->lag.max);
+		if (latency_limit)
+			fprintf(logfile, " " INT64_FORMAT, agg->skipped);
+	}
+	fputc('\n', logfile);
+}
+
 /*
  * Print log entry after completing one transaction.
  *
+ * Param tx tells whether the call actually represents a transaction,
+ * or whether it is only used to flush the aggregated logs.
+ *
+ * The function behavior changes depending on sample_rate (a fraction of
+ * transactions is reported) and agg_interval (transactions are aggregated
+ * and reported once every agg_interval seconds).
+ *
  * We print Unix-epoch timestamps in the log, so that entries can be
  * correlated against other logs.  On some platforms this could be obtained
  * from the caller, but rather than get entangled with that, we just eat
  * the cost of an extra syscall in all cases.
  */
 static void
-doLog(TState *thread, CState *st,
+doLog(TState *thread, CState *st, bool tx,
 	  StatsData *agg, bool skipped, double latency, double lag)
 {
 	FILE	   *logfile = thread->logfile;
@@ -3796,43 +3838,36 @@ doLog(TState *thread, CState *st,
 	/* should we aggregate the results or not? */
 	if (agg_interval > 0)
 	{
+		pg_time_usec_t	next;
+
 		/*
 		 * Loop until we reach the interval of the current moment, and print
 		 * any empty intervals in between (this may happen with very low tps,
 		 * e.g. --rate=0.1).
 		 */
-
-		while (agg->start_time + agg_interval <= now)
+		while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
 		{
-			/* print aggregated report to logfile */
-			fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
-					agg->start_time,
-					agg->cnt,
-					agg->latency.sum,
-					agg->latency.sum2,
-					agg->latency.min,
-					agg->latency.max);
-			if (throttle_delay)
-			{
-				fprintf(logfile, " %.0f %.0f %.0f %.0f",
-						agg->lag.sum,
-						agg->lag.sum2,
-						agg->lag.min,
-						agg->lag.max);
-				if (latency_limit)
-					fprintf(logfile, " " INT64_FORMAT, agg->skipped);
-			}
-			fputc('\n', logfile);
+			logAgg(logfile, agg);
 
 			/* reset data and move to next interval */
-			initStats(agg, agg->start_time + agg_interval);
+			initStats(agg, next);
 		}
 
-		/* accumulate the current transaction */
-		accumStats(agg, skipped, latency, lag);
+		if (tx)
+			/* accumulate the current transaction */
+			accumStats(agg, skipped, latency, lag);
+		else
+			/* final call to show the last aggregate */
+			logAgg(logfile, agg);
 	}
 	else
 	{
+		/* switch to epoch */
+		now += epoch_shift;
+
+		/* !tx only used for aggregated data */
+		Assert(tx);
+
 		/* no, print raw transactions */
 		if (skipped)
 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " "
@@ -3892,7 +3927,7 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 	st->cnt++;
 
 	if (use_log)
-		doLog(thread, st, agg, skipped, latency, lag);
+		doLog(thread, st, true, agg, skipped, latency, lag);
 
 	/* XXX could use a mutex here, but we choose not to */
 	if (per_script_stats)
@@ -5458,7 +5493,7 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
 
 	if (progress_timestamp)
 	{
-		snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now));
+		snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now + epoch_shift));
 	}
 	else
 	{
@@ -5789,7 +5824,6 @@ main(int argc, char **argv)
 	TState	   *threads;		/* array of thread */
 
 	pg_time_usec_t
-				start_time,		/* start up time */
 				bench_start = 0,	/* first recorded benchmarking time */
 				conn_total_duration;	/* cumulated connection time in
 										 * threads */
@@ -5808,6 +5842,11 @@ main(int argc, char **argv)
 	char	   *env;
 
 	int			exit_code = 0;
+	struct timeval tv;
+
+	/* record shift between epoch and now() */
+	gettimeofday(&tv, NULL);
+	epoch_shift = tv.tv_sec * INT64CONST(1000000) + tv.tv_usec - pg_time_now();
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -6465,7 +6504,7 @@ main(int argc, char **argv)
 	Assert(nclients_dealt == nclients);
 
 	/* get start up time for the whole computation */
-	start_time = pg_time_now();
+	run_start_time = pg_time_now();
 
 	/* set alarm if duration is specified. */
 	if (duration > 0)
@@ -6543,7 +6582,7 @@ main(int argc, char **argv)
 	 * underestimated.
 	 */
 	printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
-				 bench_start - start_time, latency_late);
+				 bench_start - run_start_time, latency_late);
 
 	THREAD_BARRIER_DESTROY(&barrier);
 
@@ -6637,7 +6676,8 @@ threadRun(void *arg)
 	thread->bench_start = start;
 	thread->throttle_trigger = start;
 
-	initStats(&aggs, start);
+	/* aggregations are done on whole seconds, the same for all threads */
+	initStats(&aggs, run_start_time / 1000000 * 1000000);
 	last = aggs;
 
 	/* loop till all clients have terminated */
@@ -6830,8 +6870,9 @@ done:
 		if (agg_interval > 0)
 		{
 			/* log aggregated but not yet reported transactions */
-			doLog(thread, state, &aggs, false, 0, 0);
+			doLog(thread, state, false, &aggs, false, 0.0, 0.0);
 		}
+
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 3aa9d5d753..41250bf91d 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -8,6 +8,7 @@ use PostgresNode;
 use TestLib;
 use Test::More;
 use Config;
+use Time::HiRes qw( time );
 
 # start a pgbench specific server
 my $node = get_new_node('main');
@@ -54,12 +55,14 @@ sub pgbench
 
 	push @cmd, @args;
 
+	my $start = time();
 	$node->command_checks_all(\@cmd, $stat, $out, $err, $name);
+	my $stop = time();
 
 	# cleanup?
 	#unlink @filenames or die "cannot unlink files (@filenames): $!";
 
-	return;
+	return $stop - $start;
 }
 
 # tablespace for testing, because partitioned tables cannot use pg_default
@@ -1187,7 +1190,7 @@ sub check_pgbench_logs
 
 	# $prefix is simple enough, thus does not need escaping
 	my @logs = list_files($dir, qr{^$prefix\..*$});
-	ok(@logs == $nb, "number of log files");
+	ok(@logs == $nb, "number of log files (@logs)");
 	ok(grep(/\/$prefix\.\d+(\.\d+)?$/, @logs) == $nb, "file name format");
 
 	my $log_number = 0;
@@ -1220,6 +1223,58 @@ sub check_pgbench_logs
 my $bdir = $node->basedir;
 
 # Run with sampling rate, 2 clients with 50 transactions each.
+#
+# Test time-sensitive features on a light read-only transaction:
+#
+#   -T: bench duration, 2 seconds to exercise progress & logs
+#   -P: progress report
+#   --aggregate-interval: periodic aggregated logs
+#   --rate: schedule load
+#   --latency-limit: max delay, not deeply exercised
+#
+# note: the --rate behavior is probabilistic in nature.
+# note: --progress-timestamp is not tested.
+my $delay = pgbench(
+	'-T 2 -P 1 -l --aggregate-interval=1 -S -b se@2'
+	. ' --rate=20 --latency-limit=1000 -j ' . $nthreads
+	. ' -c 3 -r',
+	0,
+	[   qr{type: multiple},
+		qr{clients: 3},
+		qr{threads: $nthreads},
+		qr{duration: 2 s},
+		qr{script 1: .* select only},
+		qr{script 2: .* select only},
+		qr{statement latencies in milliseconds},
+		qr{FROM pgbench_accounts} ],
+	[ qr{vacuum}, qr{progress: 1\b} ],
+	'pgbench progress', undef,
+	"--log-prefix=$bdir/001_pgbench_log_1");
+
+# rough check that the run took around 2 seconds
+
+TODO: {
+	local $TODO = "possibly unreliable on slow hosts or unlucky runs";
+
+	# The rate may result in an unlucky schedule which triggers
+	# an early exit, hence the loose bound.
+
+	# Also, the delay may totally fail on very slow or overloaded hosts,
+	# valgrind runs...
+
+	ok(1.5 < $delay && $delay < 2.5, "-T 2 run around 2 seconds");
+}
+
+# $nthreads threads, 2 seconds, but due to timing imprecision we might get
+# only 1 or as many as 3 progress reports per thread.
+# aggregate log format is:
+#   epoch #tx sum sum2 min max [sum sum2 min max [skipped]]
+# first series is about latency; second is about lag (--rate);
+# skipped only if --latency-limit is set.
+check_pgbench_logs($bdir, '001_pgbench_log_1', $nthreads, 1, 3,
+	qr{^\d{10,} \d{1,2} \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+$});
+
+# with sampling rate, 2 clients with 50 tx each
 pgbench(
 	"-n -S -t 50 -c 2 --log --sampling-rate=0.5", 0,
 	[ qr{select only}, qr{processed: 100/100} ], [qr{^$}],
