On Tue, 9 Jul 2024 at 10:11, Dean Rasheed <dean.a.rash...@gmail.com> wrote:
>
> OK, I have committed this.
>

Before considering the other patches to optimise for larger inputs, I
think it's worth optimising the existing mul_var() code as much as
possible.

One thing I noticed while testing the earlier patches on this thread
was that they were significantly faster if they used unsigned integers
rather than signed integers. I think the reason is that operations
like "x / 10000" and "x % 10000" use fewer CPU instructions (on every
platform, according to godbolt.org) if x is unsigned.

In addition, this reduces the number of times the digit array needs to
be renormalised, which seems to be the biggest factor.

Another small optimisation that seems to be just about worthwhile is
to pull the first digit of var1 out of the main loop, so that its
contributions can be set directly in dig[], rather than being added to
it. This allows palloc() to be used to allocate dig[], rather than
palloc0(), and only requires the upper part of dig[] to be initialised
to zeros, rather than all of it.

Together, these seem to give a decent speed-up:

 NBASE digits |  HEAD rate    | patch rate
--------------+---------------+---------------
            5 | 5.8797105e+06 |  6.047134e+06
           12 |  4.140017e+06 | 4.3429845e+06
           25 | 2.5711072e+06 | 2.7530615e+06
           50 | 1.0367389e+06 | 1.3370771e+06
          100 |      367924.1 |     462437.38
          250 |      77231.32 |     104593.95
         2500 |     881.48694 |     1197.4739
        15000 |     25.064987 |      32.78391

The largest gains are above around 50 NBASE digits, where the time
spent renormalising dig[] becomes significant.

Regards,
Dean
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
new file mode 100644
index f6e20cf..2a2e22d
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -8690,18 +8690,21 @@ mul_var(const NumericVar *var1, const Nu
 	int			res_sign;
 	int			res_weight;
 	int			maxdigits;
-	int		   *dig;
-	int			carry;
+	uint32	   *dig,
+			   *dig_i1_2;
+	uint32		carry;
 	int			maxdig;
-	int			newdig;
+	uint32		newdig;
 	int			var1ndigits;
 	int			var2ndigits;
 	NumericDigit *var1digits;
 	NumericDigit *var2digits;
 	NumericDigit *res_digits;
+	NumericDigit var1digit;
 	int			i,
 				i1,
-				i2;
+				i2,
+				i2limit;
 
 	/*
 	 * Arrange for var1 to be the shorter of the two numbers.  This improves
@@ -8775,23 +8778,22 @@ mul_var(const NumericVar *var1, const Nu
 	}
 
 	/*
-	 * We do the arithmetic in an array "dig[]" of signed int's.  Since
-	 * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
-	 * to avoid normalizing carries immediately.
+	 * We do the arithmetic in an array "dig[]" of unsigned 32-bit integers.
+	 * Since PG_UINT32_MAX is noticeably larger than NBASE*NBASE, this gives
+	 * us headroom to avoid normalizing carries immediately.
 	 *
 	 * maxdig tracks the maximum possible value of any dig[] entry; when this
-	 * threatens to exceed INT_MAX, we take the time to propagate carries.
-	 * Furthermore, we need to ensure that overflow doesn't occur during the
-	 * carry propagation passes either.  The carry values could be as much as
-	 * INT_MAX/NBASE, so really we must normalize when digits threaten to
-	 * exceed INT_MAX - INT_MAX/NBASE.
+	 * threatens to exceed PG_UINT32_MAX, we take the time to propagate
+	 * carries.  Furthermore, we need to ensure that overflow doesn't occur
+	 * during the carry propagation passes either.  The carry values could be
+	 * as much as PG_UINT32_MAX/NBASE, so really we must normalize when digits
+	 * threaten to exceed PG_UINT32_MAX - PG_UINT32_MAX/NBASE.
 	 *
 	 * To avoid overflow in maxdig itself, it actually represents the max
 	 * possible value divided by NBASE-1, ie, at the top of the loop it is
 	 * known that no dig[] entry exceeds maxdig * (NBASE-1).
 	 */
-	dig = (int *) palloc0(res_ndigits * sizeof(int));
-	maxdig = 0;
+	dig = (uint32 *) palloc(res_ndigits * sizeof(uint32));
 
 	/*
 	 * The least significant digits of var1 should be ignored if they don't
@@ -8801,17 +8803,37 @@ mul_var(const NumericVar *var1, const Nu
 	 * Digit i1 of var1 and digit i2 of var2 are multiplied and added to digit
 	 * i1+i2+2 of the accumulator array, so we need only consider digits of
 	 * var1 for which i1 <= res_ndigits - 3.
+	 *
+	 * We process the least significant digit of var1 outside the main loop,
+	 * since the results can be applied directly to dig[], rather than being
+	 * added to it.
 	 */
-	for (i1 = Min(var1ndigits - 1, res_ndigits - 3); i1 >= 0; i1--)
+	i1 = Min(var1ndigits - 1, res_ndigits - 3);
+	var1digit = var1digits[i1];
+	maxdig = var1digit;
+
+	/*
+	 * Multiply by the least significant applicable digit of var1 -- see
+	 * comments below.  Any changes made there should be reflected here.
+	 */
+	i2limit = Min(var2ndigits, res_ndigits - i1 - 2);
+	dig_i1_2 = &dig[i1 + 2];
+
+	memset(dig, 0, sizeof(uint32) * (i1 + 2));
+	for (i2 = 0; i2 < i2limit; i2++)
+		dig_i1_2[i2] = var1digit * var2digits[i2];
+
+	/* Add contributions from remaining digits of var1 */
+	for (i1 = i1 - 1; i1 >= 0; i1--)
 	{
-		NumericDigit var1digit = var1digits[i1];
+		var1digit = var1digits[i1];
 
 		if (var1digit == 0)
 			continue;
 
 		/* Time to normalize? */
 		maxdig += var1digit;
-		if (maxdig > (INT_MAX - INT_MAX / NBASE) / (NBASE - 1))
+		if (maxdig > (PG_UINT32_MAX - PG_UINT32_MAX / NBASE) / (NBASE - 1))
 		{
 			/* Yes, do it */
 			carry = 0;
@@ -8845,13 +8867,11 @@ mul_var(const NumericVar *var1, const Nu
 		 * Since we aren't propagating carries in this loop, the order does
 		 * not matter.
 		 */
-		{
-			int			i2limit = Min(var2ndigits, res_ndigits - i1 - 2);
-			int		   *dig_i1_2 = &dig[i1 + 2];
+		i2limit = Min(var2ndigits, res_ndigits - i1 - 2);
+		dig_i1_2 = &dig[i1 + 2];
 
-			for (i2 = 0; i2 < i2limit; i2++)
-				dig_i1_2[i2] += var1digit * var2digits[i2];
-		}
+		for (i2 = 0; i2 < i2limit; i2++)
+			dig_i1_2[i2] += var1digit * var2digits[i2];
 	}
 
 	/*

Reply via email to