Michaƫl-san, Yugo-san,

I am fine with this version, but I think it would be better if we have a comment explaining what "tx" is for.

Yes. Done.

Also, how about adding Assert(tx) instead of using "else if (tx)" because
we are assuming that tx is always true when agg_interval is not used, right?

Ok. Done.

Agreed on both points.  From what I get, this code could be clarified
much more,

I agree that the code is a little bit awkward.

and perhaps partially refactored to have less spaghetti
code between the point where we call it at the end of a thread or when
gathering stats of a transaction mid-run, but that's not something to
do post-beta1.

Yep.

I am not completely sure that the result would be worth it either.

I'm not either.

Let's document things and let's the readers know better the
assumptions this area of the code relies on, for clarity.

Sure.

The dependency between agg_interval and sample_rate is one of those things, somebody needs now to look at the option parsing why only one is possible at the time.

Actually it would work if both are mixed: the code would aggregate a sample. However it does not look very useful to do that, so it is arbitrary forbidden. Not sure whether this is that useful to prevent this use case.

Using an extra tx flag to track what to do after the loop for the aggregate print to the log file is an improvement in this direction.

Yep.

Attached v4 improves comments and moves tx as an assert.

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index d7479925cb..4ef329c6c9 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -648,7 +648,7 @@ static void setDoubleValue(PgBenchValue *pv, double dval);
 static bool evaluateExpr(CState *st, PgBenchExpr *expr,
 						 PgBenchValue *retval);
 static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
-static void doLog(TState *thread, CState *st,
+static void doLog(TState *thread, CState *st, bool tx,
 				  StatsData *agg, bool skipped, double latency, double lag);
 static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 							 bool skipped, StatsData *agg);
@@ -3765,16 +3765,47 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 	return CSTATE_END_COMMAND;
 }
 
+/* print aggregated report to logfile */
+static void
+logAgg(FILE *logfile, StatsData *agg)
+{
+	fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+			agg->start_time,
+			agg->cnt,
+			agg->latency.sum,
+			agg->latency.sum2,
+			agg->latency.min,
+			agg->latency.max);
+	if (throttle_delay)
+	{
+		fprintf(logfile, " %.0f %.0f %.0f %.0f",
+				agg->lag.sum,
+				agg->lag.sum2,
+				agg->lag.min,
+				agg->lag.max);
+		if (latency_limit)
+			fprintf(logfile, " " INT64_FORMAT, agg->skipped);
+	}
+	fputc('\n', logfile);
+}
+
 /*
  * Print log entry after completing one transaction.
  *
+ * Param tx tells whether the call actually represents a transaction,
+ * or if it is used to flush aggregated logs.
+ *
+ * The function behaviors changes depending on sample_rate (a fraction of
+ * transaction is reported) and agg_interval (transactions are aggregated
+ * over the interval and reported once).
+ *
  * We print Unix-epoch timestamps in the log, so that entries can be
  * correlated against other logs.  On some platforms this could be obtained
  * from the caller, but rather than get entangled with that, we just eat
  * the cost of an extra syscall in all cases.
  */
 static void
-doLog(TState *thread, CState *st,
+doLog(TState *thread, CState *st, bool tx,
 	  StatsData *agg, bool skipped, double latency, double lag)
 {
 	FILE	   *logfile = thread->logfile;
@@ -3793,43 +3824,33 @@ doLog(TState *thread, CState *st,
 	/* should we aggregate the results or not? */
 	if (agg_interval > 0)
 	{
+		pg_time_usec_t	next;
+
 		/*
 		 * Loop until we reach the interval of the current moment, and print
 		 * any empty intervals in between (this may happen with very low tps,
 		 * e.g. --rate=0.1).
 		 */
-
-		while (agg->start_time + agg_interval <= now)
+		while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
 		{
-			/* print aggregated report to logfile */
-			fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
-					agg->start_time,
-					agg->cnt,
-					agg->latency.sum,
-					agg->latency.sum2,
-					agg->latency.min,
-					agg->latency.max);
-			if (throttle_delay)
-			{
-				fprintf(logfile, " %.0f %.0f %.0f %.0f",
-						agg->lag.sum,
-						agg->lag.sum2,
-						agg->lag.min,
-						agg->lag.max);
-				if (latency_limit)
-					fprintf(logfile, " " INT64_FORMAT, agg->skipped);
-			}
-			fputc('\n', logfile);
+			logAgg(logfile, agg);
 
 			/* reset data and move to next interval */
-			initStats(agg, agg->start_time + agg_interval);
+			initStats(agg, next);
 		}
 
-		/* accumulate the current transaction */
-		accumStats(agg, skipped, latency, lag);
+		if (tx)
+			/* accumulate the current transaction */
+			accumStats(agg, skipped, latency, lag);
+		else
+			/* final call to show the last aggregate */
+			logAgg(logfile, agg);
 	}
 	else
 	{
+		/* !tx only used for aggregated data */
+		Assert(tx);
+
 		/* no, print raw transactions */
 		if (skipped)
 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " "
@@ -3889,7 +3910,7 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 	st->cnt++;
 
 	if (use_log)
-		doLog(thread, st, agg, skipped, latency, lag);
+		doLog(thread, st, true, agg, skipped, latency, lag);
 
 	/* XXX could use a mutex here, but we choose not to */
 	if (per_script_stats)
@@ -6794,8 +6815,9 @@ done:
 		if (agg_interval > 0)
 		{
 			/* log aggregated but not yet reported transactions */
-			doLog(thread, state, &aggs, false, 0, 0);
+			doLog(thread, state, false, &aggs, false, 0.0, 0.0);
 		}
+
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}

Reply via email to