On Sun, 7 Jul 2024 at 20:46, Joel Jacobson <j...@compiler.org> wrote:
>
> This patch adds a mul_var_large() that is dispatched to from mul_var()
> for var1ndigits >= 8, regardless of rscale.
>
> -- var1ndigits == var2ndigits == 16384
> SELECT COUNT(*) FROM n_max WHERE product = var1 * var2;
> Time: 3191.145 ms (00:03.191) -- HEAD
> Time: 1836.404 ms (00:01.836) -- mul_var_large
>

That's pretty nice. For some reason though, this patch seems to
consistently make the numeric_big regression test a bit slower:

ok 224       - numeric_big                               280 ms  [HEAD]
ok 224       - numeric_big                               311 ms  [patch]

though I do get a lot of variability when I run that test.

I think this is related to this code:

+   res_ndigits = var1ndigits + var2ndigits + 1;
+   res_weight = var1->weight + var2->weight + 2 +
+                ((res_ndigits * 2) - (var1->ndigits + var2->ndigits + 1));
+   maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
+       MUL_GUARD_DIGITS;
+   res_ndigits = Min(res_ndigits, maxdigits);

In mul_var_large(), var1ndigits, var2ndigits, and res_ndigits are
counts of NBASE^2 digits, whereas maxdigits is a count of NBASE
digits, so it's not legit to compare them like that. I think it's
pretty confusing to use the same variable names as are used elsewhere
for different things.

I also don't like basically duplicating the whole of mul_var() in a
second function.

The fact that this is close to the current speed for numbers with
around 8 digits is encouraging though. My first thought was that if it
could be made just a little faster, maybe it could replace mul_var()
rather than duplicating it.

I had a go at that in the attached v2 patch, which now gives a very
nice speedup when running numeric_big:

ok 224       - numeric_big                               159 ms  [v2 patch]

The v2 patch includes the following additional optimisations:

1). Use unsigned integers, rather than signed integers, as discussed
over in [1].

2). Attempt to fix the formulae incorporating maxdigits mentioned
above. This part really made my brain hurt, and I'm still not quite
sure that I've got it right. In particular, it needs double-checking
to ensure that it's not losing accuracy in the reduced-rscale case.

3). Instead of converting var1 to base NBASE^2 and storing it, just
compute each base-NBASE^2 digit at the point where it's needed, at the
start of the outer loop.

4). Instead of converting all of var2 to base NBASE^2, just convert
the part that actually contributes to the final result. That can make
a big difference when asked for a reduced-rscale result.

5). Use 32-bit integers instead of 64-bit integers to hold the
converted digits of var2.

6). Merge the final carry-propagation pass with the code to convert
the result back to base NBASE.

7). Mark mul_var_short() as pg_noinline. That seemed to make a
difference in some tests, where this patch seemed to cause the
compiler to generate slightly less efficient code for mul_var_short()
when it was inlined. In any case, it seems preferable to separate the
two, especially if mul_var_short() grows in the future.

Overall, those optimisations made quite a big difference for large inputs:

-- var1ndigits1=16383, var2ndigits2=16383
call rate=23.991785  -- HEAD
call rate=35.19989   -- v1 patch
call rate=75.50344   -- v2 patch

which is now a 3.1x speedup relative to HEAD.

For small inputs (above mul_var_short()'s 4-digit threshold), it's
pretty-much performance-neutral:

-- var1ndigits1=5, var2ndigits2=5
call rate=6.045675e+06   -- HEAD
call rate=6.1231815e+06  -- v2 patch

which is pretty-much in the region of random noise. It starts to
become a more definite win for anything larger (in either input):

-- var1ndigits1=5, var2ndigits2=10
call rate=5.437945e+06   -- HEAD
call rate=5.6906255e+06  -- v2 patch

-- var1ndigits1=6, var2ndigits2=6
call rate=5.748427e+06   -- HEAD
call rate=5.953112e+06   -- v2 patch

-- var1ndigits1=7, var2ndigits2=7
call rate=5.3638645e+06  -- HEAD
call rate=5.700681e+06   -- v2 patch

What's less clear is whether there are any platforms for which this
makes it significantly slower.

I tried testing with SIMD disabled, which ought to not affect the
small-input cases, but actually seemed to favour the patch slightly:

-- var1ndigits1=5, var2ndigits2=5 [SIMD disabled]
call rate=6.0269715e+06  -- HEAD
call rate=6.2982245e+06  -- v2 patch

For large inputs, disabling SIMD makes everything much slower, but it
made the relative difference larger:

-- var1ndigits1=16383, var2ndigits2=16383 [SIMD disabled]
call rate=8.24175  -- HEAD
call rate=36.3987   -- v2 patch

which is now 4.3x faster with the patch.

Then I tried compiling with -m32, and unfortunately this made the
patch slower than HEAD for small inputs:

-- var1ndigits1=5, var2ndigits2=5 [-m32, SIMD disabled]
call rate=5.052332e+06  -- HEAD
call rate=3.883459e+06  -- v2 patch

-- var1ndigits1=6, var2ndigits2=6 [-m32, SIMD disabled]
call rate=4.7221405e+06  -- HEAD
call rate=3.7965685e+06  -- v2 patch

-- var1ndigits1=7, var2ndigits2=7 [-m32, SIMD disabled]
call rate=4.4638375e+06   -- HEAD
call rate=3.39948e+06     -- v2 patch

and it doesn't reach parity until around ndigits=26, which is
disappointing. It does still get much faster for large inputs:

-- var1ndigits1=16383, var2ndigits2=16383 [-m32, SIMD disabled]
call rate=5.6747904
call rate=20.104694

and it still makes numeric_big much faster:

[-m32, SIMD disabled]
ok 224       - numeric_big                               596 ms  [HEAD]
ok 224       - numeric_big                               294 ms  [v2 patch]

I'm not sure whether compiling with -m32 is a realistic simulation of
systems people are really using. It's tempting to just regard such
systems as legacy, and ignore them, given the other large gains, but
maybe this is not the only case that gets slower.

Another option would be to only use this optimisation on 64-bit
machines, though I think that would make the code pretty messy, and it
would be throwing away the gains for large inputs, which might well
outweigh the losses.

Thoughts anyone?

Regards,
Dean

[1] 
https://www.postgresql.org/message-id/19834f2c-4bf1-4e27-85ed-ca5df0f28...@app.fastmail.com
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
new file mode 100644
index d0f0923..f81bdd3
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -101,6 +101,8 @@ typedef signed char NumericDigit;
 typedef int16 NumericDigit;
 #endif
 
+#define NBASE_SQR	(NBASE * NBASE)
+
 /*
  * The Numeric type as stored on disk.
  *
@@ -558,8 +560,9 @@ static void sub_var(const NumericVar *va
 static void mul_var(const NumericVar *var1, const NumericVar *var2,
 					NumericVar *result,
 					int rscale);
-static void mul_var_short(const NumericVar *var1, const NumericVar *var2,
-						  NumericVar *result);
+static pg_noinline void mul_var_short(const NumericVar *var1,
+									  const NumericVar *var2,
+									  NumericVar *result);
 static void div_var(const NumericVar *var1, const NumericVar *var2,
 					NumericVar *result,
 					int rscale, bool round);
@@ -8674,17 +8677,23 @@ mul_var(const NumericVar *var1, const Nu
 		int rscale)
 {
 	int			res_ndigits;
+	int			res_ndigitpairs;
 	int			res_sign;
 	int			res_weight;
+	int			pair_offset;
 	int			maxdigits;
-	int		   *dig;
-	int			carry;
-	int			maxdig;
-	int			newdig;
+	int			maxdigitpairs;
+	uint64	   *dig;
+	uint64		carry;
+	uint64		maxdig;
+	uint64		newdig;
 	int			var1ndigits;
 	int			var2ndigits;
+	int			var1ndigitpairs;
+	int			var2ndigitpairs;
 	NumericDigit *var1digits;
 	NumericDigit *var2digits;
+	uint32	   *var2digitpairs;
 	NumericDigit *res_digits;
 	int			i,
 				i1,
@@ -8729,86 +8738,139 @@ mul_var(const NumericVar *var1, const Nu
 		return;
 	}
 
-	/* Determine result sign and (maximum possible) weight */
+	/* Determine result sign */
 	if (var1->sign == var2->sign)
 		res_sign = NUMERIC_POS;
 	else
 		res_sign = NUMERIC_NEG;
-	res_weight = var1->weight + var2->weight + 2;
 
 	/*
-	 * Determine the number of result digits to compute.  If the exact result
-	 * would have more than rscale fractional digits, truncate the computation
-	 * with MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that
-	 * would only contribute to the right of that.  (This will give the exact
+	 * Determine the number of result digits to compute and the (maximum
+	 * possible) result weight.  If the exact result would have more than
+	 * rscale fractional digits, truncate the computation with
+	 * MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that would
+	 * only contribute to the right of that.  (This will give the exact
 	 * rounded-to-rscale answer unless carries out of the ignored positions
 	 * would have propagated through more than MUL_GUARD_DIGITS digits.)
 	 *
 	 * Note: an exact computation could not produce more than var1ndigits +
-	 * var2ndigits digits, but we allocate one extra output digit in case
-	 * rscale-driven rounding produces a carry out of the highest exact digit.
+	 * var2ndigits digits, but we allocate at least one extra output digit in
+	 * case rscale-driven rounding produces a carry out of the highest exact
+	 * digit.
+	 *
+	 * To speed up the computation, we process the digits of each input in
+	 * pairs, converting them to base NBASE^2, and producing a base-NBASE^2
+	 * intermediate result.
 	 */
-	res_ndigits = var1ndigits + var2ndigits + 1;
+	/* digit pairs in each input */
+	var1ndigitpairs = (var1ndigits + 1) / 2;
+	var2ndigitpairs = (var2ndigits + 1) / 2;
+
+	/* digits in exact result */
+	res_ndigits = var1ndigits + var2ndigits;
+
+	/* digit pairs in exact result with at least one extra output digit */
+	res_ndigitpairs = res_ndigits / 2 + 1;
+
+	/* pair offset to align output to end of dig[] */
+	pair_offset = res_ndigitpairs - var1ndigitpairs - var2ndigitpairs + 1;
+
+	/* maximum possible result weight */
+	res_weight = var1->weight + var2->weight + 1 + 2 * res_ndigitpairs -
+		res_ndigits;
+
+	/* truncate computation based on requested rscale */
 	maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS +
 		MUL_GUARD_DIGITS;
-	res_ndigits = Min(res_ndigits, maxdigits);
+	maxdigitpairs = (maxdigits + 1) / 2;
+	res_ndigitpairs = Min(res_ndigitpairs, maxdigitpairs);
+	res_ndigits = 2 * res_ndigitpairs;
 
-	if (res_ndigits < 3)
+	/*
+	 * In the computation below, digit pair i1 of var1 and digit pair i2 of
+	 * var2 are multiplied and added to digit i1+i2+pair_offset of dig[]. Thus
+	 * input digit pairs with index >= res_ndigitpairs - pair_offset don't
+	 * contribute to the result, and can be ignored.
+	 */
+	if (res_ndigitpairs <= pair_offset)
 	{
 		/* All input digits will be ignored; so result is zero */
 		zero_var(result);
 		result->dscale = rscale;
 		return;
 	}
+	var1ndigitpairs = Min(var1ndigitpairs, res_ndigitpairs - pair_offset);
+	var2ndigitpairs = Min(var2ndigitpairs, res_ndigitpairs - pair_offset);
 
 	/*
-	 * We do the arithmetic in an array "dig[]" of signed int's.  Since
-	 * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
-	 * to avoid normalizing carries immediately.
+	 * We do the arithmetic in an array "dig[]" of unsigned 64-bit integers.
+	 * Since PG_UINT64_MAX is noticeably larger than NBASE^4, this gives us
+	 * headroom to avoid normalizing carries immediately.
 	 *
 	 * maxdig tracks the maximum possible value of any dig[] entry; when this
-	 * threatens to exceed INT_MAX, we take the time to propagate carries.
-	 * Furthermore, we need to ensure that overflow doesn't occur during the
-	 * carry propagation passes either.  The carry values could be as much as
-	 * INT_MAX/NBASE, so really we must normalize when digits threaten to
-	 * exceed INT_MAX - INT_MAX/NBASE.
+	 * threatens to exceed PG_UINT64_MAX, we take the time to propagate
+	 * carries. Furthermore, we need to ensure that overflow doesn't occur
+	 * during the carry propagation passes either.  The carry values could be
+	 * as much as PG_UINT64_MAX/NBASE^2, so really we must normalize when
+	 * digits threaten to exceed PG_UINT64_MAX - PG_UINT64_MAX/NBASE^2.
 	 *
-	 * To avoid overflow in maxdig itself, it actually represents the max
-	 * possible value divided by NBASE-1, ie, at the top of the loop it is
-	 * known that no dig[] entry exceeds maxdig * (NBASE-1).
+	 * To avoid overflow in maxdig itself, it actually represents the maximum
+	 * possible value divided by NBASE^2-1, i.e., at the top of the loop it is
+	 * known that no dig[] entry exceeds maxdig * (NBASE^2-1).
+	 *
+	 * The conversion of var1 to base NBASE^2 is done on the fly, as each new
+	 * digit is required.  The digits of var2 are converted upfront, and
+	 * stored at the end of dig[].
 	 */
-	dig = (int *) palloc0(res_ndigits * sizeof(int));
+	dig = (uint64 *) palloc(res_ndigitpairs * sizeof(uint64) +
+							var2ndigitpairs * sizeof(uint32));
+
+	/* zero the result digits */
+	MemSetAligned(dig, 0, res_ndigitpairs * sizeof(uint64));
 	maxdig = 0;
 
+	/* convert var2 to base NBASE^2, shifting up if length is odd */
+	var2digitpairs = (uint32 *) (dig + res_ndigitpairs);
+	for (i1 = i2 = 0; i1 < var2ndigitpairs - (var2ndigits & 1); i1++, i2 += 2)
+		var2digitpairs[i1] = var2digits[i2] * NBASE + var2digits[i2 + 1];
+	if ((var2ndigits & 1) != 0)
+	{
+		var2digitpairs[i1] = var2digits[i2] * NBASE;
+		if (i2 + 1 < var2ndigits)
+			var2digitpairs[i1] += var2digits[i2 + 1];
+	}
+
 	/*
-	 * The least significant digits of var1 should be ignored if they don't
-	 * contribute directly to the first res_ndigits digits of the result that
-	 * we are computing.
-	 *
-	 * Digit i1 of var1 and digit i2 of var2 are multiplied and added to digit
-	 * i1+i2+2 of the accumulator array, so we need only consider digits of
-	 * var1 for which i1 <= res_ndigits - 3.
+	 * Compute the base-NBASE^2 result in dig[].  The adjustment made to
+	 * var1ndigitpairs above ensures that this loop only considers var1 digits
+	 * that actually contribute to the result.
 	 */
-	for (i1 = Min(var1ndigits - 1, res_ndigits - 3); i1 >= 0; i1--)
+	for (i1 = 0; i1 < var1ndigitpairs; i1++)
 	{
-		NumericDigit var1digit = var1digits[i1];
+		uint32		var1digitpair;
 
-		if (var1digit == 0)
+		/* Next base-NBASE^2 digit from var1 */
+		if (2 * i1 + 1 < var1ndigits)
+			var1digitpair = var1digits[2 * i1] * NBASE + var1digits[2 * i1 + 1];
+		else
+			var1digitpair = var1digits[2 * i1] * NBASE;
+
+		if (var1digitpair == 0)
 			continue;
 
 		/* Time to normalize? */
-		maxdig += var1digit;
-		if (maxdig > (INT_MAX - INT_MAX / NBASE) / (NBASE - 1))
+		maxdig += var1digitpair;
+		if (maxdig > (PG_UINT64_MAX - PG_UINT64_MAX / NBASE_SQR) / (NBASE_SQR - 1))
 		{
-			/* Yes, do it */
+			/* Yes, do it (to base NBASE^2) */
 			carry = 0;
-			for (i = res_ndigits - 1; i >= 0; i--)
+			for (i = res_ndigitpairs - 1; i >= 0; i--)
 			{
 				newdig = dig[i] + carry;
-				if (newdig >= NBASE)
+				if (newdig >= NBASE_SQR)
 				{
-					carry = newdig / NBASE;
-					newdig -= carry * NBASE;
+					carry = newdig / NBASE_SQR;
+					newdig -= carry * NBASE_SQR;
 				}
 				else
 					carry = 0;
@@ -8816,14 +8878,15 @@ mul_var(const NumericVar *var1, const Nu
 			}
 			Assert(carry == 0);
 			/* Reset maxdig to indicate new worst-case */
-			maxdig = 1 + var1digit;
+			maxdig = 1 + var1digitpair;
 		}
 
 		/*
 		 * Add the appropriate multiple of var2 into the accumulator.
 		 *
-		 * As above, digits of var2 can be ignored if they don't contribute,
-		 * so we only include digits for which i1+i2+2 < res_ndigits.
+		 * This must only include digits pairs of var2 that contribute to the
+		 * first res_ndigitpairs of the result, so we only include digit pairs
+		 * for which i1+i2+pair_offset < res_ndigitpairs.
 		 *
 		 * This inner loop is the performance bottleneck for multiplication,
 		 * so we want to keep it simple enough so that it can be
@@ -8833,42 +8896,46 @@ mul_var(const NumericVar *var1, const Nu
 		 * not matter.
 		 */
 		{
-			int			i2limit = Min(var2ndigits, res_ndigits - i1 - 2);
-			int		   *dig_i1_2 = &dig[i1 + 2];
+			int			i2limit = Min(var2ndigitpairs,
+									  res_ndigitpairs - i1 - pair_offset);
+			uint64	   *dig_i1_off = &dig[i1 + pair_offset];
 
 			for (i2 = 0; i2 < i2limit; i2++)
-				dig_i1_2[i2] += var1digit * var2digits[i2];
+				dig_i1_off[i2] += (uint64) var1digitpair * var2digitpairs[i2];
 		}
 	}
 
 	/*
-	 * Now we do a final carry propagation pass to normalize the result, which
-	 * we combine with storing the result digits into the output. Note that
-	 * this is still done at full precision w/guard digits.
+	 * Now we do a final carry propagation pass to normalize the base-NBASE^2
+	 * result, and convert it back to base NBASE, storing the result digits
+	 * into the output. Note that this is still done at full precision w/guard
+	 * digits.
 	 */
 	alloc_var(result, res_ndigits);
 	res_digits = result->digits;
 	carry = 0;
-	for (i = res_ndigits - 1; i >= 0; i--)
+	for (i1 = res_ndigitpairs - 1, i2 = res_ndigits - 1; i1 >= 0; i1--)
 	{
-		newdig = dig[i] + carry;
-		if (newdig >= NBASE)
+		newdig = dig[i1] + carry;
+		if (newdig >= NBASE_SQR)
 		{
-			carry = newdig / NBASE;
-			newdig -= carry * NBASE;
+			carry = newdig / NBASE_SQR;
+			newdig -= carry * NBASE_SQR;
 		}
 		else
 			carry = 0;
-		res_digits[i] = newdig;
+		res_digits[i2--] = (NumericDigit) (newdig % NBASE);
+		res_digits[i2--] = (NumericDigit) (newdig / NBASE);
 	}
 	Assert(carry == 0);
 
 	pfree(dig);
 
 	/*
-	 * Finally, round the result to the requested precision.
+	 * Adjust the weight, if the inputs were shifted up during base
+	 * conversion, and round the result to the requested precision.
 	 */
-	result->weight = res_weight;
+	result->weight = res_weight - (var1ndigits & 1) - (var2ndigits & 1);
 	result->sign = res_sign;
 
 	/* Round to target rscale (and set result->dscale) */
@@ -8886,7 +8953,7 @@ mul_var(const NumericVar *var1, const Nu
  *	has at least as many digits as var1, and the exact product var1 * var2 is
  *	requested.
  */
-static void
+static pg_noinline void
 mul_var_short(const NumericVar *var1, const NumericVar *var2,
 			  NumericVar *result)
 {

Reply via email to